Posted to dev@apex.apache.org by Priyanka Gugale <pr...@apache.org> on 2015/12/28 09:57:28 UTC

Writing batches to database using Transactionable Store Output operator

Hi,

In Malhar we have an operator, AbstractBatchTransactionableStoreOutputOperator,
which creates batches from the tuples received in a window. At the end of the
window these batches are sent to the database for processing.
There is no way to configure a MAX_SIZE on these batches. Depending on the
input rate the batch sizes can grow very large, and we might want to restrict
the batch size.

Any operator can extend the base class and do batch management on its own, but
I see this as a generic requirement, and IMO we should change the base class,
i.e. AbstractBatchTransactionableStoreOutputOperator, to accept a MAX_SIZE for
the batch from outside.

Any opinion on this?

-Priyanka
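
To make the proposal concrete, here is a minimal sketch of what a configurable
max batch size could look like on such an operator. This is not the actual
Malhar class; the class name, store interface, and method names below are
placeholders, assuming tuples are buffered per window and flushed as
transactions:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a batch-writing output operator with a configurable MAX_SIZE.
// "TransactionalStore" is a stand-in for whatever store the operator wraps.
public class BoundedBatchOutputOperator<T> {

  private int maxBatchSize = 1000;                // proposed MAX_SIZE property
  private final List<T> batch = new ArrayList<>();
  private final TransactionalStore<T> store;

  public BoundedBatchOutputOperator(TransactionalStore<T> store) {
    this.store = store;
  }

  // Called for every tuple in the application window.
  public void processTuple(T tuple) {
    batch.add(tuple);
    if (batch.size() >= maxBatchSize) {
      flush();                                    // commit early instead of growing unbounded
    }
  }

  // Called at the end of the application window.
  public void endWindow() {
    if (!batch.isEmpty()) {
      flush();                                    // commit whatever is left in the window
    }
  }

  private void flush() {
    store.commitBatch(new ArrayList<>(batch));    // one transaction per flushed batch
    batch.clear();
  }

  public void setMaxBatchSize(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  // Stand-in store interface, assumed only for this sketch.
  public interface TransactionalStore<T> {
    void commitBatch(List<T> tuples);
  }
}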

Re: Writing batches to database using Transactionable Store Output operator

Posted by Sandeep Deshmukh <sa...@datatorrent.com>.
Chandni: Good point on ordering.

We need to handle that case. Ordering is also not guaranteed when the
upstream operator has multiple instances.

Regards,
Sandeep

On Tue, Dec 29, 2015 at 8:39 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> I agree with Sandeep that at-least once with databases is not the right
> approach. All the output adaptors we have in the library are written so
> that they do not write duplicate entries. For example FileOutputOperator.
> This has nothing to do with the processing mode feature offered by the
> platform. When writing to any external entity we have to ensure that we do
> not introduce duplicates.
>
> Priyanka,
> I am assuming that the batch id you mean here is the number of tuples in
> the app window which are persisted otherwise with every tuple there needs
> to be a batch id so when a window is being replayed you will know which
> tuple to discard.
>
> We have followed that approach in some concrete implementations in POCs.
> One of the reasons that we haven't added that here is that it relies on an
> order of tuples within an application window. This again depends on the
> upstream operator. For eg. in the default mode of dedup, the ordering of
> tuples within an application window is not guaranteed.
>
> Chandni
>
>
>
>
> On Tue, Dec 29, 2015 at 4:53 AM, Priyanka Gugale <priyanka@datatorrent.com
> >
> wrote:
>
> > One more option:
> >
> > We can keep track of windowId & batchId i.e. we save batchId with
> windowId
> > when we commit a batch within a window. e.g. we are in window 10 and we
> > have written second batch in window 10, we commit windowId=10 and
> batchId=2
> > to DB. While recovery we won't process batches within last window which
> are
> > marked as committed.
> >
> > -Priyanka
> >
> > On Tue, Dec 29, 2015 at 3:54 PM, Sandeep Deshmukh <
> sandeep@datatorrent.com
> > >
> > wrote:
> >
> > > Not sure if "At least once" is right behavior for databases. We may not
> > > always have primary key to update or insert.
> > >
> > >
> > > Regards,
> > > Sandeep
> > >
> > > On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale <pr...@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for your inputs Chandni. I guess what you are suggesting is
> > > similar
> > > > to AbstractJdbcNonTransactionableBatchOutputOperator which is batch
> non
> > > > transactional operation. That is one of the good option.
> > > >
> > > > I am also thinking of a possibility of having "At least once"
> behavior
> > > with
> > > > Transactional store. In this, we keep committing batches within a
> > window.
> > > > Each batch commit will be  a transaction. On recovery we start
> > processing
> > > > from last committed window (don't exclude last committed window, as
> it
> > > > could be partially written). If the queries are update or insert
> > queries
> > > > using primary key, it shouldn't  be a problem if we reply
> insert/update
> > > > command. It will have same effect on database (of course not
> applicable
> > > for
> > > > all usecases). Does this look better?
> > > >
> > > > -Priyanka
> > > >
> > > > On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <
> > chandni@datatorrent.com
> > > >
> > > > wrote:
> > > >
> > > > > Yeah I understand there is a problem that app window size is time
> > based
> > > > > here not number of events based. However I don't think having a max
> > > batch
> > > > > size in this class will help because that causes problems with
> saving
> > > the
> > > > > tuples exactly once and idempotency.
> > > > >
> > > > > I was just trying to let you know why the batch transactional store
> > is
> > > > how
> > > > > it is.
> > > > >
> > > > > Checkout the non-transactional store output  operator
> > > > > (AbstractStoreOutputOperator) and its implementations where window
> id
> > > is
> > > > > saved with each update. I think having a batch extension of that
> can
> > > > > achieve what is needed here in a way that the operator will still
> be
> > > > > fault-tolerant and idempotent.
> > > > >
> > > > > Thanks,
> > > > > Chandni
> > > > >
> > > > > On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <
> > > > > chinmay@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Chandni,
> > > > > >
> > > > > > I totally agree with you that the transactions should be
> > idempotent.
> > > > And
> > > > > > that needs to be taken care of if the batch size is configurable.
> > > > > >
> > > > > > Though, I have a question related to the second part where batch
> > size
> > > > is
> > > > > > controlled by by controlling app window size.
> > > > > > I agree with you that aggregation window is a unit of aggregation
> > > > > provided
> > > > > > by platform. But, if I understand correctly, that is time based.
> > > > > > If I want to aggregate based on number of tuples, would this be
> > > > suitable?
> > > > > >
> > > > > > To give you an example, lets say I have a store on which the
> > > > transaction
> > > > > > size should never exceed 1000 operations.
> > > > > > And as a streaming application, it would be difficult to guess
> what
> > > > would
> > > > > > be the input rate, hence its not possible to guess how many
> tuples
> > > will
> > > > > > become part of a single application window. In such case, how can
> > the
> > > > > > application window size can be used to configure transaction
> batch
> > > > size?
> > > > > > Wouldn't it make more sense to have the control via exact number
> of
> > > > > tuples?
> > > > > >
> > > > > > Thanks,
> > > > > > Chinmay.
> > > > > >
> > > > > >
> > > > > > ~ Chinmay.
> > > > > >
> > > > > > On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <
> > > > chandni@datatorrent.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Chinmay/Priyanka,
> > > > > > >
> > > > > > > We need to write tuples exactly once in the store. Please
> address
> > > the
> > > > > > > failure scenarios on how to achieve exactly once and
> > idempotency. I
> > > > > > > mentioned in my previous mail why multiple batches in a window
> > is a
> > > > > > problem
> > > > > > > with exactly once.
> > > > > > >
> > > > > > > Control via app window would mean, tuning the functionality by
> > > > > > controlling
> > > > > > > the platform params. I think it's best one gets option to
> > seperate
> > > > the
> > > > > > > concerns of platform and that of app logic.
> > > > > > >
> > > > > > > Application window is a unit of aggregation. Every operator in
> a
> > > DAG
> > > > > can
> > > > > > > have different application window which is the support platform
> > > > > provides
> > > > > > > for application logic.
> > > > > > >
> > > > > > > Chandni
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> > > > > > > chinmay@datatorrent.com
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Just a thought on how it can possibly be done.
> > > > > > > >
> > > > > > > > The pseudo code might look like this:
> > > > > > > >
> > > > > > > > processTuple()
> > > > > > > > {
> > > > > > > > If(batchSize < configuredBatchSize){
> > > > > > > >    //add to the batch
> > > > > > > > }
> > > > > > > > Else {
> > > > > > > >   // process the batch as a transaction
> > > > > > > >   // empty the data structure of batch.
> > > > > > > > }
> > > > > > > > }
> > > > > > > >
> > > > > > > > endWindow()
> > > > > > > > {
> > > > > > > > // process the batch as transaction.
> > > > > > > > // empty the data structure of batch.
> > > > > > > > }
> > > > > > > >
> > > > > > > > This way, user can get better/direct control over what
> > > transaction
> > > > > > means.
> > > > > > > >
> > > > > > > > As chandni rightly said, one can reduce the application
> window
> > > size
> > > > > for
> > > > > > > the
> > > > > > > > operator, and that would reduce the batch size. But that's
> not
> > > > > > something
> > > > > > > > which looks intuitive from user's perspective.
> > > > > > > > Control via app window would mean, tuning the functionality
> by
> > > > > > > controlling
> > > > > > > > the platform params. I think it's best one gets option to
> > > seperate
> > > > > the
> > > > > > > > concerns of platform and that of app logic.
> > > > > > > >
> > > > > > > > If one wants to control the batch size, he/she should be able
> > to
> > > do
> > > > > > that
> > > > > > > by
> > > > > > > > just setting the property of batch size(a number), and not by
> > > > > changing
> > > > > > > app
> > > > > > > > window size (an indirect time unit).
> > > > > > > >
> > > > > > > > ~ Chinmay
> > > > > > > > On 28 Dec 2015 22:53, "Chandni Singh" <
> chandni@datatorrent.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > But you will not allow multiple batches in the same window?
> > > > > > > > > Can you please elaborate on failure scenarios and how it
> > > affects
> > > > > > > > > idempotency.
> > > > > > > > >
> > > > > > > > > Chandni
> > > > > > > > >
> > > > > > > > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > > > > > > > priyanka@datatorrent.com
> > > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > Sorry if I was not clear, but I am trying to propose the
> > > > MAX_SIZE
> > > > > > per
> > > > > > > > > > window which the operator could process. The size could
> be
> > > less
> > > > > > than
> > > > > > > > the
> > > > > > > > > > MAX_SIZE, no restriction about that.
> > > > > > > > > >
> > > > > > > > > > -Priyanka
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > > > > > > > chandni@datatorrent.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > How do you propose to to restrict the no. of tuples
> > > processed
> > > > > in
> > > > > > an
> > > > > > > > > > > application window < batch size.
> > > > > > > > > > >
> > > > > > > > > > > I don't see a way to enforce that batch size can never
> be
> > > > less
> > > > > > > tuples
> > > > > > > > > > > processed in an application window.
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> > > > > > > priyag@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Chandni,
> > > > > > > > > > > >
> > > > > > > > > > > > How about restricting tuples which can be processed
> per
> > > > > window.
> > > > > > > If
> > > > > > > > > > > someone
> > > > > > > > > > > > wants to process small and frequent batches, he can
> set
> > > > batch
> > > > > > > size
> > > > > > > > to
> > > > > > > > > > > some
> > > > > > > > > > > > small value and also reduce the window size. This
> would
> > > > build
> > > > > > > some
> > > > > > > > > back
> > > > > > > > > > > > pressure of course. But that could be acceptable if
> one
> > > > > really
> > > > > > > want
> > > > > > > > > to
> > > > > > > > > > > > restrict batch size.
> > > > > > > > > > > > The though was triggered while working on Cassandra
> > > output
> > > > > > > > operator.
> > > > > > > > > > > > Cassandra creates problem in processing batches of
> size
> > > > > greater
> > > > > > > > than
> > > > > > > > > > some
> > > > > > > > > > > > value (don't recall exact number right now). Other
> > > > databases
> > > > > > may
> > > > > > > > want
> > > > > > > > > > to
> > > > > > > > > > > > restrict the batch size for similar or other reasons.
> > > > > > > > > > > >
> > > > > > > > > > > > -Priyanka
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > > > > > > > chandni@datatorrent.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Priyanka,
> > > > > > > > > > > > >
> > > > > > > > > > > > > AbstractBatchTransactionableStore assumes all
> tuples
> > in
> > > > one
> > > > > > > > > > application
> > > > > > > > > > > > as
> > > > > > > > > > > > > a batch because it needs to store the tuples in the
> > > store
> > > > > > > > > > exactly-once.
> > > > > > > > > > > > >
> > > > > > > > > > > > > If there is more than one batch in an application
> > > window,
> > > > > > then
> > > > > > > to
> > > > > > > > > > store
> > > > > > > > > > > > the
> > > > > > > > > > > > > tuples exactly once the window Id needs to be
> written
> > > > with
> > > > > > > every
> > > > > > > > > > tuple
> > > > > > > > > > > as
> > > > > > > > > > > > > well which is not that efficient. Therefore we take
> > > > > advantage
> > > > > > > of
> > > > > > > > > the
> > > > > > > > > > > > > transaction support by saving just the window id
> once
> > > > (not
> > > > > > with
> > > > > > > > > every
> > > > > > > > > > > > > tuple) but this necessitates all the tuples to be
> > > > > considered
> > > > > > > as a
> > > > > > > > > > > batch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Every operator in a DAG can have its own
> application
> > > > window
> > > > > > > size.
> > > > > > > > > So
> > > > > > > > > > to
> > > > > > > > > > > > > reduce the size per batch, the application window
> > > > attribute
> > > > > > > needs
> > > > > > > > > to
> > > > > > > > > > be
> > > > > > > > > > > > > modified.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Chandni
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar
> <
> > > > > > > > > > > > > chinmay@datatorrent.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > +1 for this.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ~ Chinmay.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale
> <
> > > > > > > > > > priyag@apache.org>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > In Malhar we have an
> > > > > > > > > > > > > > > operator
> > > > > AbstractBatchTransactionableStoreOutputOperator
> > > > > > > > which
> > > > > > > > > > > > creates
> > > > > > > > > > > > > > > batches based on tuples received in a window.
> At
> > > the
> > > > > end
> > > > > > of
> > > > > > > > the
> > > > > > > > > > > > window
> > > > > > > > > > > > > > > these batches are sent to database for
> > processing.
> > > > > > > > > > > > > > > There is no way to configure MAX_SIZE on these
> > > > batches.
> > > > > > > Based
> > > > > > > > > on
> > > > > > > > > > > > input
> > > > > > > > > > > > > > rate
> > > > > > > > > > > > > > > the batch sizes can grow very high, and we
> might
> > > want
> > > > > to
> > > > > > > > > restrict
> > > > > > > > > > > > batch
> > > > > > > > > > > > > > > size.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Any operator can extend and do batch management
> > on
> > > > > their
> > > > > > > own,
> > > > > > > > > > but I
> > > > > > > > > > > > see
> > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > as generic requirement and IMO we should change
> > > base
> > > > > > class
> > > > > > > > i.e.
> > > > > > > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator
> > > class
> > > > > to
> > > > > > > > accept
> > > > > > > > > > > > > MAX_SIZE
> > > > > > > > > > > > > > > for batch from outside.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Any opinion on this?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Priyanka
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chandni Singh <ch...@datatorrent.com>.
I agree with Sandeep that at-least-once with databases is not the right
approach. All the output adapters we have in the library are written so
that they do not write duplicate entries, for example FileOutputOperator.
This has nothing to do with the processing-mode feature offered by the
platform: when writing to any external entity we have to ensure that we do
not introduce duplicates.

Priyanka,
I am assuming that the batch id you mean here is the number of tuples in
the app window which are persisted; otherwise a batch id needs to be stored
with every tuple so that, when a window is replayed, you know which tuples
to discard.

We have followed that approach in some concrete implementations in POCs.
One of the reasons we haven't added it here is that it relies on the order
of tuples within an application window, which again depends on the upstream
operator. For example, in the default mode of dedup, the ordering of tuples
within an application window is not guaranteed.

Chandni
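
For illustration, a rough sketch of the replay-discard idea described above,
assuming the store can report how many tuples of a window were already
persisted and that the upstream operator replays tuples in the same order
(all names below are hypothetical):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of replay-discard based on how many tuples of a window
// were already persisted. Assumes a deterministic tuple order within a window.
public class ReplaySkippingWriter<T> {

  // Stand-in for metadata the store would keep alongside the data.
  private final Map<Long, Integer> persistedCountByWindow = new HashMap<>();
  private long currentWindowId;
  private int seenInWindow;
  private int alreadyPersisted;

  public void beginWindow(long windowId) {
    currentWindowId = windowId;
    seenInWindow = 0;
    // How many tuples of this window made it to the store before a failure.
    alreadyPersisted = persistedCountByWindow.getOrDefault(windowId, 0);
  }

  public void processTuple(T tuple) {
    seenInWindow++;
    if (seenInWindow <= alreadyPersisted) {
      return;                                   // replayed tuple, already written: discard
    }
    write(tuple);
    persistedCountByWindow.put(currentWindowId, seenInWindow); // record progress with the write
  }

  private void write(T tuple) {
    // store-specific write, left abstract in this sketch
  }
}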




On Tue, Dec 29, 2015 at 4:53 AM, Priyanka Gugale <pr...@datatorrent.com>
wrote:

> One more option:
>
> We can keep track of windowId & batchId i.e. we save batchId with windowId
> when we commit a batch within a window. e.g. we are in window 10 and we
> have written second batch in window 10, we commit windowId=10 and batchId=2
> to DB. While recovery we won't process batches within last window which are
> marked as committed.
>
> -Priyanka
>
> On Tue, Dec 29, 2015 at 3:54 PM, Sandeep Deshmukh <sandeep@datatorrent.com
> >
> wrote:
>
> > Not sure if "At least once" is right behavior for databases. We may not
> > always have primary key to update or insert.
> >
> >
> > Regards,
> > Sandeep
> >
> > On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale <pr...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks for your inputs Chandni. I guess what you are suggesting is
> > similar
> > > to AbstractJdbcNonTransactionableBatchOutputOperator which is batch non
> > > transactional operation. That is one of the good option.
> > >
> > > I am also thinking of a possibility of having "At least once" behavior
> > with
> > > Transactional store. In this, we keep committing batches within a
> window.
> > > Each batch commit will be  a transaction. On recovery we start
> processing
> > > from last committed window (don't exclude last committed window, as it
> > > could be partially written). If the queries are update or insert
> queries
> > > using primary key, it shouldn't  be a problem if we reply insert/update
> > > command. It will have same effect on database (of course not applicable
> > for
> > > all usecases). Does this look better?
> > >
> > > -Priyanka
> > >
> > > On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <
> chandni@datatorrent.com
> > >
> > > wrote:
> > >
> > > > Yeah I understand there is a problem that app window size is time
> based
> > > > here not number of events based. However I don't think having a max
> > batch
> > > > size in this class will help because that causes problems with saving
> > the
> > > > tuples exactly once and idempotency.
> > > >
> > > > I was just trying to let you know why the batch transactional store
> is
> > > how
> > > > it is.
> > > >
> > > > Checkout the non-transactional store output  operator
> > > > (AbstractStoreOutputOperator) and its implementations where window id
> > is
> > > > saved with each update. I think having a batch extension of that can
> > > > achieve what is needed here in a way that the operator will still be
> > > > fault-tolerant and idempotent.
> > > >
> > > > Thanks,
> > > > Chandni
> > > >
> > > > On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <
> > > > chinmay@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi Chandni,
> > > > >
> > > > > I totally agree with you that the transactions should be
> idempotent.
> > > And
> > > > > that needs to be taken care of if the batch size is configurable.
> > > > >
> > > > > Though, I have a question related to the second part where batch
> size
> > > is
> > > > > controlled by by controlling app window size.
> > > > > I agree with you that aggregation window is a unit of aggregation
> > > > provided
> > > > > by platform. But, if I understand correctly, that is time based.
> > > > > If I want to aggregate based on number of tuples, would this be
> > > suitable?
> > > > >
> > > > > To give you an example, lets say I have a store on which the
> > > transaction
> > > > > size should never exceed 1000 operations.
> > > > > And as a streaming application, it would be difficult to guess what
> > > would
> > > > > be the input rate, hence its not possible to guess how many tuples
> > will
> > > > > become part of a single application window. In such case, how can
> the
> > > > > application window size can be used to configure transaction batch
> > > size?
> > > > > Wouldn't it make more sense to have the control via exact number of
> > > > tuples?
> > > > >
> > > > > Thanks,
> > > > > Chinmay.
> > > > >
> > > > >
> > > > > ~ Chinmay.
> > > > >
> > > > > On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <
> > > chandni@datatorrent.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hey Chinmay/Priyanka,
> > > > > >
> > > > > > We need to write tuples exactly once in the store. Please address
> > the
> > > > > > failure scenarios on how to achieve exactly once and
> idempotency. I
> > > > > > mentioned in my previous mail why multiple batches in a window
> is a
> > > > > problem
> > > > > > with exactly once.
> > > > > >
> > > > > > Control via app window would mean, tuning the functionality by
> > > > > controlling
> > > > > > the platform params. I think it's best one gets option to
> seperate
> > > the
> > > > > > concerns of platform and that of app logic.
> > > > > >
> > > > > > Application window is a unit of aggregation. Every operator in a
> > DAG
> > > > can
> > > > > > have different application window which is the support platform
> > > > provides
> > > > > > for application logic.
> > > > > >
> > > > > > Chandni
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> > > > > > chinmay@datatorrent.com
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Just a thought on how it can possibly be done.
> > > > > > >
> > > > > > > The pseudo code might look like this:
> > > > > > >
> > > > > > > processTuple()
> > > > > > > {
> > > > > > > If(batchSize < configuredBatchSize){
> > > > > > >    //add to the batch
> > > > > > > }
> > > > > > > Else {
> > > > > > >   // process the batch as a transaction
> > > > > > >   // empty the data structure of batch.
> > > > > > > }
> > > > > > > }
> > > > > > >
> > > > > > > endWindow()
> > > > > > > {
> > > > > > > // process the batch as transaction.
> > > > > > > // empty the data structure of batch.
> > > > > > > }
> > > > > > >
> > > > > > > This way, user can get better/direct control over what
> > transaction
> > > > > means.
> > > > > > >
> > > > > > > As chandni rightly said, one can reduce the application window
> > size
> > > > for
> > > > > > the
> > > > > > > operator, and that would reduce the batch size. But that's not
> > > > > something
> > > > > > > which looks intuitive from user's perspective.
> > > > > > > Control via app window would mean, tuning the functionality by
> > > > > > controlling
> > > > > > > the platform params. I think it's best one gets option to
> > seperate
> > > > the
> > > > > > > concerns of platform and that of app logic.
> > > > > > >
> > > > > > > If one wants to control the batch size, he/she should be able
> to
> > do
> > > > > that
> > > > > > by
> > > > > > > just setting the property of batch size(a number), and not by
> > > > changing
> > > > > > app
> > > > > > > window size (an indirect time unit).
> > > > > > >
> > > > > > > ~ Chinmay
> > > > > > > On 28 Dec 2015 22:53, "Chandni Singh" <chandni@datatorrent.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > But you will not allow multiple batches in the same window?
> > > > > > > > Can you please elaborate on failure scenarios and how it
> > affects
> > > > > > > > idempotency.
> > > > > > > >
> > > > > > > > Chandni
> > > > > > > >
> > > > > > > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > > > > > > priyanka@datatorrent.com
> > > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > Sorry if I was not clear, but I am trying to propose the
> > > MAX_SIZE
> > > > > per
> > > > > > > > > window which the operator could process. The size could be
> > less
> > > > > than
> > > > > > > the
> > > > > > > > > MAX_SIZE, no restriction about that.
> > > > > > > > >
> > > > > > > > > -Priyanka
> > > > > > > > >
> > > > > > > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > > > > > > chandni@datatorrent.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > How do you propose to to restrict the no. of tuples
> > processed
> > > > in
> > > > > an
> > > > > > > > > > application window < batch size.
> > > > > > > > > >
> > > > > > > > > > I don't see a way to enforce that batch size can never be
> > > less
> > > > > > tuples
> > > > > > > > > > processed in an application window.
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> > > > > > priyag@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Chandni,
> > > > > > > > > > >
> > > > > > > > > > > How about restricting tuples which can be processed per
> > > > window.
> > > > > > If
> > > > > > > > > > someone
> > > > > > > > > > > wants to process small and frequent batches, he can set
> > > batch
> > > > > > size
> > > > > > > to
> > > > > > > > > > some
> > > > > > > > > > > small value and also reduce the window size. This would
> > > build
> > > > > > some
> > > > > > > > back
> > > > > > > > > > > pressure of course. But that could be acceptable if one
> > > > really
> > > > > > want
> > > > > > > > to
> > > > > > > > > > > restrict batch size.
> > > > > > > > > > > The though was triggered while working on Cassandra
> > output
> > > > > > > operator.
> > > > > > > > > > > Cassandra creates problem in processing batches of size
> > > > greater
> > > > > > > than
> > > > > > > > > some
> > > > > > > > > > > value (don't recall exact number right now). Other
> > > databases
> > > > > may
> > > > > > > want
> > > > > > > > > to
> > > > > > > > > > > restrict the batch size for similar or other reasons.
> > > > > > > > > > >
> > > > > > > > > > > -Priyanka
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > > > > > > chandni@datatorrent.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Priyanka,
> > > > > > > > > > > >
> > > > > > > > > > > > AbstractBatchTransactionableStore assumes all tuples
> in
> > > one
> > > > > > > > > application
> > > > > > > > > > > as
> > > > > > > > > > > > a batch because it needs to store the tuples in the
> > store
> > > > > > > > > exactly-once.
> > > > > > > > > > > >
> > > > > > > > > > > > If there is more than one batch in an application
> > window,
> > > > > then
> > > > > > to
> > > > > > > > > store
> > > > > > > > > > > the
> > > > > > > > > > > > tuples exactly once the window Id needs to be written
> > > with
> > > > > > every
> > > > > > > > > tuple
> > > > > > > > > > as
> > > > > > > > > > > > well which is not that efficient. Therefore we take
> > > > advantage
> > > > > > of
> > > > > > > > the
> > > > > > > > > > > > transaction support by saving just the window id once
> > > (not
> > > > > with
> > > > > > > > every
> > > > > > > > > > > > tuple) but this necessitates all the tuples to be
> > > > considered
> > > > > > as a
> > > > > > > > > > batch.
> > > > > > > > > > > >
> > > > > > > > > > > > Every operator in a DAG can have its own application
> > > window
> > > > > > size.
> > > > > > > > So
> > > > > > > > > to
> > > > > > > > > > > > reduce the size per batch, the application window
> > > attribute
> > > > > > needs
> > > > > > > > to
> > > > > > > > > be
> > > > > > > > > > > > modified.
> > > > > > > > > > > >
> > > > > > > > > > > > Chandni
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > > > > > > > > chinmay@datatorrent.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > +1 for this.
> > > > > > > > > > > > >
> > > > > > > > > > > > > ~ Chinmay.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > > > > > > > > priyag@apache.org>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In Malhar we have an
> > > > > > > > > > > > > > operator
> > > > AbstractBatchTransactionableStoreOutputOperator
> > > > > > > which
> > > > > > > > > > > creates
> > > > > > > > > > > > > > batches based on tuples received in a window. At
> > the
> > > > end
> > > > > of
> > > > > > > the
> > > > > > > > > > > window
> > > > > > > > > > > > > > these batches are sent to database for
> processing.
> > > > > > > > > > > > > > There is no way to configure MAX_SIZE on these
> > > batches.
> > > > > > Based
> > > > > > > > on
> > > > > > > > > > > input
> > > > > > > > > > > > > rate
> > > > > > > > > > > > > > the batch sizes can grow very high, and we might
> > want
> > > > to
> > > > > > > > restrict
> > > > > > > > > > > batch
> > > > > > > > > > > > > > size.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Any operator can extend and do batch management
> on
> > > > their
> > > > > > own,
> > > > > > > > > but I
> > > > > > > > > > > see
> > > > > > > > > > > > > it
> > > > > > > > > > > > > > as generic requirement and IMO we should change
> > base
> > > > > class
> > > > > > > i.e.
> > > > > > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator
> > class
> > > > to
> > > > > > > accept
> > > > > > > > > > > > MAX_SIZE
> > > > > > > > > > > > > > for batch from outside.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Any opinion on this?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Priyanka
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Priyanka Gugale <pr...@datatorrent.com>.
One more option:

We can keep track of windowId & batchId, i.e. we save the batchId along with
the windowId when we commit a batch within a window. For example, if we are in
window 10 and have just written the second batch of window 10, we commit
windowId=10 and batchId=2 to the DB. During recovery we won't reprocess the
batches within the last window that are marked as committed.

-Priyanka
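
A small sketch of what that (windowId, batchId) bookkeeping could look like,
assuming batches are re-generated in the same order on replay; class and
method names are hypothetical and the actual recording would happen in the
same DB transaction as the batch write:

// Hypothetical sketch of the (windowId, batchId) tracking described above.
public class WindowBatchTracker {

  private long committedWindowId = -1;   // loaded from a DB meta table on recovery
  private int committedBatchId = -1;

  // True if this batch was already committed before the failure, so it can be skipped.
  public boolean isAlreadyCommitted(long windowId, int batchId) {
    return windowId < committedWindowId
        || (windowId == committedWindowId && batchId <= committedBatchId);
  }

  // Called inside the same DB transaction that writes the batch itself,
  // e.g. UPDATE meta SET window_id = ?, batch_id = ? ...
  public void recordCommit(long windowId, int batchId) {
    committedWindowId = windowId;
    committedBatchId = batchId;
  }
}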

On Tue, Dec 29, 2015 at 3:54 PM, Sandeep Deshmukh <sa...@datatorrent.com>
wrote:

> Not sure if "At least once" is right behavior for databases. We may not
> always have primary key to update or insert.
>
>
> Regards,
> Sandeep
>
> On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale <pr...@apache.org>
> wrote:
>
> > Hi,
> >
> > Thanks for your inputs Chandni. I guess what you are suggesting is
> similar
> > to AbstractJdbcNonTransactionableBatchOutputOperator which is batch non
> > transactional operation. That is one of the good option.
> >
> > I am also thinking of a possibility of having "At least once" behavior
> with
> > Transactional store. In this, we keep committing batches within a window.
> > Each batch commit will be  a transaction. On recovery we start processing
> > from last committed window (don't exclude last committed window, as it
> > could be partially written). If the queries are update or insert queries
> > using primary key, it shouldn't  be a problem if we reply insert/update
> > command. It will have same effect on database (of course not applicable
> for
> > all usecases). Does this look better?
> >
> > -Priyanka
> >
> > On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <chandni@datatorrent.com
> >
> > wrote:
> >
> > > Yeah I understand there is a problem that app window size is time based
> > > here not number of events based. However I don't think having a max
> batch
> > > size in this class will help because that causes problems with saving
> the
> > > tuples exactly once and idempotency.
> > >
> > > I was just trying to let you know why the batch transactional store is
> > how
> > > it is.
> > >
> > > Checkout the non-transactional store output  operator
> > > (AbstractStoreOutputOperator) and its implementations where window id
> is
> > > saved with each update. I think having a batch extension of that can
> > > achieve what is needed here in a way that the operator will still be
> > > fault-tolerant and idempotent.
> > >
> > > Thanks,
> > > Chandni
> > >
> > > On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <
> > > chinmay@datatorrent.com>
> > > wrote:
> > >
> > > > Hi Chandni,
> > > >
> > > > I totally agree with you that the transactions should be idempotent.
> > And
> > > > that needs to be taken care of if the batch size is configurable.
> > > >
> > > > Though, I have a question related to the second part where batch size
> > is
> > > > controlled by by controlling app window size.
> > > > I agree with you that aggregation window is a unit of aggregation
> > > provided
> > > > by platform. But, if I understand correctly, that is time based.
> > > > If I want to aggregate based on number of tuples, would this be
> > suitable?
> > > >
> > > > To give you an example, lets say I have a store on which the
> > transaction
> > > > size should never exceed 1000 operations.
> > > > And as a streaming application, it would be difficult to guess what
> > would
> > > > be the input rate, hence its not possible to guess how many tuples
> will
> > > > become part of a single application window. In such case, how can the
> > > > application window size can be used to configure transaction batch
> > size?
> > > > Wouldn't it make more sense to have the control via exact number of
> > > tuples?
> > > >
> > > > Thanks,
> > > > Chinmay.
> > > >
> > > >
> > > > ~ Chinmay.
> > > >
> > > > On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <
> > chandni@datatorrent.com
> > > >
> > > > wrote:
> > > >
> > > > > Hey Chinmay/Priyanka,
> > > > >
> > > > > We need to write tuples exactly once in the store. Please address
> the
> > > > > failure scenarios on how to achieve exactly once and idempotency. I
> > > > > mentioned in my previous mail why multiple batches in a window is a
> > > > problem
> > > > > with exactly once.
> > > > >
> > > > > Control via app window would mean, tuning the functionality by
> > > > controlling
> > > > > the platform params. I think it's best one gets option to seperate
> > the
> > > > > concerns of platform and that of app logic.
> > > > >
> > > > > Application window is a unit of aggregation. Every operator in a
> DAG
> > > can
> > > > > have different application window which is the support platform
> > > provides
> > > > > for application logic.
> > > > >
> > > > > Chandni
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> > > > > chinmay@datatorrent.com
> > > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Just a thought on how it can possibly be done.
> > > > > >
> > > > > > The pseudo code might look like this:
> > > > > >
> > > > > > processTuple()
> > > > > > {
> > > > > > If(batchSize < configuredBatchSize){
> > > > > >    //add to the batch
> > > > > > }
> > > > > > Else {
> > > > > >   // process the batch as a transaction
> > > > > >   // empty the data structure of batch.
> > > > > > }
> > > > > > }
> > > > > >
> > > > > > endWindow()
> > > > > > {
> > > > > > // process the batch as transaction.
> > > > > > // empty the data structure of batch.
> > > > > > }
> > > > > >
> > > > > > This way, user can get better/direct control over what
> transaction
> > > > means.
> > > > > >
> > > > > > As chandni rightly said, one can reduce the application window
> size
> > > for
> > > > > the
> > > > > > operator, and that would reduce the batch size. But that's not
> > > > something
> > > > > > which looks intuitive from user's perspective.
> > > > > > Control via app window would mean, tuning the functionality by
> > > > > controlling
> > > > > > the platform params. I think it's best one gets option to
> seperate
> > > the
> > > > > > concerns of platform and that of app logic.
> > > > > >
> > > > > > If one wants to control the batch size, he/she should be able to
> do
> > > > that
> > > > > by
> > > > > > just setting the property of batch size(a number), and not by
> > > changing
> > > > > app
> > > > > > window size (an indirect time unit).
> > > > > >
> > > > > > ~ Chinmay
> > > > > > On 28 Dec 2015 22:53, "Chandni Singh" <ch...@datatorrent.com>
> > > wrote:
> > > > > >
> > > > > > > But you will not allow multiple batches in the same window?
> > > > > > > Can you please elaborate on failure scenarios and how it
> affects
> > > > > > > idempotency.
> > > > > > >
> > > > > > > Chandni
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > > > > > priyanka@datatorrent.com
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Sorry if I was not clear, but I am trying to propose the
> > MAX_SIZE
> > > > per
> > > > > > > > window which the operator could process. The size could be
> less
> > > > than
> > > > > > the
> > > > > > > > MAX_SIZE, no restriction about that.
> > > > > > > >
> > > > > > > > -Priyanka
> > > > > > > >
> > > > > > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > > > > > chandni@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > How do you propose to to restrict the no. of tuples
> processed
> > > in
> > > > an
> > > > > > > > > application window < batch size.
> > > > > > > > >
> > > > > > > > > I don't see a way to enforce that batch size can never be
> > less
> > > > > tuples
> > > > > > > > > processed in an application window.
> > > > > > > > >
> > > > > > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> > > > > priyag@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Chandni,
> > > > > > > > > >
> > > > > > > > > > How about restricting tuples which can be processed per
> > > window.
> > > > > If
> > > > > > > > > someone
> > > > > > > > > > wants to process small and frequent batches, he can set
> > batch
> > > > > size
> > > > > > to
> > > > > > > > > some
> > > > > > > > > > small value and also reduce the window size. This would
> > build
> > > > > some
> > > > > > > back
> > > > > > > > > > pressure of course. But that could be acceptable if one
> > > really
> > > > > want
> > > > > > > to
> > > > > > > > > > restrict batch size.
> > > > > > > > > > The though was triggered while working on Cassandra
> output
> > > > > > operator.
> > > > > > > > > > Cassandra creates problem in processing batches of size
> > > greater
> > > > > > than
> > > > > > > > some
> > > > > > > > > > value (don't recall exact number right now). Other
> > databases
> > > > may
> > > > > > want
> > > > > > > > to
> > > > > > > > > > restrict the batch size for similar or other reasons.
> > > > > > > > > >
> > > > > > > > > > -Priyanka
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > > > > > chandni@datatorrent.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Priyanka,
> > > > > > > > > > >
> > > > > > > > > > > AbstractBatchTransactionableStore assumes all tuples in
> > one
> > > > > > > > application
> > > > > > > > > > as
> > > > > > > > > > > a batch because it needs to store the tuples in the
> store
> > > > > > > > exactly-once.
> > > > > > > > > > >
> > > > > > > > > > > If there is more than one batch in an application
> window,
> > > > then
> > > > > to
> > > > > > > > store
> > > > > > > > > > the
> > > > > > > > > > > tuples exactly once the window Id needs to be written
> > with
> > > > > every
> > > > > > > > tuple
> > > > > > > > > as
> > > > > > > > > > > well which is not that efficient. Therefore we take
> > > advantage
> > > > > of
> > > > > > > the
> > > > > > > > > > > transaction support by saving just the window id once
> > (not
> > > > with
> > > > > > > every
> > > > > > > > > > > tuple) but this necessitates all the tuples to be
> > > considered
> > > > > as a
> > > > > > > > > batch.
> > > > > > > > > > >
> > > > > > > > > > > Every operator in a DAG can have its own application
> > window
> > > > > size.
> > > > > > > So
> > > > > > > > to
> > > > > > > > > > > reduce the size per batch, the application window
> > attribute
> > > > > needs
> > > > > > > to
> > > > > > > > be
> > > > > > > > > > > modified.
> > > > > > > > > > >
> > > > > > > > > > > Chandni
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > > > > > > > chinmay@datatorrent.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > +1 for this.
> > > > > > > > > > > >
> > > > > > > > > > > > ~ Chinmay.
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > > > > > > > priyag@apache.org>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi,
> > > > > > > > > > > > >
> > > > > > > > > > > > > In Malhar we have an
> > > > > > > > > > > > > operator
> > > AbstractBatchTransactionableStoreOutputOperator
> > > > > > which
> > > > > > > > > > creates
> > > > > > > > > > > > > batches based on tuples received in a window. At
> the
> > > end
> > > > of
> > > > > > the
> > > > > > > > > > window
> > > > > > > > > > > > > these batches are sent to database for processing.
> > > > > > > > > > > > > There is no way to configure MAX_SIZE on these
> > batches.
> > > > > Based
> > > > > > > on
> > > > > > > > > > input
> > > > > > > > > > > > rate
> > > > > > > > > > > > > the batch sizes can grow very high, and we might
> want
> > > to
> > > > > > > restrict
> > > > > > > > > > batch
> > > > > > > > > > > > > size.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Any operator can extend and do batch management on
> > > their
> > > > > own,
> > > > > > > > but I
> > > > > > > > > > see
> > > > > > > > > > > > it
> > > > > > > > > > > > > as generic requirement and IMO we should change
> base
> > > > class
> > > > > > i.e.
> > > > > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator
> class
> > > to
> > > > > > accept
> > > > > > > > > > > MAX_SIZE
> > > > > > > > > > > > > for batch from outside.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Any opinion on this?
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Priyanka
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Sandeep Deshmukh <sa...@datatorrent.com>.
Not sure if "At least once" is right behavior for databases. We may not
always have primary key to update or insert.


Regards,
Sandeep

On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale <pr...@apache.org> wrote:

> Hi,
>
> Thanks for your inputs Chandni. I guess what you are suggesting is similar
> to AbstractJdbcNonTransactionableBatchOutputOperator which is batch non
> transactional operation. That is one of the good option.
>
> I am also thinking of a possibility of having "At least once" behavior with
> Transactional store. In this, we keep committing batches within a window.
> Each batch commit will be  a transaction. On recovery we start processing
> from last committed window (don't exclude last committed window, as it
> could be partially written). If the queries are update or insert queries
> using primary key, it shouldn't  be a problem if we reply insert/update
> command. It will have same effect on database (of course not applicable for
> all usecases). Does this look better?
>
> -Priyanka
>
> On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
> > Yeah I understand there is a problem that app window size is time based
> > here not number of events based. However I don't think having a max batch
> > size in this class will help because that causes problems with saving the
> > tuples exactly once and idempotency.
> >
> > I was just trying to let you know why the batch transactional store is
> how
> > it is.
> >
> > Checkout the non-transactional store output  operator
> > (AbstractStoreOutputOperator) and its implementations where window id is
> > saved with each update. I think having a batch extension of that can
> > achieve what is needed here in a way that the operator will still be
> > fault-tolerant and idempotent.
> >
> > Thanks,
> > Chandni
> >
> > On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <
> > chinmay@datatorrent.com>
> > wrote:
> >
> > > Hi Chandni,
> > >
> > > I totally agree with you that the transactions should be idempotent.
> And
> > > that needs to be taken care of if the batch size is configurable.
> > >
> > > Though, I have a question related to the second part where batch size
> is
> > > controlled by by controlling app window size.
> > > I agree with you that aggregation window is a unit of aggregation
> > provided
> > > by platform. But, if I understand correctly, that is time based.
> > > If I want to aggregate based on number of tuples, would this be
> suitable?
> > >
> > > To give you an example, lets say I have a store on which the
> transaction
> > > size should never exceed 1000 operations.
> > > And as a streaming application, it would be difficult to guess what
> would
> > > be the input rate, hence its not possible to guess how many tuples will
> > > become part of a single application window. In such case, how can the
> > > application window size can be used to configure transaction batch
> size?
> > > Wouldn't it make more sense to have the control via exact number of
> > tuples?
> > >
> > > Thanks,
> > > Chinmay.
> > >
> > >
> > > ~ Chinmay.
> > >
> > > On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <
> chandni@datatorrent.com
> > >
> > > wrote:
> > >
> > > > Hey Chinmay/Priyanka,
> > > >
> > > > We need to write tuples exactly once in the store. Please address the
> > > > failure scenarios on how to achieve exactly once and idempotency. I
> > > > mentioned in my previous mail why multiple batches in a window is a
> > > problem
> > > > with exactly once.
> > > >
> > > > Control via app window would mean, tuning the functionality by
> > > controlling
> > > > the platform params. I think it's best one gets option to seperate
> the
> > > > concerns of platform and that of app logic.
> > > >
> > > > Application window is a unit of aggregation. Every operator in a DAG
> > can
> > > > have different application window which is the support platform
> > provides
> > > > for application logic.
> > > >
> > > > Chandni
> > > >
> > > >
> > > >
> > > > On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> > > > chinmay@datatorrent.com
> > > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Just a thought on how it can possibly be done.
> > > > >
> > > > > The pseudo code might look like this:
> > > > >
> > > > > processTuple()
> > > > > {
> > > > > If(batchSize < configuredBatchSize){
> > > > >    //add to the batch
> > > > > }
> > > > > Else {
> > > > >   // process the batch as a transaction
> > > > >   // empty the data structure of batch.
> > > > > }
> > > > > }
> > > > >
> > > > > endWindow()
> > > > > {
> > > > > // process the batch as transaction.
> > > > > // empty the data structure of batch.
> > > > > }
> > > > >
> > > > > This way, user can get better/direct control over what transaction
> > > means.
> > > > >
> > > > > As chandni rightly said, one can reduce the application window size
> > for
> > > > the
> > > > > operator, and that would reduce the batch size. But that's not
> > > something
> > > > > which looks intuitive from user's perspective.
> > > > > Control via app window would mean, tuning the functionality by
> > > > controlling
> > > > > the platform params. I think it's best one gets option to seperate
> > the
> > > > > concerns of platform and that of app logic.
> > > > >
> > > > > If one wants to control the batch size, he/she should be able to do
> > > that
> > > > by
> > > > > just setting the property of batch size(a number), and not by
> > changing
> > > > app
> > > > > window size (an indirect time unit).
> > > > >
> > > > > ~ Chinmay
> > > > > On 28 Dec 2015 22:53, "Chandni Singh" <ch...@datatorrent.com>
> > wrote:
> > > > >
> > > > > > But you will not allow multiple batches in the same window?
> > > > > > Can you please elaborate on failure scenarios and how it affects
> > > > > > idempotency.
> > > > > >
> > > > > > Chandni
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > > > > priyanka@datatorrent.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Sorry if I was not clear, but I am trying to propose the
> MAX_SIZE
> > > per
> > > > > > > window which the operator could process. The size could be less
> > > than
> > > > > the
> > > > > > > MAX_SIZE, no restriction about that.
> > > > > > >
> > > > > > > -Priyanka
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > > > > chandni@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > How do you propose to to restrict the no. of tuples processed
> > in
> > > an
> > > > > > > > application window < batch size.
> > > > > > > >
> > > > > > > > I don't see a way to enforce that batch size can never be
> less
> > > > tuples
> > > > > > > > processed in an application window.
> > > > > > > >
> > > > > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> > > > priyag@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Chandni,
> > > > > > > > >
> > > > > > > > > How about restricting tuples which can be processed per
> > window.
> > > > If
> > > > > > > > someone
> > > > > > > > > wants to process small and frequent batches, he can set
> batch
> > > > size
> > > > > to
> > > > > > > > some
> > > > > > > > > small value and also reduce the window size. This would
> build
> > > > some
> > > > > > back
> > > > > > > > > pressure of course. But that could be acceptable if one
> > really
> > > > want
> > > > > > to
> > > > > > > > > restrict batch size.
> > > > > > > > > The though was triggered while working on Cassandra output
> > > > > operator.
> > > > > > > > > Cassandra creates problem in processing batches of size
> > greater
> > > > > than
> > > > > > > some
> > > > > > > > > value (don't recall exact number right now). Other
> databases
> > > may
> > > > > want
> > > > > > > to
> > > > > > > > > restrict the batch size for similar or other reasons.
> > > > > > > > >
> > > > > > > > > -Priyanka
> > > > > > > > >
> > > > > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > > > > chandni@datatorrent.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Priyanka,
> > > > > > > > > >
> > > > > > > > > > AbstractBatchTransactionableStore assumes all tuples in
> one
> > > > > > > application
> > > > > > > > > as
> > > > > > > > > > a batch because it needs to store the tuples in the store
> > > > > > > exactly-once.
> > > > > > > > > >
> > > > > > > > > > If there is more than one batch in an application window,
> > > then
> > > > to
> > > > > > > store
> > > > > > > > > the
> > > > > > > > > > tuples exactly once the window Id needs to be written
> with
> > > > every
> > > > > > > tuple
> > > > > > > > as
> > > > > > > > > > well which is not that efficient. Therefore we take
> > advantage
> > > > of
> > > > > > the
> > > > > > > > > > transaction support by saving just the window id once
> (not
> > > with
> > > > > > every
> > > > > > > > > > tuple) but this necessitates all the tuples to be
> > considered
> > > > as a
> > > > > > > > batch.
> > > > > > > > > >
> > > > > > > > > > Every operator in a DAG can have its own application
> window
> > > > size.
> > > > > > So
> > > > > > > to
> > > > > > > > > > reduce the size per batch, the application window
> attribute
> > > > needs
> > > > > > to
> > > > > > > be
> > > > > > > > > > modified.
> > > > > > > > > >
> > > > > > > > > > Chandni
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > > > > > > chinmay@datatorrent.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > +1 for this.
> > > > > > > > > > >
> > > > > > > > > > > ~ Chinmay.
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > > > > > > priyag@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > In Malhar we have an
> > > > > > > > > > > > operator
> > AbstractBatchTransactionableStoreOutputOperator
> > > > > which
> > > > > > > > > creates
> > > > > > > > > > > > batches based on tuples received in a window. At the
> > end
> > > of
> > > > > the
> > > > > > > > > window
> > > > > > > > > > > > these batches are sent to database for processing.
> > > > > > > > > > > > There is no way to configure MAX_SIZE on these
> batches.
> > > > Based
> > > > > > on
> > > > > > > > > input
> > > > > > > > > > > rate
> > > > > > > > > > > > the batch sizes can grow very high, and we might want
> > to
> > > > > > restrict
> > > > > > > > > batch
> > > > > > > > > > > > size.
> > > > > > > > > > > >
> > > > > > > > > > > > Any operator can extend and do batch management on
> > their
> > > > own,
> > > > > > > but I
> > > > > > > > > see
> > > > > > > > > > > it
> > > > > > > > > > > > as generic requirement and IMO we should change base
> > > class
> > > > > i.e.
> > > > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator class
> > to
> > > > > accept
> > > > > > > > > > MAX_SIZE
> > > > > > > > > > > > for batch from outside.
> > > > > > > > > > > >
> > > > > > > > > > > > Any opinion on this?
> > > > > > > > > > > >
> > > > > > > > > > > > -Priyanka
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Priyanka Gugale <pr...@apache.org>.
Hi,

Thanks for your inputs, Chandni. I guess what you are suggesting is similar
to AbstractJdbcNonTransactionableBatchOutputOperator, which does batched
non-transactional writes. That is one good option.

I am also thinking of the possibility of having "at least once" behavior with
a transactional store. In this approach, we keep committing batches within a
window; each batch commit is a transaction. On recovery we start processing
from the last committed window (we don't exclude the last committed window, as
it could be partially written). If the queries are update or insert queries
keyed on a primary key, it shouldn't be a problem to replay the insert/update
command: it will have the same effect on the database (of course this is not
applicable to all use cases). Does this look better?

-Priyanka
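
As an illustration of the primary-key idea, here is a sketch of an idempotent
batch upsert over JDBC, so that replaying a partially written window's batches
leaves the rows unchanged. MySQL-style ON DUPLICATE KEY UPDATE syntax and the
table/column names are assumptions for the example:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical sketch: replaying the same batch re-runs the same upserts,
// which leave each row in the same state, so at-least-once replay is harmless.
public class IdempotentUpsertWriter {

  private static final String UPSERT_SQL =
      "INSERT INTO account (id, balance) VALUES (?, ?) "
      + "ON DUPLICATE KEY UPDATE balance = VALUES(balance)";

  public void writeBatch(Connection con, long[] ids, double[] balances) throws SQLException {
    boolean autoCommit = con.getAutoCommit();
    con.setAutoCommit(false);                     // one transaction per batch
    try (PreparedStatement stmt = con.prepareStatement(UPSERT_SQL)) {
      for (int i = 0; i < ids.length; i++) {
        stmt.setLong(1, ids[i]);
        stmt.setDouble(2, balances[i]);
        stmt.addBatch();
      }
      stmt.executeBatch();
      con.commit();                               // replaying this commit produces the same rows
    } catch (SQLException e) {
      con.rollback();
      throw e;
    } finally {
      con.setAutoCommit(autoCommit);
    }
  }
}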

On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Yeah I understand there is a problem that app window size is time based
> here not number of events based. However I don't think having a max batch
> size in this class will help because that causes problems with saving the
> tuples exactly once and idempotency.
>
> I was just trying to let you know why the batch transactional store is how
> it is.
>
> Checkout the non-transactional store output  operator
> (AbstractStoreOutputOperator) and its implementations where window id is
> saved with each update. I think having a batch extension of that can
> achieve what is needed here in a way that the operator will still be
> fault-tolerant and idempotent.
>
> Thanks,
> Chandni
>
> On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <
> chinmay@datatorrent.com>
> wrote:
>
> > Hi Chandni,
> >
> > I totally agree with you that the transactions should be idempotent. And
> > that needs to be taken care of if the batch size is configurable.
> >
> > Though, I have a question related to the second part where batch size is
> > controlled by by controlling app window size.
> > I agree with you that aggregation window is a unit of aggregation
> provided
> > by platform. But, if I understand correctly, that is time based.
> > If I want to aggregate based on number of tuples, would this be suitable?
> >
> > To give you an example, lets say I have a store on which the transaction
> > size should never exceed 1000 operations.
> > And as a streaming application, it would be difficult to guess what would
> > be the input rate, hence its not possible to guess how many tuples will
> > become part of a single application window. In such case, how can the
> > application window size can be used to configure transaction batch size?
> > Wouldn't it make more sense to have the control via exact number of
> tuples?
> >
> > Thanks,
> > Chinmay.
> >
> >
> > ~ Chinmay.
> >
> > On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <chandni@datatorrent.com
> >
> > wrote:
> >
> > > Hey Chinmay/Priyanka,
> > >
> > > We need to write tuples exactly once in the store. Please address the
> > > failure scenarios on how to achieve exactly once and idempotency. I
> > > mentioned in my previous mail why multiple batches in a window is a
> > problem
> > > with exactly once.
> > >
> > > Control via app window would mean, tuning the functionality by
> > controlling
> > > the platform params. I think it's best one gets option to seperate the
> > > concerns of platform and that of app logic.
> > >
> > > Application window is a unit of aggregation. Every operator in a DAG
> can
> > > have different application window which is the support platform
> provides
> > > for application logic.
> > >
> > > Chandni
> > >
> > >
> > >
> > > On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> > > chinmay@datatorrent.com
> > > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Just a thought on how it can possibly be done.
> > > >
> > > > The pseudo code might look like this:
> > > >
> > > > processTuple()
> > > > {
> > > > If(batchSize < configuredBatchSize){
> > > >    //add to the batch
> > > > }
> > > > Else {
> > > >   // process the batch as a transaction
> > > >   // empty the data structure of batch.
> > > > }
> > > > }
> > > >
> > > > endWindow()
> > > > {
> > > > // process the batch as transaction.
> > > > // empty the data structure of batch.
> > > > }
> > > >
> > > > This way, user can get better/direct control over what transaction
> > means.
> > > >
> > > > As chandni rightly said, one can reduce the application window size
> for
> > > the
> > > > operator, and that would reduce the batch size. But that's not
> > something
> > > > which looks intuitive from user's perspective.
> > > > Control via app window would mean, tuning the functionality by
> > > controlling
> > > > the platform params. I think it's best one gets option to seperate
> the
> > > > concerns of platform and that of app logic.
> > > >
> > > > If one wants to control the batch size, he/she should be able to do
> > that
> > > by
> > > > just setting the property of batch size(a number), and not by
> changing
> > > app
> > > > window size (an indirect time unit).
> > > >
> > > > ~ Chinmay
> > > > On 28 Dec 2015 22:53, "Chandni Singh" <ch...@datatorrent.com>
> wrote:
> > > >
> > > > > But you will not allow multiple batches in the same window?
> > > > > Can you please elaborate on failure scenarios and how it affects
> > > > > idempotency.
> > > > >
> > > > > Chandni
> > > > >
> > > > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > > > priyanka@datatorrent.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Sorry if I was not clear, but I am trying to propose the MAX_SIZE
> > per
> > > > > > window which the operator could process. The size could be less
> > than
> > > > the
> > > > > > MAX_SIZE, no restriction about that.
> > > > > >
> > > > > > -Priyanka
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > > > chandni@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > How do you propose to to restrict the no. of tuples processed
> in
> > an
> > > > > > > application window < batch size.
> > > > > > >
> > > > > > > I don't see a way to enforce that batch size can never be less
> > > tuples
> > > > > > > processed in an application window.
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> > > priyag@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Chandni,
> > > > > > > >
> > > > > > > > How about restricting tuples which can be processed per
> window.
> > > If
> > > > > > > someone
> > > > > > > > wants to process small and frequent batches, he can set batch
> > > size
> > > > to
> > > > > > > some
> > > > > > > > small value and also reduce the window size. This would build
> > > some
> > > > > back
> > > > > > > > pressure of course. But that could be acceptable if one
> really
> > > want
> > > > > to
> > > > > > > > restrict batch size.
> > > > > > > > The though was triggered while working on Cassandra output
> > > > operator.
> > > > > > > > Cassandra creates problem in processing batches of size
> greater
> > > > than
> > > > > > some
> > > > > > > > value (don't recall exact number right now). Other databases
> > may
> > > > want
> > > > > > to
> > > > > > > > restrict the batch size for similar or other reasons.
> > > > > > > >
> > > > > > > > -Priyanka
> > > > > > > >
> > > > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > > > chandni@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Priyanka,
> > > > > > > > >
> > > > > > > > > AbstractBatchTransactionableStore assumes all tuples in one
> > > > > > application
> > > > > > > > as
> > > > > > > > > a batch because it needs to store the tuples in the store
> > > > > > exactly-once.
> > > > > > > > >
> > > > > > > > > If there is more than one batch in an application window,
> > then
> > > to
> > > > > > store
> > > > > > > > the
> > > > > > > > > tuples exactly once the window Id needs to be written with
> > > every
> > > > > > tuple
> > > > > > > as
> > > > > > > > > well which is not that efficient. Therefore we take
> advantage
> > > of
> > > > > the
> > > > > > > > > transaction support by saving just the window id once (not
> > with
> > > > > every
> > > > > > > > > tuple) but this necessitates all the tuples to be
> considered
> > > as a
> > > > > > > batch.
> > > > > > > > >
> > > > > > > > > Every operator in a DAG can have its own application window
> > > size.
> > > > > So
> > > > > > to
> > > > > > > > > reduce the size per batch, the application window attribute
> > > needs
> > > > > to
> > > > > > be
> > > > > > > > > modified.
> > > > > > > > >
> > > > > > > > > Chandni
> > > > > > > > >
> > > > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > > > > > chinmay@datatorrent.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > +1 for this.
> > > > > > > > > >
> > > > > > > > > > ~ Chinmay.
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > > > > > priyag@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > In Malhar we have an
> > > > > > > > > > > operator
> AbstractBatchTransactionableStoreOutputOperator
> > > > which
> > > > > > > > creates
> > > > > > > > > > > batches based on tuples received in a window. At the
> end
> > of
> > > > the
> > > > > > > > window
> > > > > > > > > > > these batches are sent to database for processing.
> > > > > > > > > > > There is no way to configure MAX_SIZE on these batches.
> > > Based
> > > > > on
> > > > > > > > input
> > > > > > > > > > rate
> > > > > > > > > > > the batch sizes can grow very high, and we might want
> to
> > > > > restrict
> > > > > > > > batch
> > > > > > > > > > > size.
> > > > > > > > > > >
> > > > > > > > > > > Any operator can extend and do batch management on
> their
> > > own,
> > > > > > but I
> > > > > > > > see
> > > > > > > > > > it
> > > > > > > > > > > as generic requirement and IMO we should change base
> > class
> > > > i.e.
> > > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator class
> to
> > > > accept
> > > > > > > > > MAX_SIZE
> > > > > > > > > > > for batch from outside.
> > > > > > > > > > >
> > > > > > > > > > > Any opinion on this?
> > > > > > > > > > >
> > > > > > > > > > > -Priyanka
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chandni Singh <ch...@datatorrent.com>.
Yeah, I understand there is a problem in that the app window size is time based
here, not event-count based. However, I don't think having a max batch
size in this class will help, because that causes problems with saving the
tuples exactly once and with idempotency.

I was just trying to let you know why the batch transactional store is how
it is.

Check out the non-transactional store output operator
(AbstractStoreOutputOperator) and its implementations, where the window id is
saved with each update. I think having a batch extension of that can
achieve what is needed here in a way that the operator will still be
fault-tolerant and idempotent.
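
A minimal sketch of that direction, assuming a JDBC-backed store and a
hypothetical events table that carries the window id on every row (the schema,
Event type and maxBatchSize parameter are illustrative only):

    void writeWindow(Connection connection, long windowId,
                     List<Event> windowTuples, int maxBatchSize) throws SQLException
    {
      // remove rows that a previous, failed attempt may have left for this window,
      // so a replayed window does not produce duplicates
      PreparedStatement cleanup =
          connection.prepareStatement("DELETE FROM events WHERE window_id = ?");
      cleanup.setLong(1, windowId);
      cleanup.executeUpdate();

      PreparedStatement insert = connection.prepareStatement(
          "INSERT INTO events (id, payload, window_id) VALUES (?, ?, ?)");
      int inBatch = 0;
      for (Event e : windowTuples) {
        insert.setLong(1, e.getId());
        insert.setString(2, e.getPayload());
        insert.setLong(3, windowId);
        insert.addBatch();
        if (++inBatch == maxBatchSize) {
          insert.executeBatch();   // flush a bounded JDBC batch, the cap discussed here
          inBatch = 0;
        }
      }
      if (inBatch > 0) {
        insert.executeBatch();     // flush the remainder
      }
    }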

Thanks,
Chandni

On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <ch...@datatorrent.com>
wrote:

> Hi Chandni,
>
> I totally agree with you that the transactions should be idempotent. And
> that needs to be taken care of if the batch size is configurable.
>
> Though, I have a question related to the second part where batch size is
> controlled by by controlling app window size.
> I agree with you that aggregation window is a unit of aggregation provided
> by platform. But, if I understand correctly, that is time based.
> If I want to aggregate based on number of tuples, would this be suitable?
>
> To give you an example, lets say I have a store on which the transaction
> size should never exceed 1000 operations.
> And as a streaming application, it would be difficult to guess what would
> be the input rate, hence its not possible to guess how many tuples will
> become part of a single application window. In such case, how can the
> application window size can be used to configure transaction batch size?
> Wouldn't it make more sense to have the control via exact number of tuples?
>
> Thanks,
> Chinmay.
>
>
> ~ Chinmay.
>
> On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
> > Hey Chinmay/Priyanka,
> >
> > We need to write tuples exactly once in the store. Please address the
> > failure scenarios on how to achieve exactly once and idempotency. I
> > mentioned in my previous mail why multiple batches in a window is a
> problem
> > with exactly once.
> >
> > Control via app window would mean, tuning the functionality by
> controlling
> > the platform params. I think it's best one gets option to seperate the
> > concerns of platform and that of app logic.
> >
> > Application window is a unit of aggregation. Every operator in a DAG can
> > have different application window which is the support platform provides
> > for application logic.
> >
> > Chandni
> >
> >
> >
> > On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> > chinmay@datatorrent.com
> > > wrote:
> >
> > > Hi,
> > >
> > > Just a thought on how it can possibly be done.
> > >
> > > The pseudo code might look like this:
> > >
> > > processTuple()
> > > {
> > > If(batchSize < configuredBatchSize){
> > >    //add to the batch
> > > }
> > > Else {
> > >   // process the batch as a transaction
> > >   // empty the data structure of batch.
> > > }
> > > }
> > >
> > > endWindow()
> > > {
> > > // process the batch as transaction.
> > > // empty the data structure of batch.
> > > }
> > >
> > > This way, user can get better/direct control over what transaction
> means.
> > >
> > > As chandni rightly said, one can reduce the application window size for
> > the
> > > operator, and that would reduce the batch size. But that's not
> something
> > > which looks intuitive from user's perspective.
> > > Control via app window would mean, tuning the functionality by
> > controlling
> > > the platform params. I think it's best one gets option to seperate the
> > > concerns of platform and that of app logic.
> > >
> > > If one wants to control the batch size, he/she should be able to do
> that
> > by
> > > just setting the property of batch size(a number), and not by changing
> > app
> > > window size (an indirect time unit).
> > >
> > > ~ Chinmay
> > > On 28 Dec 2015 22:53, "Chandni Singh" <ch...@datatorrent.com> wrote:
> > >
> > > > But you will not allow multiple batches in the same window?
> > > > Can you please elaborate on failure scenarios and how it affects
> > > > idempotency.
> > > >
> > > > Chandni
> > > >
> > > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > > priyanka@datatorrent.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Sorry if I was not clear, but I am trying to propose the MAX_SIZE
> per
> > > > > window which the operator could process. The size could be less
> than
> > > the
> > > > > MAX_SIZE, no restriction about that.
> > > > >
> > > > > -Priyanka
> > > > >
> > > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > > chandni@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > How do you propose to to restrict the no. of tuples processed in
> an
> > > > > > application window < batch size.
> > > > > >
> > > > > > I don't see a way to enforce that batch size can never be less
> > tuples
> > > > > > processed in an application window.
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> > priyag@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Chandni,
> > > > > > >
> > > > > > > How about restricting tuples which can be processed per window.
> > If
> > > > > > someone
> > > > > > > wants to process small and frequent batches, he can set batch
> > size
> > > to
> > > > > > some
> > > > > > > small value and also reduce the window size. This would build
> > some
> > > > back
> > > > > > > pressure of course. But that could be acceptable if one really
> > want
> > > > to
> > > > > > > restrict batch size.
> > > > > > > The though was triggered while working on Cassandra output
> > > operator.
> > > > > > > Cassandra creates problem in processing batches of size greater
> > > than
> > > > > some
> > > > > > > value (don't recall exact number right now). Other databases
> may
> > > want
> > > > > to
> > > > > > > restrict the batch size for similar or other reasons.
> > > > > > >
> > > > > > > -Priyanka
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > > chandni@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Priyanka,
> > > > > > > >
> > > > > > > > AbstractBatchTransactionableStore assumes all tuples in one
> > > > > application
> > > > > > > as
> > > > > > > > a batch because it needs to store the tuples in the store
> > > > > exactly-once.
> > > > > > > >
> > > > > > > > If there is more than one batch in an application window,
> then
> > to
> > > > > store
> > > > > > > the
> > > > > > > > tuples exactly once the window Id needs to be written with
> > every
> > > > > tuple
> > > > > > as
> > > > > > > > well which is not that efficient. Therefore we take advantage
> > of
> > > > the
> > > > > > > > transaction support by saving just the window id once (not
> with
> > > > every
> > > > > > > > tuple) but this necessitates all the tuples to be considered
> > as a
> > > > > > batch.
> > > > > > > >
> > > > > > > > Every operator in a DAG can have its own application window
> > size.
> > > > So
> > > > > to
> > > > > > > > reduce the size per batch, the application window attribute
> > needs
> > > > to
> > > > > be
> > > > > > > > modified.
> > > > > > > >
> > > > > > > > Chandni
> > > > > > > >
> > > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > > > > chinmay@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > +1 for this.
> > > > > > > > >
> > > > > > > > > ~ Chinmay.
> > > > > > > > >
> > > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > > > > priyag@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > In Malhar we have an
> > > > > > > > > > operator AbstractBatchTransactionableStoreOutputOperator
> > > which
> > > > > > > creates
> > > > > > > > > > batches based on tuples received in a window. At the end
> of
> > > the
> > > > > > > window
> > > > > > > > > > these batches are sent to database for processing.
> > > > > > > > > > There is no way to configure MAX_SIZE on these batches.
> > Based
> > > > on
> > > > > > > input
> > > > > > > > > rate
> > > > > > > > > > the batch sizes can grow very high, and we might want to
> > > > restrict
> > > > > > > batch
> > > > > > > > > > size.
> > > > > > > > > >
> > > > > > > > > > Any operator can extend and do batch management on their
> > own,
> > > > > but I
> > > > > > > see
> > > > > > > > > it
> > > > > > > > > > as generic requirement and IMO we should change base
> class
> > > i.e.
> > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator class to
> > > accept
> > > > > > > > MAX_SIZE
> > > > > > > > > > for batch from outside.
> > > > > > > > > >
> > > > > > > > > > Any opinion on this?
> > > > > > > > > >
> > > > > > > > > > -Priyanka
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chinmay Kolhatkar <ch...@datatorrent.com>.
Hi Chandni,

I totally agree with you that the transactions should be idempotent. And
that needs to be taken care of if the batch size is configurable.

Though, I have a question related to the second part, where the batch size is
controlled by controlling the app window size.
I agree with you that the aggregation window is a unit of aggregation provided
by the platform. But, if I understand correctly, that is time based.
If I want to aggregate based on number of tuples, would this be suitable?

To give you an example, let's say I have a store on which the transaction
size should never exceed 1000 operations.
And as a streaming application, it would be difficult to guess what the input
rate would be, hence it's not possible to guess how many tuples will
become part of a single application window. In such a case, how can the
application window size be used to configure the transaction batch size?
Wouldn't it make more sense to have the control via an exact number of tuples?

Thanks,
Chinmay.


~ Chinmay.

On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Hey Chinmay/Priyanka,
>
> We need to write tuples exactly once in the store. Please address the
> failure scenarios on how to achieve exactly once and idempotency. I
> mentioned in my previous mail why multiple batches in a window is a problem
> with exactly once.
>
> Control via app window would mean, tuning the functionality by controlling
> the platform params. I think it's best one gets option to seperate the
> concerns of platform and that of app logic.
>
> Application window is a unit of aggregation. Every operator in a DAG can
> have different application window which is the support platform provides
> for application logic.
>
> Chandni
>
>
>
> On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> chinmay@datatorrent.com
> > wrote:
>
> > Hi,
> >
> > Just a thought on how it can possibly be done.
> >
> > The pseudo code might look like this:
> >
> > processTuple()
> > {
> > If(batchSize < configuredBatchSize){
> >    //add to the batch
> > }
> > Else {
> >   // process the batch as a transaction
> >   // empty the data structure of batch.
> > }
> > }
> >
> > endWindow()
> > {
> > // process the batch as transaction.
> > // empty the data structure of batch.
> > }
> >
> > This way, user can get better/direct control over what transaction means.
> >
> > As chandni rightly said, one can reduce the application window size for
> the
> > operator, and that would reduce the batch size. But that's not something
> > which looks intuitive from user's perspective.
> > Control via app window would mean, tuning the functionality by
> controlling
> > the platform params. I think it's best one gets option to seperate the
> > concerns of platform and that of app logic.
> >
> > If one wants to control the batch size, he/she should be able to do that
> by
> > just setting the property of batch size(a number), and not by changing
> app
> > window size (an indirect time unit).
> >
> > ~ Chinmay
> > On 28 Dec 2015 22:53, "Chandni Singh" <ch...@datatorrent.com> wrote:
> >
> > > But you will not allow multiple batches in the same window?
> > > Can you please elaborate on failure scenarios and how it affects
> > > idempotency.
> > >
> > > Chandni
> > >
> > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > priyanka@datatorrent.com
> > > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Sorry if I was not clear, but I am trying to propose the MAX_SIZE per
> > > > window which the operator could process. The size could be less than
> > the
> > > > MAX_SIZE, no restriction about that.
> > > >
> > > > -Priyanka
> > > >
> > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > chandni@datatorrent.com>
> > > > wrote:
> > > >
> > > > > How do you propose to to restrict the no. of tuples processed in an
> > > > > application window < batch size.
> > > > >
> > > > > I don't see a way to enforce that batch size can never be less
> tuples
> > > > > processed in an application window.
> > > > >
> > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> priyag@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi Chandni,
> > > > > >
> > > > > > How about restricting tuples which can be processed per window.
> If
> > > > > someone
> > > > > > wants to process small and frequent batches, he can set batch
> size
> > to
> > > > > some
> > > > > > small value and also reduce the window size. This would build
> some
> > > back
> > > > > > pressure of course. But that could be acceptable if one really
> want
> > > to
> > > > > > restrict batch size.
> > > > > > The though was triggered while working on Cassandra output
> > operator.
> > > > > > Cassandra creates problem in processing batches of size greater
> > than
> > > > some
> > > > > > value (don't recall exact number right now). Other databases may
> > want
> > > > to
> > > > > > restrict the batch size for similar or other reasons.
> > > > > >
> > > > > > -Priyanka
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > chandni@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Priyanka,
> > > > > > >
> > > > > > > AbstractBatchTransactionableStore assumes all tuples in one
> > > > application
> > > > > > as
> > > > > > > a batch because it needs to store the tuples in the store
> > > > exactly-once.
> > > > > > >
> > > > > > > If there is more than one batch in an application window, then
> to
> > > > store
> > > > > > the
> > > > > > > tuples exactly once the window Id needs to be written with
> every
> > > > tuple
> > > > > as
> > > > > > > well which is not that efficient. Therefore we take advantage
> of
> > > the
> > > > > > > transaction support by saving just the window id once (not with
> > > every
> > > > > > > tuple) but this necessitates all the tuples to be considered
> as a
> > > > > batch.
> > > > > > >
> > > > > > > Every operator in a DAG can have its own application window
> size.
> > > So
> > > > to
> > > > > > > reduce the size per batch, the application window attribute
> needs
> > > to
> > > > be
> > > > > > > modified.
> > > > > > >
> > > > > > > Chandni
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > > > chinmay@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1 for this.
> > > > > > > >
> > > > > > > > ~ Chinmay.
> > > > > > > >
> > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > > > priyag@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > In Malhar we have an
> > > > > > > > > operator AbstractBatchTransactionableStoreOutputOperator
> > which
> > > > > > creates
> > > > > > > > > batches based on tuples received in a window. At the end of
> > the
> > > > > > window
> > > > > > > > > these batches are sent to database for processing.
> > > > > > > > > There is no way to configure MAX_SIZE on these batches.
> Based
> > > on
> > > > > > input
> > > > > > > > rate
> > > > > > > > > the batch sizes can grow very high, and we might want to
> > > restrict
> > > > > > batch
> > > > > > > > > size.
> > > > > > > > >
> > > > > > > > > Any operator can extend and do batch management on their
> own,
> > > > but I
> > > > > > see
> > > > > > > > it
> > > > > > > > > as generic requirement and IMO we should change base class
> > i.e.
> > > > > > > > > AbstractBatchTransactionableStoreOutputOperator class to
> > accept
> > > > > > > MAX_SIZE
> > > > > > > > > for batch from outside.
> > > > > > > > >
> > > > > > > > > Any opinion on this?
> > > > > > > > >
> > > > > > > > > -Priyanka
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chandni Singh <ch...@datatorrent.com>.
Hey Chinmay/Priyanka,

We need to write tuples exactly once in the store. Please address the
failure scenarios and how to achieve exactly-once and idempotency. I
mentioned in my previous mail why multiple batches in a window are a problem
for exactly-once.

Control via the app window would mean tuning the functionality by controlling
platform params. I think it's best that one gets the option to separate the
concerns of the platform from those of the app logic.

The application window is a unit of aggregation. Every operator in a DAG can
have a different application window, which is the support the platform provides
for application logic.

Chandni



On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <chinmay@datatorrent.com
> wrote:

> Hi,
>
> Just a thought on how it can possibly be done.
>
> The pseudo code might look like this:
>
> processTuple()
> {
> If(batchSize < configuredBatchSize){
>    //add to the batch
> }
> Else {
>   // process the batch as a transaction
>   // empty the data structure of batch.
> }
> }
>
> endWindow()
> {
> // process the batch as transaction.
> // empty the data structure of batch.
> }
>
> This way, user can get better/direct control over what transaction means.
>
> As chandni rightly said, one can reduce the application window size for the
> operator, and that would reduce the batch size. But that's not something
> which looks intuitive from user's perspective.
> Control via app window would mean, tuning the functionality by controlling
> the platform params. I think it's best one gets option to seperate the
> concerns of platform and that of app logic.
>
> If one wants to control the batch size, he/she should be able to do that by
> just setting the property of batch size(a number), and not by changing app
> window size (an indirect time unit).
>
> ~ Chinmay
> On 28 Dec 2015 22:53, "Chandni Singh" <ch...@datatorrent.com> wrote:
>
> > But you will not allow multiple batches in the same window?
> > Can you please elaborate on failure scenarios and how it affects
> > idempotency.
> >
> > Chandni
> >
> > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> priyanka@datatorrent.com
> > >
> > wrote:
> >
> > > Hi,
> > >
> > > Sorry if I was not clear, but I am trying to propose the MAX_SIZE per
> > > window which the operator could process. The size could be less than
> the
> > > MAX_SIZE, no restriction about that.
> > >
> > > -Priyanka
> > >
> > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> chandni@datatorrent.com>
> > > wrote:
> > >
> > > > How do you propose to to restrict the no. of tuples processed in an
> > > > application window < batch size.
> > > >
> > > > I don't see a way to enforce that batch size can never be less tuples
> > > > processed in an application window.
> > > >
> > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <pr...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi Chandni,
> > > > >
> > > > > How about restricting tuples which can be processed per window. If
> > > > someone
> > > > > wants to process small and frequent batches, he can set batch size
> to
> > > > some
> > > > > small value and also reduce the window size. This would build some
> > back
> > > > > pressure of course. But that could be acceptable if one really want
> > to
> > > > > restrict batch size.
> > > > > The though was triggered while working on Cassandra output
> operator.
> > > > > Cassandra creates problem in processing batches of size greater
> than
> > > some
> > > > > value (don't recall exact number right now). Other databases may
> want
> > > to
> > > > > restrict the batch size for similar or other reasons.
> > > > >
> > > > > -Priyanka
> > > > >
> > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > chandni@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Priyanka,
> > > > > >
> > > > > > AbstractBatchTransactionableStore assumes all tuples in one
> > > application
> > > > > as
> > > > > > a batch because it needs to store the tuples in the store
> > > exactly-once.
> > > > > >
> > > > > > If there is more than one batch in an application window, then to
> > > store
> > > > > the
> > > > > > tuples exactly once the window Id needs to be written with every
> > > tuple
> > > > as
> > > > > > well which is not that efficient. Therefore we take advantage of
> > the
> > > > > > transaction support by saving just the window id once (not with
> > every
> > > > > > tuple) but this necessitates all the tuples to be considered as a
> > > > batch.
> > > > > >
> > > > > > Every operator in a DAG can have its own application window size.
> > So
> > > to
> > > > > > reduce the size per batch, the application window attribute needs
> > to
> > > be
> > > > > > modified.
> > > > > >
> > > > > > Chandni
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > > chinmay@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > +1 for this.
> > > > > > >
> > > > > > > ~ Chinmay.
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > > priyag@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > In Malhar we have an
> > > > > > > > operator AbstractBatchTransactionableStoreOutputOperator
> which
> > > > > creates
> > > > > > > > batches based on tuples received in a window. At the end of
> the
> > > > > window
> > > > > > > > these batches are sent to database for processing.
> > > > > > > > There is no way to configure MAX_SIZE on these batches. Based
> > on
> > > > > input
> > > > > > > rate
> > > > > > > > the batch sizes can grow very high, and we might want to
> > restrict
> > > > > batch
> > > > > > > > size.
> > > > > > > >
> > > > > > > > Any operator can extend and do batch management on their own,
> > > but I
> > > > > see
> > > > > > > it
> > > > > > > > as generic requirement and IMO we should change base class
> i.e.
> > > > > > > > AbstractBatchTransactionableStoreOutputOperator class to
> accept
> > > > > > MAX_SIZE
> > > > > > > > for batch from outside.
> > > > > > > >
> > > > > > > > Any opinion on this?
> > > > > > > >
> > > > > > > > -Priyanka
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chinmay Kolhatkar <ch...@datatorrent.com>.
Hi,

Just a thought on how it can possibly be done.

The pseudo code might look like this:

processTuple(tuple)
{
if (batchSize < configuredBatchSize) {
   // add tuple to the batch
}
else {
  // process the batch as a transaction
  // empty the data structure of the batch
  // add tuple to the (now empty) batch
}
}

endWindow()
{
// process the batch as a transaction
// empty the data structure of the batch
}

This way, the user gets better/direct control over what a transaction means.

As Chandni rightly said, one can reduce the application window size for the
operator, and that would reduce the batch size. But that's not something
which looks intuitive from the user's perspective.
Control via the app window would mean tuning the functionality by controlling
the platform params. I think it's best that one gets the option to separate the
concerns of the platform from those of the app logic.

If one wants to control the batch size, he/she should be able to do that by
just setting a batch size property (a number), and not by changing the app
window size (an indirect time unit).
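
For concreteness, a hedged Java sketch of such a size-capped batch commit,
mirroring the pseudo code above (the store calls and field names are
illustrative, not the existing Malhar base class, and this does not by itself
address the exactly-once concern raised earlier in the thread):

    private int maxBatchSize = 1000;                    // configurable property
    private final List<Object> batch = new ArrayList<>();

    public void setMaxBatchSize(int maxBatchSize)
    {
      this.maxBatchSize = maxBatchSize;
    }

    public void processTuple(Object tuple)
    {
      batch.add(tuple);
      if (batch.size() >= maxBatchSize) {
        commitBatch();                                  // one store transaction
      }
    }

    public void endWindow()
    {
      commitBatch();                                    // flush whatever is left
    }

    private void commitBatch()
    {
      if (batch.isEmpty()) {
        return;
      }
      store.beginTransaction();                         // illustrative store API
      for (Object tuple : batch) {
        store.put(tuple);
      }
      store.commitTransaction();
      batch.clear();
    }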

~ Chinmay
On 28 Dec 2015 22:53, "Chandni Singh" <ch...@datatorrent.com> wrote:

> But you will not allow multiple batches in the same window?
> Can you please elaborate on failure scenarios and how it affects
> idempotency.
>
> Chandni
>
> On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <priyanka@datatorrent.com
> >
> wrote:
>
> > Hi,
> >
> > Sorry if I was not clear, but I am trying to propose the MAX_SIZE per
> > window which the operator could process. The size could be less than the
> > MAX_SIZE, no restriction about that.
> >
> > -Priyanka
> >
> > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <ch...@datatorrent.com>
> > wrote:
> >
> > > How do you propose to to restrict the no. of tuples processed in an
> > > application window < batch size.
> > >
> > > I don't see a way to enforce that batch size can never be less tuples
> > > processed in an application window.
> > >
> > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <pr...@apache.org>
> > > wrote:
> > >
> > > > Hi Chandni,
> > > >
> > > > How about restricting tuples which can be processed per window. If
> > > someone
> > > > wants to process small and frequent batches, he can set batch size to
> > > some
> > > > small value and also reduce the window size. This would build some
> back
> > > > pressure of course. But that could be acceptable if one really want
> to
> > > > restrict batch size.
> > > > The though was triggered while working on Cassandra output operator.
> > > > Cassandra creates problem in processing batches of size greater than
> > some
> > > > value (don't recall exact number right now). Other databases may want
> > to
> > > > restrict the batch size for similar or other reasons.
> > > >
> > > > -Priyanka
> > > >
> > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > chandni@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Priyanka,
> > > > >
> > > > > AbstractBatchTransactionableStore assumes all tuples in one
> > application
> > > > as
> > > > > a batch because it needs to store the tuples in the store
> > exactly-once.
> > > > >
> > > > > If there is more than one batch in an application window, then to
> > store
> > > > the
> > > > > tuples exactly once the window Id needs to be written with every
> > tuple
> > > as
> > > > > well which is not that efficient. Therefore we take advantage of
> the
> > > > > transaction support by saving just the window id once (not with
> every
> > > > > tuple) but this necessitates all the tuples to be considered as a
> > > batch.
> > > > >
> > > > > Every operator in a DAG can have its own application window size.
> So
> > to
> > > > > reduce the size per batch, the application window attribute needs
> to
> > be
> > > > > modified.
> > > > >
> > > > > Chandni
> > > > >
> > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > > chinmay@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > +1 for this.
> > > > > >
> > > > > > ~ Chinmay.
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> > priyag@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > In Malhar we have an
> > > > > > > operator AbstractBatchTransactionableStoreOutputOperator which
> > > > creates
> > > > > > > batches based on tuples received in a window. At the end of the
> > > > window
> > > > > > > these batches are sent to database for processing.
> > > > > > > There is no way to configure MAX_SIZE on these batches. Based
> on
> > > > input
> > > > > > rate
> > > > > > > the batch sizes can grow very high, and we might want to
> restrict
> > > > batch
> > > > > > > size.
> > > > > > >
> > > > > > > Any operator can extend and do batch management on their own,
> > but I
> > > > see
> > > > > > it
> > > > > > > as generic requirement and IMO we should change base class i.e.
> > > > > > > AbstractBatchTransactionableStoreOutputOperator class to accept
> > > > > MAX_SIZE
> > > > > > > for batch from outside.
> > > > > > >
> > > > > > > Any opinion on this?
> > > > > > >
> > > > > > > -Priyanka
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chandni Singh <ch...@datatorrent.com>.
But you will not allow multiple batches in the same window?
Can you please elaborate on the failure scenarios and how they affect
idempotency?

Chandni

On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <pr...@datatorrent.com>
wrote:

> Hi,
>
> Sorry if I was not clear, but I am trying to propose the MAX_SIZE per
> window which the operator could process. The size could be less than the
> MAX_SIZE, no restriction about that.
>
> -Priyanka
>
> On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
> > How do you propose to to restrict the no. of tuples processed in an
> > application window < batch size.
> >
> > I don't see a way to enforce that batch size can never be less tuples
> > processed in an application window.
> >
> > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <pr...@apache.org>
> > wrote:
> >
> > > Hi Chandni,
> > >
> > > How about restricting tuples which can be processed per window. If
> > someone
> > > wants to process small and frequent batches, he can set batch size to
> > some
> > > small value and also reduce the window size. This would build some back
> > > pressure of course. But that could be acceptable if one really want to
> > > restrict batch size.
> > > The though was triggered while working on Cassandra output operator.
> > > Cassandra creates problem in processing batches of size greater than
> some
> > > value (don't recall exact number right now). Other databases may want
> to
> > > restrict the batch size for similar or other reasons.
> > >
> > > -Priyanka
> > >
> > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> chandni@datatorrent.com>
> > > wrote:
> > >
> > > > Priyanka,
> > > >
> > > > AbstractBatchTransactionableStore assumes all tuples in one
> application
> > > as
> > > > a batch because it needs to store the tuples in the store
> exactly-once.
> > > >
> > > > If there is more than one batch in an application window, then to
> store
> > > the
> > > > tuples exactly once the window Id needs to be written with every
> tuple
> > as
> > > > well which is not that efficient. Therefore we take advantage of the
> > > > transaction support by saving just the window id once (not with every
> > > > tuple) but this necessitates all the tuples to be considered as a
> > batch.
> > > >
> > > > Every operator in a DAG can have its own application window size. So
> to
> > > > reduce the size per batch, the application window attribute needs to
> be
> > > > modified.
> > > >
> > > > Chandni
> > > >
> > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > > chinmay@datatorrent.com>
> > > > wrote:
> > > >
> > > > > +1 for this.
> > > > >
> > > > > ~ Chinmay.
> > > > >
> > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <
> priyag@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > In Malhar we have an
> > > > > > operator AbstractBatchTransactionableStoreOutputOperator which
> > > creates
> > > > > > batches based on tuples received in a window. At the end of the
> > > window
> > > > > > these batches are sent to database for processing.
> > > > > > There is no way to configure MAX_SIZE on these batches. Based on
> > > input
> > > > > rate
> > > > > > the batch sizes can grow very high, and we might want to restrict
> > > batch
> > > > > > size.
> > > > > >
> > > > > > Any operator can extend and do batch management on their own,
> but I
> > > see
> > > > > it
> > > > > > as generic requirement and IMO we should change base class i.e.
> > > > > > AbstractBatchTransactionableStoreOutputOperator class to accept
> > > > MAX_SIZE
> > > > > > for batch from outside.
> > > > > >
> > > > > > Any opinion on this?
> > > > > >
> > > > > > -Priyanka
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Priyanka Gugale <pr...@datatorrent.com>.
Hi,

Sorry if I was not clear, but I am trying to propose a MAX_SIZE per
window which the operator could process. The actual size could be less than
MAX_SIZE; there is no restriction about that.

-Priyanka

On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> How do you propose to to restrict the no. of tuples processed in an
> application window < batch size.
>
> I don't see a way to enforce that batch size can never be less tuples
> processed in an application window.
>
> On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <pr...@apache.org>
> wrote:
>
> > Hi Chandni,
> >
> > How about restricting tuples which can be processed per window. If
> someone
> > wants to process small and frequent batches, he can set batch size to
> some
> > small value and also reduce the window size. This would build some back
> > pressure of course. But that could be acceptable if one really want to
> > restrict batch size.
> > The though was triggered while working on Cassandra output operator.
> > Cassandra creates problem in processing batches of size greater than some
> > value (don't recall exact number right now). Other databases may want to
> > restrict the batch size for similar or other reasons.
> >
> > -Priyanka
> >
> > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <ch...@datatorrent.com>
> > wrote:
> >
> > > Priyanka,
> > >
> > > AbstractBatchTransactionableStore assumes all tuples in one application
> > as
> > > a batch because it needs to store the tuples in the store exactly-once.
> > >
> > > If there is more than one batch in an application window, then to store
> > the
> > > tuples exactly once the window Id needs to be written with every tuple
> as
> > > well which is not that efficient. Therefore we take advantage of the
> > > transaction support by saving just the window id once (not with every
> > > tuple) but this necessitates all the tuples to be considered as a
> batch.
> > >
> > > Every operator in a DAG can have its own application window size. So to
> > > reduce the size per batch, the application window attribute needs to be
> > > modified.
> > >
> > > Chandni
> > >
> > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > > chinmay@datatorrent.com>
> > > wrote:
> > >
> > > > +1 for this.
> > > >
> > > > ~ Chinmay.
> > > >
> > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <pr...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > In Malhar we have an
> > > > > operator AbstractBatchTransactionableStoreOutputOperator which
> > creates
> > > > > batches based on tuples received in a window. At the end of the
> > window
> > > > > these batches are sent to database for processing.
> > > > > There is no way to configure MAX_SIZE on these batches. Based on
> > input
> > > > rate
> > > > > the batch sizes can grow very high, and we might want to restrict
> > batch
> > > > > size.
> > > > >
> > > > > Any operator can extend and do batch management on their own, but I
> > see
> > > > it
> > > > > as generic requirement and IMO we should change base class i.e.
> > > > > AbstractBatchTransactionableStoreOutputOperator class to accept
> > > MAX_SIZE
> > > > > for batch from outside.
> > > > >
> > > > > Any opinion on this?
> > > > >
> > > > > -Priyanka
> > > > >
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chandni Singh <ch...@datatorrent.com>.
How do you propose to restrict the no. of tuples processed in an
application window to be less than the batch size?

I don't see a way to enforce that the batch size can never be less than the
number of tuples processed in an application window.

On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <pr...@apache.org> wrote:

> Hi Chandni,
>
> How about restricting tuples which can be processed per window. If someone
> wants to process small and frequent batches, he can set batch size to some
> small value and also reduce the window size. This would build some back
> pressure of course. But that could be acceptable if one really want to
> restrict batch size.
> The though was triggered while working on Cassandra output operator.
> Cassandra creates problem in processing batches of size greater than some
> value (don't recall exact number right now). Other databases may want to
> restrict the batch size for similar or other reasons.
>
> -Priyanka
>
> On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <ch...@datatorrent.com>
> wrote:
>
> > Priyanka,
> >
> > AbstractBatchTransactionableStore assumes all tuples in one application
> as
> > a batch because it needs to store the tuples in the store exactly-once.
> >
> > If there is more than one batch in an application window, then to store
> the
> > tuples exactly once the window Id needs to be written with every tuple as
> > well which is not that efficient. Therefore we take advantage of the
> > transaction support by saving just the window id once (not with every
> > tuple) but this necessitates all the tuples to be considered as a batch.
> >
> > Every operator in a DAG can have its own application window size. So to
> > reduce the size per batch, the application window attribute needs to be
> > modified.
> >
> > Chandni
> >
> > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> > chinmay@datatorrent.com>
> > wrote:
> >
> > > +1 for this.
> > >
> > > ~ Chinmay.
> > >
> > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <pr...@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > In Malhar we have an
> > > > operator AbstractBatchTransactionableStoreOutputOperator which
> creates
> > > > batches based on tuples received in a window. At the end of the
> window
> > > > these batches are sent to database for processing.
> > > > There is no way to configure MAX_SIZE on these batches. Based on
> input
> > > rate
> > > > the batch sizes can grow very high, and we might want to restrict
> batch
> > > > size.
> > > >
> > > > Any operator can extend and do batch management on their own, but I
> see
> > > it
> > > > as generic requirement and IMO we should change base class i.e.
> > > > AbstractBatchTransactionableStoreOutputOperator class to accept
> > MAX_SIZE
> > > > for batch from outside.
> > > >
> > > > Any opinion on this?
> > > >
> > > > -Priyanka
> > > >
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Priyanka Gugale <pr...@apache.org>.
Hi Chandni,

How about restricting the number of tuples which can be processed per window? If
someone wants to process small and frequent batches, he can set the batch size to
some small value and also reduce the window size. This would build some back
pressure, of course, but that could be acceptable if one really wants to
restrict the batch size.
The thought was triggered while working on the Cassandra output operator.
Cassandra has problems processing batches larger than some size
(I don't recall the exact number right now). Other databases may want to
restrict the batch size for similar or other reasons.
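
As an illustration, a sketch of capping Cassandra writes by tuple count, assuming
the DataStax Java driver 3.x (the Event type, its getters and the maxBatchSize
value are hypothetical; the actual server-side limit is governed by Cassandra's
own batch size settings):

    import com.datastax.driver.core.BatchStatement;
    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;
    import java.util.List;

    void writeInBoundedBatches(Session session, PreparedStatement insert,
                               List<Event> tuples, int maxBatchSize)
    {
      BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
      int count = 0;
      for (Event e : tuples) {
        batch.add(insert.bind(e.getId(), e.getPayload()));
        if (++count == maxBatchSize) {
          session.execute(batch);   // never send more than maxBatchSize statements
          batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
          count = 0;
        }
      }
      if (count > 0) {
        session.execute(batch);     // flush the remainder
      }
    }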

-Priyanka

On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <ch...@datatorrent.com>
wrote:

> Priyanka,
>
> AbstractBatchTransactionableStore assumes all tuples in one application as
> a batch because it needs to store the tuples in the store exactly-once.
>
> If there is more than one batch in an application window, then to store the
> tuples exactly once the window Id needs to be written with every tuple as
> well which is not that efficient. Therefore we take advantage of the
> transaction support by saving just the window id once (not with every
> tuple) but this necessitates all the tuples to be considered as a batch.
>
> Every operator in a DAG can have its own application window size. So to
> reduce the size per batch, the application window attribute needs to be
> modified.
>
> Chandni
>
> On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <
> chinmay@datatorrent.com>
> wrote:
>
> > +1 for this.
> >
> > ~ Chinmay.
> >
> > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <pr...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > In Malhar we have an
> > > operator AbstractBatchTransactionableStoreOutputOperator which creates
> > > batches based on tuples received in a window. At the end of the window
> > > these batches are sent to database for processing.
> > > There is no way to configure MAX_SIZE on these batches. Based on input
> > rate
> > > the batch sizes can grow very high, and we might want to restrict batch
> > > size.
> > >
> > > Any operator can extend and do batch management on their own, but I see
> > it
> > > as generic requirement and IMO we should change base class i.e.
> > > AbstractBatchTransactionableStoreOutputOperator class to accept
> MAX_SIZE
> > > for batch from outside.
> > >
> > > Any opinion on this?
> > >
> > > -Priyanka
> > >
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chandni Singh <ch...@datatorrent.com>.
Priyanka,

AbstractBatchTransactionableStore assumes all the tuples in one application
window as a batch because it needs to store the tuples in the store exactly once.

If there is more than one batch in an application window, then to store the
tuples exactly once the window id needs to be written with every tuple as
well, which is not that efficient. Therefore we take advantage of the
transaction support by saving just the window id once (not with every
tuple), but this necessitates that all the tuples be considered as one batch.
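
A simplified illustration of that trade-off, assuming a JDBC-backed transactional
store (the events and window_meta tables and the Event type are hypothetical):

    void writeWindowAtomically(Connection connection, long windowId,
                               List<Event> windowBatch) throws SQLException
    {
      connection.setAutoCommit(false);

      PreparedStatement insert = connection.prepareStatement(
          "INSERT INTO events (id, payload) VALUES (?, ?)");
      for (Event e : windowBatch) {
        insert.setLong(1, e.getId());
        insert.setString(2, e.getPayload());
        insert.addBatch();
      }
      insert.executeBatch();

      // the window id is saved once, in the same transaction as the whole batch
      PreparedStatement meta = connection.prepareStatement(
          "UPDATE window_meta SET last_window_id = ?");
      meta.setLong(1, windowId);
      meta.executeUpdate();

      // either the whole batch plus the window id become visible, or nothing does;
      // on recovery the operator only needs last_window_id to decide what to replay
      connection.commit();
    }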

Every operator in a DAG can have its own application window size. So to
reduce the size per batch, the application window attribute needs to be
modified.

Chandni

On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <ch...@datatorrent.com>
wrote:

> +1 for this.
>
> ~ Chinmay.
>
> On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <pr...@apache.org>
> wrote:
>
> > Hi,
> >
> > In Malhar we have an
> > operator AbstractBatchTransactionableStoreOutputOperator which creates
> > batches based on tuples received in a window. At the end of the window
> > these batches are sent to database for processing.
> > There is no way to configure MAX_SIZE on these batches. Based on input
> rate
> > the batch sizes can grow very high, and we might want to restrict batch
> > size.
> >
> > Any operator can extend and do batch management on their own, but I see
> it
> > as generic requirement and IMO we should change base class i.e.
> > AbstractBatchTransactionableStoreOutputOperator class to accept MAX_SIZE
> > for batch from outside.
> >
> > Any opinion on this?
> >
> > -Priyanka
> >
>

Re: Writing batches to database using Transactionable Store Output operator

Posted by Chinmay Kolhatkar <ch...@datatorrent.com>.
+1 for this.

~ Chinmay.

On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <pr...@apache.org> wrote:

> Hi,
>
> In Malhar we have an
> operator AbstractBatchTransactionableStoreOutputOperator which creates
> batches based on tuples received in a window. At the end of the window
> these batches are sent to database for processing.
> There is no way to configure MAX_SIZE on these batches. Based on input rate
> the batch sizes can grow very high, and we might want to restrict batch
> size.
>
> Any operator can extend and do batch management on their own, but I see it
> as generic requirement and IMO we should change base class i.e.
> AbstractBatchTransactionableStoreOutputOperator class to accept MAX_SIZE
> for batch from outside.
>
> Any opinion on this?
>
> -Priyanka
>