Posted to dev@apex.apache.org by Timothy Farkas <ti...@datatorrent.com> on 2015/12/17 18:47:14 UTC

Database Output Operator Improvements

Hi All,

One of our users is outputting to Cassandra, and they want the output
operator to handle a Cassandra failure or Cassandra downtime gracefully.
Currently a lot of our database operators will just fail and redeploy
continually until the database comes back. This is a bad idea for a couple
of reasons:

1 - We rely on buffer server spooling to prevent data loss. If the database
is down for a long time (several hours or a day), the buffer server may run
out of space to spool, since it spools to local disk and data is purged only
after a window is committed. Furthermore, this problem will exist for all
the Streaming Containers in the DAG, not just the one immediately upstream
of the output operator, since data is spooled to disk for all operators and
is removed only once its window is committed.

2 - If there is another failure further upstream in the DAG, upstream
operators will be redeployed to a checkpoint less than or equal to the
checkpoint of the database operator in the at-least-once case. This could
mean redoing several hours or a day's worth of computation.

We should support a mechanism that detects when the connection to a database
is lost, spools to HDFS using a WAL (write-ahead log), and then writes the
contents of the WAL into the database once it comes back online. This will
save local disk space on all the nodes used by the DAG, since only the data
headed for the output operator needs to be spooled.
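
To make the idea concrete, here is a minimal sketch of the fallback, under
several assumptions: TupleStore and WalFallbackWriter are hypothetical names
and not Malhar or Cassandra APIs, a local file stands in for the WAL on HDFS,
and tuples are single-line strings. It only illustrates the switch between
writing directly and spooling, not a real operator.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a database client; not a Malhar or Cassandra API. */
interface TupleStore {
  void write(String tuple) throws IOException;
}

/** Writes to the store while it is reachable and appends to a WAL file while it is not. */
class WalFallbackWriter {
  private final TupleStore store;
  private final Path wal; // a local file here; the proposal would keep this on HDFS
  private boolean spooling;

  WalFallbackWriter(TupleStore store, Path wal) {
    this.store = store;
    this.wal = wal;
  }

  /** Called for every tuple that should reach the database. */
  void write(String tuple) throws IOException {
    if (!spooling) {
      try {
        store.write(tuple);
        return;
      } catch (IOException e) {
        spooling = true; // connection lost: switch to the WAL
      }
    }
    // While spooling, every tuple goes to the WAL so that order is preserved.
    Files.write(wal, (tuple + "\n").getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }

  /** Called periodically; drains the WAL in order once the store accepts writes again. */
  boolean replay() throws IOException {
    if (!spooling) {
      return true;
    }
    List<String> pending = Files.exists(wal)
        ? Files.readAllLines(wal, StandardCharsets.UTF_8)
        : new ArrayList<String>();
    int done = 0;
    try {
      for (String tuple : pending) {
        store.write(tuple);
        done++;
      }
    } catch (IOException e) {
      // Still down: keep only the tuples that have not been written yet.
      Files.write(wal, pending.subList(done, pending.size()), StandardCharsets.UTF_8);
      return false;
    }
    Files.deleteIfExists(wal);
    spooling = false;
    return true;
  }
}
```

In this sketch a failed replay rewrites the WAL with only the unwritten
tuples, so a tuple is never replayed twice by the same writer instance. A
real operator would also have to tie the WAL position to checkpoints.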

Ticket here if anyone is interested in working on it:

https://malhar.atlassian.net/browse/MLHR-1951

Thanks,
Tim

Re: Database Output Operator Improvements

Posted by Gaurav Gupta <ga...@datatorrent.com>.
Tim,

I think this can also be solved as follows:

1. Put an HDFS output operator before the Cassandra operator. This HDFS operator writes the tuples to HDFS, rolling files by size or by window as the user wants, and sends each file's location to the downstream Cassandra operator.
2. The Cassandra operator takes these files and, in a separate thread, uploads each one once the window ID in which the file was seen is committed, similar to what the reconciler does.
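
The second step could be sketched roughly as below. This is only an
illustration under assumptions: CommittedFileUploader, fileSeen and the
upload callback are hypothetical names, and the class does not implement any
Apex or Malhar interface. It holds file locations per window and hands them
to a separate uploader thread only after that window is committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Holds file locations per window and uploads them only after that window is committed. */
class CommittedFileUploader {
  private final TreeMap<Long, List<String>> filesByWindow = new TreeMap<>();
  private final ExecutorService uploaderThread = Executors.newSingleThreadExecutor();
  private final Consumer<String> upload; // hypothetical: loads one file into the database

  CommittedFileUploader(Consumer<String> upload) {
    this.upload = upload;
  }

  /** The upstream file writer reported a finished file during the given window. */
  synchronized void fileSeen(long windowId, String path) {
    filesByWindow.computeIfAbsent(windowId, id -> new ArrayList<>()).add(path);
  }

  /** Every window up to and including windowId has been committed. */
  synchronized void committed(long windowId) {
    Map<Long, List<String>> ready = filesByWindow.headMap(windowId, true);
    for (List<String> paths : ready.values()) {
      for (String path : paths) {
        uploaderThread.execute(() -> upload.accept(path)); // runs off the caller's thread
      }
    }
    ready.clear(); // headMap is a view, so this drops the windows just handed off
  }

  /** Stops accepting work and waits for queued uploads to finish. */
  boolean shutdown(long timeoutMillis) throws InterruptedException {
    uploaderThread.shutdown();
    return uploaderThread.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}
```

A single uploader thread keeps files in window order. Files from windows that
are not yet committed stay in the map, so they are never loaded early.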
 
Does that help?

Thanks
- Gaurav


Re: Database Output Operator Improvements

Posted by Timothy Farkas <ti...@datatorrent.com>.
Thanks, Priyanka and Ashwin!

On Fri, Dec 18, 2015 at 2:19 AM, Priyanka Gugale <pr...@datatorrent.com>
wrote:

> I agree with Tim and Chandni that we should go to disk only when the output
> DB is unreachable or slow.
> As suggested, the best approach will be to use a combination of
> AbstractReconciler and a WAL (spill to disk only when the in-memory queue
> limit is reached).
>
> I can take up integrating the enhanced reconciler with the DB output
> operator. I can also help with using the WAL with AbstractReconciler.
>
> -Priyanka

Re: Database Output Operator Improvements

Posted by Priyanka Gugale <pr...@datatorrent.com>.
I agree with Tim and Chandni that we should go to disk only when the output
DB is unreachable or slow.
As suggested, the best approach will be to use a combination of
AbstractReconciler and a WAL (spill to disk only when the in-memory queue
limit is reached).
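
A minimal sketch of the spill behaviour, assuming single-line string tuples
and a local file standing in for the WAL on HDFS. SpillingQueue is a
hypothetical name and is not part of AbstractReconciler or Malhar. It keeps
tuples in memory up to a limit, spills the rest to the file, and reloads them
in FIFO order as the writer drains the queue.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;

/** FIFO queue that holds up to `capacity` tuples in memory and spills the rest to a file. */
class SpillingQueue {
  private final ArrayDeque<String> memory = new ArrayDeque<>();
  private final int capacity;
  private final Path spill; // a local file here; the thread proposes a WAL on HDFS
  private int spilled;      // number of tuples currently in the spill file

  SpillingQueue(int capacity, Path spill) {
    this.capacity = capacity;
    this.spill = spill;
  }

  /** Adds a tuple, going to disk only when the in-memory limit is reached. */
  void add(String tuple) throws IOException {
    // Once anything is on disk, later tuples must follow it there to keep order.
    if (spilled == 0 && memory.size() < capacity) {
      memory.add(tuple);
      return;
    }
    Files.write(spill, Collections.singletonList(tuple), StandardCharsets.UTF_8,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    spilled++;
  }

  /** Returns the oldest tuple, or null when the queue is empty. */
  String poll() throws IOException {
    if (memory.isEmpty() && spilled > 0) {
      // Reload the oldest spilled tuples; rereading the whole file keeps the sketch short.
      List<String> lines = Files.readAllLines(spill, StandardCharsets.UTF_8);
      int n = Math.min(capacity, lines.size());
      memory.addAll(lines.subList(0, n));
      Files.write(spill, lines.subList(n, lines.size()), StandardCharsets.UTF_8);
      spilled = lines.size() - n;
    }
    return memory.poll();
  }
}
```

The database writer thread would call poll() at whatever pace the database
allows, so a slow or unreachable database only grows the spill file instead
of blocking the operator.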

I can take up integrating the enhanced reconciler with the DB output
operator. I can also help with using the WAL with AbstractReconciler.

-Priyanka

On Fri, Dec 18, 2015 at 4:06 AM, Ashwin Chandra Putta <
ashwinchandrap@gmail.com> wrote:

> I will send a PR for my first implementation soon.

Re: Database Output Operator Improvements

Posted by Ashwin Chandra Putta <as...@gmail.com>.
I will send a PR for my first implementation soon.

On Thu, Dec 17, 2015 at 2:34 PM, Timothy Farkas <ti...@datatorrent.com> wrote:

> It looks like Ashwin has an initial implementation of a reconciler. Could
> we add that to Malhar and add WAL optimizations to it once the WAL is added
> to Malhar?

-- 

Regards,
Ashwin.

Re: Database Output Operator Improvements

Posted by Timothy Farkas <ti...@datatorrent.com>.
It looks like Ashwin has an initial implementation of a reconciler. Could
we add that to Malhar and add WAL optimizations to it once the WAL is added
to Malhar?

On Thu, Dec 17, 2015 at 1:31 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Pramod,
>
> Agreed, it can be done by using the reconciler and optimizing it, but that
> means there is some work to be done in Malhar/library. We now have a ticket
> to address that work.
>
> Spooling tuples with a WAL is missing from Malhar/lib entirely, which means
> the user needs to write more code.
>
> Thanks,
> Chandni
>
> On Thu, Dec 17, 2015 at 1:07 PM, Ashwin Chandra Putta <
> ashwinchandrap@gmail.com> wrote:
>
> > Tim,
> >
> > I don't think there is an implementation in Malhar yet. I have an
> > implementation in my fork that I sent you.
> >
> > Regards,
> > Ashwin.
> >
> > On Thu, Dec 17, 2015 at 12:09 PM, Timothy Farkas <ti...@datatorrent.com>
> > wrote:
> >
> > > Ashwin, is there an implementation of that in Malhar? I could only find
> > > an in-memory-only version:
> > >
> > > https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
> > >
> > > This in-memory implementation won't work for this use case, since
> > > committed may not be called for hours or a day, so data would be held in
> > > memory for that long.
> > >
> > > On Thu, Dec 17, 2015 at 11:49 AM, Ashwin Chandra Putta <
> > > ashwinchandrap@gmail.com> wrote:
> > >
> > > > Tim,
> > > >
> > > > Are you saying HDFS is slower than a database? :)
> > > >
> > > > I think Reconciler is the best approach. The tuples need not always be
> > > > written to HDFS; they can be queued in memory, and you spool them to
> > > > HDFS only when the queue reaches its limit. The reconciler solves a
> > > > few of the major problems you described above.
> > > >
> > > > 1. Graceful reconnection. When the external system we are writing to
> > > > is down, the reconciler spools the messages to the queue and then to
> > > > HDFS. The tuples are written to the external system only after it is
> > > > back up again.
> > > > 2. Handling surges. There will be cases when the throughput surges
> > > > suddenly for some period and the external system is not fast enough to
> > > > keep up with the writes. In those cases, the reconciler spools the
> > > > incoming tuples to the queue/HDFS and then writes them at the pace of
> > > > the external system.
> > > > 3. DAG slowdown. Again, in case of an external system failure or a
> > > > slow connection, we do not want to block the windows from moving
> > > > forward. If the windows are blocked for a long time, STRAM will
> > > > unnecessarily kill the operator. The reconciler makes sure that the
> > > > incoming messages are just queued/spooled to HDFS (the external system
> > > > does not block the DAG), so the DAG is not slowed down.
> > > >
> > > > Regards,
> > > > Ashwin.
> > > >
> > > > On Thu, Dec 17, 2015 at 11:29 AM, Timothy Farkas <
> tim@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Yes that is true Chandni, and considering how slow HDFS is we
> should
> > > > avoid
> > > > > writing to it if we can.
> > > > >
> > > > > It would be great if someone could pick up the ticket :).
> > > > >
> > > > > On Thu, Dec 17, 2015 at 11:17 AM, Chandni Singh <
> > > chandni@datatorrent.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > +1 for Tim's suggestion.
> > > > > >
> > > > > > Using reconciler employs always writing to HDFS and then read
> from
> > > > that.
> > > > > > Tim's suggestion is that we only write to hdfs when database
> > > connection
> > > > > is
> > > > > > down. This is analogous to spooling.
> > > > > >
> > > > > > Chandni
> > > > > >
> > > > > > On Thu, Dec 17, 2015 at 11:13 AM, Pramod Immaneni <
> > > > > pramod@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Tim we have a pattern for this called Reconciler that Gaurav
> has
> > > also
> > > > > > > mentioned. There are some examples for it in Malhar
> > > > > > >
> > > > > > > On Thu, Dec 17, 2015 at 9:47 AM, Timothy Farkas <
> > > tim@datatorrent.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > One of our users is outputting to Cassandra, but they want to
> > > > handle
> > > > > a
> > > > > > > > Cassandra failure or Cassandra down time gracefully from an
> > > output
> > > > > > > > operator. Currently a lot of our database operators will just
> > > fail
> > > > > and
> > > > > > > > redeploy continually until the database comes back. This is a
> > bad
> > > > > idea
> > > > > > > for
> > > > > > > > a couple of reasons:
> > > > > > > >
> > > > > > > > 1 - We rely on buffer server spooling to prevent data loss.
> If
> > > the
> > > > > > > database
> > > > > > > > is down for a long time (several hours or a day) we may run
> out
> > > of
> > > > > > space
> > > > > > > to
> > > > > > > > spool for buffer server since it spools to local disk, and
> data
> > > is
> > > > > > purged
> > > > > > > > only after a window is committed. Furthermore this buffer
> > server
> > > > > > problem
> > > > > > > > will exist for all the Streaming Containers in the dag, not
> > just
> > > > the
> > > > > > one
> > > > > > > > immediately upstream from the output operator, since data is
> > > > spooled
> > > > > to
> > > > > > > > disk for all operators and only removed for windows once a
> > window
> > > > is
> > > > > > > > committed.
> > > > > > > >
> > > > > > > > 2 - If there is another failure further upstream in the dag,
> > > > upstream
> > > > > > > > operators will be redeployed to a checkpoint less than or
> equal
> > > to
> > > > > the
> > > > > > > > checkpoint of the database operator in the At leas once case.
> > > This
> > > > > > could
> > > > > > > > mean redoing several hours or a day worth of computation.
> > > > > > > >
> > > > > > > > We should support a mechanism to detect when the connection
> to
> > a
> > > > > > database
> > > > > > > > is lost and then spool to hdfs using a WAL, and then write
> the
> > > > > contents
> > > > > > > of
> > > > > > > > the WAL into the database once it comes back online. This
> will
> > > save
> > > > > the
> > > > > > > > local disk space of all the nodes used in the dag and allow
> it
> > to
> > > > be
> > > > > > used
> > > > > > > > for only the data being output to the output operator.
> > > > > > > >
> > > > > > > > Ticket here if anyone is interested in working on it:
> > > > > > > >
> > > > > > > > https://malhar.atlassian.net/browse/MLHR-1951
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Tim
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Regards,
> > > > Ashwin.
> > > >
> > >
> >
> >
> >
> > --
> >
> > Regards,
> > Ashwin.
> >
>

Re: Database Output Operator Improvements

Posted by Chandni Singh <ch...@datatorrent.com>.
Pramod,

Agreed, it can be done by using the reconciler and optimizing it, but that
means there is some work to be done in Malhar/library. We have a ticket now
to address that work.

Using a WAL to spool the tuples is entirely missing from Malhar/lib, which
means the user needs to write more code.

Thanks,
Chandni
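
For anyone picking up the ticket, here is a minimal sketch of the kind of
WAL being discussed: append length-prefixed records, then replay them in
order. It is only an illustration under stated assumptions. SimpleWal and
its methods are hypothetical names, nothing like this exists in Malhar
today, and a local file stands in for HDFS.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a local file stands in for an HDFS-backed log.
class SimpleWal {
    private final File file;

    SimpleWal(File file) {
        this.file = file;
    }

    // Appends one record as [4-byte length][UTF-8 payload].
    // Reopening the file per record keeps the sketch short; a real WAL
    // would keep the stream open and sync at window boundaries.
    void append(String tuple) {
        byte[] payload = tuple.getBytes(StandardCharsets.UTF_8);
        try (DataOutputStream out =
                 new DataOutputStream(new FileOutputStream(file, true))) {
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns every complete record in the order it was appended.
    // A torn record at the tail (partial write) is silently skipped.
    List<String> replay() {
        List<String> tuples = new ArrayList<>();
        if (!file.exists()) {
            return tuples;
        }
        try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
            while (true) {
                byte[] payload;
                try {
                    payload = new byte[in.readInt()];
                    in.readFully(payload);
                } catch (EOFException endOrTornRecord) {
                    break;
                }
                tuples.add(new String(payload, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return tuples;
    }

    // Empties the log once its contents have reached the database.
    void clear() {
        try {
            new FileOutputStream(file).close(); // opening without append truncates
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

An operator would call append() while the database is unreachable, then
replay() followed by clear() once the connection is back.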

On Thu, Dec 17, 2015 at 1:07 PM, Ashwin Chandra Putta <
ashwinchandrap@gmail.com> wrote:

> Tim,
>
> I don't think there is an implementation in Malhar yet. I have an
> implementation in my fork that I sent you.
>
> Regards,
> Ashwin.

Re: Database Output Operator Improvements

Posted by Ashwin Chandra Putta <as...@gmail.com>.
Tim,

I don't think there is an implementation in Malhar yet. I have an
implementation in my fork that I sent you.

Regards,
Ashwin.

On Thu, Dec 17, 2015 at 12:09 PM, Timothy Farkas <ti...@datatorrent.com>
wrote:

> Ashwin, is there an implementation of that in Malhar? I could only find an
> in-memory-only version:
>
> https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
>
> This in-memory implementation won't work in this use case, since committed
> may not be called for hours or a day, so data would be held in memory for
> that whole time.

Re: Database Output Operator Improvements

Posted by Timothy Farkas <ti...@datatorrent.com>.
Ashwin, is there an implementation of that in Malhar? I could only find an
in-memory-only version:

https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java

This in-memory implementation won't work in this use case, since committed
may not be called for hours or a day, so data would be held in memory for
that whole time.
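
To make the memory concern concrete, here is a minimal sketch of a buffer
that holds tuples per window and releases them only when a window is
committed. It is an assumption-laden illustration, not the code in
AbstractReconciler; WindowBuffer and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of "hold in memory until committed".
class WindowBuffer<T> {
    // Tuples grouped by arrival window, oldest window first.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    void add(long windowId, T tuple) {
        pending.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
    }

    // Releases all tuples from windows <= committedWindowId, in window order.
    List<T> committed(long committedWindowId) {
        List<T> released = new ArrayList<>();
        Map<Long, List<T>> done = pending.headMap(committedWindowId, true);
        for (List<T> tuples : done.values()) {
            released.addAll(tuples);
        }
        done.clear(); // headMap is a view, so this also removes from pending
        return released;
    }

    // Number of tuples still held in memory.
    int heldTuples() {
        int count = 0;
        for (List<T> tuples : pending.values()) {
            count += tuples.size();
        }
        return count;
    }
}
```

Until committed() is called, heldTuples() only grows, which is the problem
if the commit is delayed by hours.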

On Thu, Dec 17, 2015 at 11:49 AM, Ashwin Chandra Putta <
ashwinchandrap@gmail.com> wrote:

> Tim,
>
> Are you saying HDFS is slower than a database? :)
>
> I think Reconciler is the best approach. The tuples need not be written to
> HDFS; they can be queued in memory. You can spool them to HDFS only when
> the queue reaches its limit. The reconciler solves a few major problems,
> as you described above.
>
> 1. Graceful reconnection. When the external system we are writing to is
> down, the reconciler spools the messages to the queue and then to HDFS.
> The tuples are written to the external system only after it is back up
> again.
> 2. Handling surges. There will be cases when the throughput surges
> suddenly for some period and the external system cannot keep up with the
> writes. In those cases the reconciler spools the incoming tuples to the
> queue/HDFS and then writes them at the pace of the external system.
> 3. DAG slowdown. Again, in case of external system failure or a slow
> connection, we do not want to block the windows from moving forward. If
> the windows are blocked for a long time, STRAM will unnecessarily kill the
> operator. The reconciler makes sure that the incoming messages are just
> queued/spooled to HDFS (the external system is not blocking the DAG), so
> the DAG is not slowed down.
>
> Regards,
> Ashwin.

Re: Database Output Operator Improvements

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Why can't the reconciler, as it exists today, be enhanced to write more optimally?

> On Dec 17, 2015, at 12:07 PM, Chandni Singh <ch...@datatorrent.com> wrote:
>
> The question is: with databases like HBase and Cassandra, which are
> themselves backed by a file system like HDFS, why write to HDFS when the
> database connection is healthy?
>
> These are distributed, scalable and performant databases.
>
> IMO the reconciler approach isn't the best here. It fits when the external
> entity is always slow, which was the original use case.
> We can spool to HDFS only when the connection is unhealthy.
>
> If this is properly implemented, it can address all the other points that
> Ashwin mentioned.
>
> Also, I think benchmarking these solutions will help us compare them and
> decide which use case each fits best.
>
> Chandni

Re: Database Output Operator Improvements

Posted by Chandni Singh <ch...@datatorrent.com>.
The question is: with databases like HBase and Cassandra, which are
themselves backed by a file system like HDFS, why write to HDFS when the
database connection is healthy?

These are distributed, scalable and performant databases.

IMO the reconciler approach isn't the best here. It fits when the external
entity is always slow, which was the original use case.
We can spool to HDFS only when the connection is unhealthy.

If this is properly implemented, it can address all the other points that
Ashwin mentioned.

Also, I think benchmarking these solutions will help us compare them and
decide which use case each fits best.

Chandni
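
The "spool only when unhealthy" idea above can be sketched roughly as
follows. This is a sketch under assumptions, not an existing Malhar
operator: Sink and SpoolOnFailureWriter are hypothetical names, an
in-memory deque stands in for the HDFS spool, and any exception from the
sink is treated as "connection unhealthy".

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sink; a real one would wrap a Cassandra or HBase client.
interface Sink<T> {
    void write(T tuple) throws Exception;
}

class SpoolOnFailureWriter<T> {
    private final Sink<T> database;
    // Stand-in for an HDFS-backed spool or WAL.
    private final Deque<T> spool = new ArrayDeque<>();

    SpoolOnFailureWriter(Sink<T> database) {
        this.database = database;
    }

    // Healthy path: write straight to the database, no spooling.
    // Unhealthy path: spool, and keep spooling until the backlog is drained
    // so that tuples reach the database in arrival order.
    void process(T tuple) {
        if (spool.isEmpty()) {
            try {
                database.write(tuple);
                return;
            } catch (Exception e) {
                // Treat any failure as "database down" and fall through.
            }
        }
        spool.addLast(tuple);
    }

    // Call periodically (for example at window boundaries) to retry.
    // Stops at the first failure and leaves the remaining tuples spooled.
    void retrySpooled() {
        while (!spool.isEmpty()) {
            try {
                database.write(spool.peekFirst());
            } catch (Exception e) {
                return;
            }
            spool.removeFirst();
        }
    }

    int spooled() {
        return spool.size();
    }
}
```

Note that a write which fails after partially succeeding would be replayed,
so this gives at-least-once delivery to the database, not exactly-once.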

On Thu, Dec 17, 2015 at 11:49 AM, Ashwin Chandra Putta <
ashwinchandrap@gmail.com> wrote:

> Tim,
>
> Are you saying HDFS is slower than a database? :)
>
> I think Reconciler is the best approach. The tuples need not be written to
> HDFS; they can be queued in memory. You can spool them to HDFS only when
> the queue reaches its limit. The reconciler solves a few major problems,
> as you described above.
>
> 1. Graceful reconnection. When the external system we are writing to is
> down, the reconciler spools the messages to the queue and then to HDFS.
> The tuples are written to the external system only after it is back up
> again.
> 2. Handling surges. There will be cases when the throughput surges
> suddenly for some period and the external system cannot keep up with the
> writes. In those cases the reconciler spools the incoming tuples to the
> queue/HDFS and then writes them at the pace of the external system.
> 3. DAG slowdown. Again, in case of external system failure or a slow
> connection, we do not want to block the windows from moving forward. If
> the windows are blocked for a long time, STRAM will unnecessarily kill the
> operator. The reconciler makes sure that the incoming messages are just
> queued/spooled to HDFS (the external system is not blocking the DAG), so
> the DAG is not slowed down.
>
> Regards,
> Ashwin.

Re: Database Output Operator Improvements

Posted by Ashwin Chandra Putta <as...@gmail.com>.
Tim,

Are you saying HDFS is slower than a database? :)

I think Reconciler is the best approach. The tuples need not always be
written to HDFS; they can be queued in memory and spooled to HDFS only when
the queue reaches its limit. The reconciler solves a few major problems, as
you described above.

1. Graceful reconnection. When the external system we are writing to is
down, the reconciler spools the messages to the queue and then to HDFS. The
tuples are written to the external system only after it is back up again.
2. Handling surges. There will be cases when the throughput surges for some
period and the external system is not fast enough to keep up with the
writes. In those cases, the reconciler spools the incoming tuples to the
queue/HDFS and then writes them at the pace of the external system.
3. DAG slowdown. Again, in case of external system failure or a slow
connection, we do not want to block the windows from moving forward. If the
windows are blocked for a long time, STRAM will unnecessarily kill the
operator. The reconciler makes sure that the incoming messages are just
queued/spooled to HDFS (the external system is not blocking the DAG), so the
DAG is not slowed down.
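
As a rough illustration of the three points above, here is a minimal Java
sketch of the queue-then-spool behaviour. It is not the Malhar Reconciler
API: the class ReconcilerSketch, its method names, and the in-memory "spool"
that stands in for an HDFS file are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical sketch only; names and structure are not taken from Malhar. */
class ReconcilerSketch<T> {
  private final int memoryLimit;
  private final Queue<T> memoryQueue = new ArrayDeque<>();
  // Stand-in for a spool file on HDFS (assumption: a real operator would
  // write overflow tuples to the file system instead of a second queue).
  private final Queue<T> spool = new ArrayDeque<>();

  ReconcilerSketch(int memoryLimit) {
    if (memoryLimit < 1) {
      throw new IllegalArgumentException("memoryLimit must be at least 1");
    }
    this.memoryLimit = memoryLimit;
  }

  /** Accepts an incoming tuple without ever waiting on the external system. */
  void enqueue(T tuple) {
    // Once anything has been spooled, keep spooling so FIFO order is kept.
    if (spool.isEmpty() && memoryQueue.size() < memoryLimit) {
      memoryQueue.add(tuple);
    } else {
      spool.add(tuple);
    }
  }

  /**
   * Writes up to maxWrites tuples, oldest first. The writer returns false
   * when the external system is unavailable; that tuple stays queued.
   * Returns the number of tuples actually written.
   */
  int drain(int maxWrites, Predicate<T> writer) {
    int written = 0;
    while (written < maxWrites && !memoryQueue.isEmpty()) {
      if (!writer.test(memoryQueue.peek())) {
        break; // external system is down or too slow; retry on a later call
      }
      memoryQueue.remove();
      written++;
      // Refill the freed memory slot from the head of the spool.
      if (!spool.isEmpty()) {
        memoryQueue.add(spool.remove());
      }
    }
    return written;
  }

  int inMemory() { return memoryQueue.size(); }

  int spooled() { return spool.size(); }
}
```

The maxWrites argument is what lets the writes proceed at the pace of the
external system (point 2), while enqueue never blocks, so windows keep
moving (point 3).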

Regards,
Ashwin.

On Thu, Dec 17, 2015 at 11:29 AM, Timothy Farkas <ti...@datatorrent.com>
wrote:

> Yes that is true Chandni, and considering how slow HDFS is we should avoid
> writing to it if we can.
>
> It would be great if someone could pick up the ticket :).



-- 

Regards,
Ashwin.

Re: Database Output Operator Improvements

Posted by Timothy Farkas <ti...@datatorrent.com>.
Yes that is true Chandni, and considering how slow HDFS is we should avoid
writing to it if we can.

It would be great if someone could pick up the ticket :).

On Thu, Dec 17, 2015 at 11:17 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> +1 for Tim's suggestion.
>
> Using reconciler employs always writing to HDFS and then read from that.
> Tim's suggestion is that we only write to hdfs when database connection is
> down. This is analogous to spooling.
>
> Chandni

Re: Database Output Operator Improvements

Posted by Chandni Singh <ch...@datatorrent.com>.
+1 for Tim's suggestion.

The reconciler always writes to HDFS and then reads from it. Tim's
suggestion is that we write to HDFS only when the database connection is
down. This is analogous to spooling.
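
To make the difference concrete, here is a minimal Java sketch of the
spool-only-when-down idea: write straight to the database while it is
reachable, append to a log when a write fails, and replay the log in order
once writes succeed again. The Store interface, the WalFallbackWriter class,
and the in-memory deque standing in for a WAL on HDFS are all hypothetical
names chosen for illustration, not existing Malhar or Cassandra APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical database client; write() throws when the database is unreachable. */
interface Store<T> {
  void write(T tuple) throws Exception;
}

/** Hypothetical sketch only; not an existing operator. */
class WalFallbackWriter<T> {
  private final Store<T> store;
  // Stand-in for a write-ahead log on HDFS (assumption for this sketch).
  private final Deque<T> wal = new ArrayDeque<>();

  WalFallbackWriter(Store<T> store) {
    this.store = store;
  }

  /** Handles one tuple without failing the operator when the database is down. */
  void process(T tuple) {
    // Replay any backlog first so tuples reach the database in arrival order.
    if (!wal.isEmpty() && !replay()) {
      wal.addLast(tuple);
      return;
    }
    try {
      store.write(tuple);
    } catch (Exception e) {
      wal.addLast(tuple); // connection lost: spool instead of redeploying
    }
  }

  /** Writes logged tuples oldest first; returns true once the log is empty. */
  boolean replay() {
    while (!wal.isEmpty()) {
      try {
        store.write(wal.peekFirst());
      } catch (Exception e) {
        return false; // still down; keep the remaining tuples in the log
      }
      wal.removeFirst();
    }
    return true;
  }

  int pending() {
    return wal.size();
  }
}
```

This sketch retries the database on every incoming tuple while it is down; a
real operator would more likely back off between attempts. A tuple is removed
from the log only after its write returns, so a failure in between can lead
to a duplicate write, which fits at-least-once processing.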

Chandni

On Thu, Dec 17, 2015 at 11:13 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Tim we have a pattern for this called Reconciler that Gaurav has also
> mentioned. There are some examples for it in Malhar

Re: Database Output Operator Improvements

Posted by Timothy Farkas <ti...@datatorrent.com>.
Thanks guys, I remember now that that was done for Hive. Could someone take
a look at implementing it for Cassandra Output?

On Thu, Dec 17, 2015 at 11:13 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Tim we have a pattern for this called Reconciler that Gaurav has also
> mentioned. There are some examples for it in Malhar

Re: Database Output Operator Improvements

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Tim, we have a pattern for this called Reconciler, which Gaurav has also
mentioned. There are some examples of it in Malhar.

On Thu, Dec 17, 2015 at 9:47 AM, Timothy Farkas <ti...@datatorrent.com> wrote:

> Hi All,
>
> One of our users is outputting to Cassandra, but they want to handle a
> Cassandra failure or Cassandra down time gracefully from an output
> operator. Currently a lot of our database operators will just fail and
> redeploy continually until the database comes back. This is a bad idea for
> a couple of reasons:
>
> 1 - We rely on buffer server spooling to prevent data loss. If the database
> is down for a long time (several hours or a day) we may run out of space to
> spool for buffer server since it spools to local disk, and data is purged
> only after a window is committed. Furthermore this buffer server problem
> will exist for all the Streaming Containers in the dag, not just the one
> immediately upstream from the output operator, since data is spooled to
> disk for all operators and only removed for windows once a window is
> committed.
>
> 2 - If there is another failure further upstream in the dag, upstream
> operators will be redeployed to a checkpoint less than or equal to the
> checkpoint of the database operator in the at-least-once case. This could
> mean redoing several hours or a day worth of computation.
>
> We should support a mechanism to detect when the connection to a database
> is lost and then spool to hdfs using a WAL, and then write the contents of
> the WAL into the database once it comes back online. This will save the
> local disk space of all the nodes used in the dag and allow it to be used
> for only the data being output to the output operator.
>
> Ticket here if anyone is interested in working on it:
>
> https://malhar.atlassian.net/browse/MLHR-1951
>
> Thanks,
> Tim
>