Posted to dev@apex.apache.org by Atri Sharma <at...@apache.org> on 2015/08/28 22:37:10 UTC

Window Transactions

Team,

Does it make sense to have an all-or-nothing transactional system for
windows? With the future functionality of dynamic operators, I feel it
makes sense to allow either the data from an entire window to be
processed or none of the data to be sent.

I am not sure if window batching in its current form is a logical
implementation of this feature.

Thoughts?

Regards,

Atri

Re: Window Transactions

Posted by Thomas Weise <th...@datatorrent.com>.
It becomes a distributed transaction problem unless all state, including
the checkpoint, can be written in a single transaction (which is not
possible when using HDFS for checkpointing).

If the transaction in your external system completes and the checkpointing
then fails, the window will be reprocessed. Hence the need to record the
completed windowId as part of the transaction in the external system. More
frequent checkpointing reduces the number of windows replayed during
recovery, but does not eliminate the possibility of replay. Hence the
solution has to be in the operator.
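A rough illustration of the replay point, as a self-contained Java sketch (not the Apex engine's recovery logic). It assumes windows are numbered 1, 2, 3, ..., a checkpoint is written after every `interval` windows, and the failure happens after the external commit of `failedWindow` but before its checkpoint completes. The hypothetical `replayedWindows` helper lists the windows that would be reprocessed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Apex: which windows are reprocessed after a
// failure, assuming windows are numbered 1, 2, 3, ... and a checkpoint is
// written after every `interval` windows.
final class ReplaySketch {
    // `failedWindow` committed its external transaction, but the process died
    // before that window's checkpoint (if one was due) was written.
    static List<Long> replayedWindows(long interval, long failedWindow) {
        // Last window whose checkpoint is known to have completed.
        long lastCheckpoint = ((failedWindow - 1) / interval) * interval;
        List<Long> replayed = new ArrayList<>();
        for (long w = lastCheckpoint + 1; w <= failedWindow; w++) {
            replayed.add(w);
        }
        return replayed;
    }

    public static void main(String[] args) {
        System.out.println(replayedWindows(10, 25)); // [21, 22, 23, 24, 25]
        System.out.println(replayedWindows(1, 25));  // [25]
    }
}
```

Shrinking the interval from 10 to 1 cuts the replay from five windows to one, but window 25 is still replayed even though its external transaction already committed, so the windowId check still has to live in the operator.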

Thomas



On Sun, Aug 30, 2015 at 12:33 PM, Timothy Farkas <ti...@datatorrent.com>
wrote:

> Hi Atri,
>
> As far as I understand point 2 can be done. You can just set your
> checkpoint window to be smaller than your application window.
>
> Someone else can correct me if I'm wrong but I believe 1) depends on the
> operator.
>
> If the operator of interest doesn't talk to an external system (hdfs, a
> database, send an email) and is idempotent, then I don't think any fault
> tolerance guarantees are gained by imposing this requirement.
>
> If the operator is talking to an external system then the pattern Thomas
> described could be used to achieve this. I assume you want this
> functionality because you want to talk to a system that doesn't support
> transactions? I believe implementing all or nothing windows for writing to
> external systems will always require operator level logic that utilizes
> transactions provided by the external system. This is required because if
> an external system doesn't provide transactions there will always be a race
> condition between the work done by the operator and the data written to the
> external system.
>
> For example, let's say Operator A is writing to System B, and System B
> doesn't support transactions:
>
> 1. ) Let's say Operator A works by writing tuples to System B and Operator
> A is checkpointed every application window.
> 2. ) Let's say Operator A was checkpointed before window 1 started and
> Operator A is on window 1.
> 3. ) Operator A writes tuples 1, 2, and 3 but fails before window 1
> completes.
> 4. ) Operator A is restored to the beginning of window 1 and writes tuples
> 1, 2, and 3 again.
> 5. ) Now we have duplicates.
>
> If we try to record individual tuples sent to System B we run into a
> similar problem.
>
> 1. ) Let's say Operator A works by writing a tuple to System B and then
> records somewhere that the tuple was written to System B.
> 2. ) Let's say Operator A was checkpointed before window 1 started and
> Operator A is on window 1.
> 3. ) Operator A writes tuple 1 and then records that tuple 1 was written.
> 4. ) Operator A writes tuple 2 and then fails.
> 5. ) Operator A is restored to the beginning of window 1.
> 6. ) Operator A checks if tuple 1 was written and sees that it was, so it
> skips tuple 1.
> 7. ) Operator A checks if tuple 2 was written and sees that it wasn't, so
> it writes tuple 2 again.
> 8. ) Now tuple 2 is duplicated.
>
> Reversing the order of writing a tuple and then recording the write will
> also produce a problem. Instead of data being duplicated, data could be
> lost.
>
> In summary, I think the implementation of all-or-nothing windows depends
> on the system you are talking to and the guarantees it gives you.
>
> Thanks,
> Tim

Re: Window Transactions

Posted by Timothy Farkas <ti...@datatorrent.com>.
Hi Atri,

As far as I understand, point 2 can be done: you can just set your
checkpoint window to be smaller than your application window.

Someone else can correct me if I'm wrong, but I believe point 1 depends on
the operator.

If the operator of interest doesn't talk to an external system (HDFS, a
database, sending an email) and is idempotent, then I don't think any
fault-tolerance guarantees are gained by imposing this requirement.

If the operator is talking to an external system, then the pattern Thomas
described could be used to achieve this. I assume you want this
functionality because you want to talk to a system that doesn't support
transactions? I believe implementing all-or-nothing windows for writing to
external systems will always require operator-level logic that utilizes
transactions provided by the external system. This is required because, if
an external system doesn't provide transactions, there will always be a
race condition between the work done by the operator and the data written
to the external system.

For example, let's say Operator A is writing to System B, and System B
doesn't support transactions:

1. ) Let's say Operator A works by writing tuples to System B and Operator
A is checkpointed every application window.
2. ) Let's say Operator A was checkpointed before window 1 started and
Operator A is on window 1.
3. ) Operator A writes tuples 1, 2, and 3 but fails before window 1
completes.
4. ) Operator A is restored to the beginning of window 1 and writes tuples
1, 2, and 3 again.
5. ) Now we have duplicates.
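The first scenario can be reproduced with a small, self-contained Java sketch. `NonTransactionalStore` is a hypothetical stand-in for System B and the crash is simulated with an exception; this is an illustration under those assumptions, not Apex operator code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for System B: every write is immediately visible and
// nothing can be rolled back.
final class NonTransactionalStore {
    final List<Integer> rows = new ArrayList<>();

    void write(int tuple) {
        rows.add(tuple);
    }
}

final class NaiveReplayDemo {
    // Writes each tuple straight to System B; optionally "crashes" before the
    // window completes (i.e. before endWindow would have been reached).
    static void processWindow(List<Integer> window, NonTransactionalStore store,
                              boolean crashBeforeWindowEnds) {
        for (int tuple : window) {
            store.write(tuple);
        }
        if (crashBeforeWindowEnds) {
            throw new IllegalStateException("operator failed during window 1");
        }
    }

    static List<Integer> run() {
        NonTransactionalStore systemB = new NonTransactionalStore();
        List<Integer> window1 = List.of(1, 2, 3);
        try {
            processWindow(window1, systemB, true);   // step 3: write 1, 2, 3, then fail
        } catch (IllegalStateException e) {
            // Step 4: restored to the checkpoint taken before window 1; replay it.
            processWindow(window1, systemB, false);
        }
        return systemB.rows;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 1, 2, 3]
    }
}
```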

If we try to record individual tuples sent to System B we run into a
similar problem.

1. ) Let's say Operator A works by writing a tuple to System B and then
records somewhere that the tuple was written to System B.
2. ) Let's say Operator A was checkpointed before window 1 started and
Operator A is on window 1.
3. ) Operator A writes tuple 1 and then records that tuple 1 was written.
4. ) Operator A writes tuple 2 and then fails.
5. ) Operator A is restored to the beginning of window 1.
6. ) Operator A checks if tuple 1 was written and sees that it was, so it
skips tuple 1.
7. ) Operator A checks if tuple 2 was written and sees that it wasn't, so
it writes tuple 2 again.
8. ) Now tuple 2 is duplicated.

Reversing the order (recording the write first and then writing the tuple)
also produces a problem: instead of data being duplicated, data could be
lost.
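Both orderings of the per-tuple approach can be sketched the same way. The `recorded` set is a hypothetical external record that, like System B, survives the operator restart, and `crashOn` simulates a failure right after the first of the two steps for that tuple; all names here are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PerTupleRecordingDemo {
    // Both live outside the operator, so neither is reset by recovery.
    final List<Integer> systemB = new ArrayList<>();   // no transactions
    final Set<Integer> recorded = new HashSet<>();     // "tuple was written" marks

    // writeFirst = true:  write to System B, then record the write.
    // writeFirst = false: record the write, then write to System B.
    // The operator fails right after the first step for tuple `crashOn`
    // (-1 means no failure).
    void processWindow(List<Integer> window, boolean writeFirst, int crashOn) {
        for (int tuple : window) {
            if (recorded.contains(tuple)) {
                continue;                               // already marked: skip
            }
            if (writeFirst) {
                systemB.add(tuple);
                if (tuple == crashOn) throw new IllegalStateException("crash");
                recorded.add(tuple);
            } else {
                recorded.add(tuple);
                if (tuple == crashOn) throw new IllegalStateException("crash");
                systemB.add(tuple);
            }
        }
    }

    static List<Integer> run(boolean writeFirst) {
        PerTupleRecordingDemo op = new PerTupleRecordingDemo();
        List<Integer> window1 = List.of(1, 2);
        try {
            op.processWindow(window1, writeFirst, 2);   // fails while handling tuple 2
        } catch (IllegalStateException e) {
            op.processWindow(window1, writeFirst, -1);  // window 1 is replayed
        }
        return op.systemB;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [1, 2, 2]  tuple 2 duplicated
        System.out.println(run(false)); // [1]        tuple 2 lost
    }
}
```

Whichever order is chosen, the write and the record are two separate, non-atomic steps, and a failure between them leaves them disagreeing.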

In summary, I think the implementation of all-or-nothing windows depends on
the system you are talking to and the guarantees it gives you.

Thanks,
Tim

On Sun, Aug 30, 2015 at 9:05 AM, Atri Sharma <at...@gmail.com> wrote:

> Thanks Thomas and everyone.
>
> I agree with what Thomas explained, and I have a few points to add.
>
> I now understand that the ability to process or commit a transaction is
> present. I am trying to understand the following use cases:
>
> 1) A window is defined. The operator needs to either process the entire
> window or none of it. Can we have this functionality now?
>
> 2) Checkpoints within a window. If we fail, we can send the last seen
> checkpoint to the source (if the source can handle it) and ask for data
> from that point onward.

Re: Window Transactions

Posted by Atri Sharma <at...@gmail.com>.
Thanks Thomas and everyone.

I agree with what Thomas explained, and I have a few points to add.

I now understand that the ability to process or commit a transaction is
present. I am trying to understand the following use cases:

1) A window is defined. The operator needs to either process the entire
window or none of it. Can we have this functionality now?

2) Checkpoints within a window. If we fail, we can send the last seen
checkpoint to the source (if the source can handle it) and ask for data
from that point onward.
On 29 Aug 2015 04:38, "Thomas Weise" <th...@datatorrent.com> wrote:

> Atri,
>
> A concept of "transactional window" is needed for some applications that
> interact with external systems. A number of Malhar operators support it
> today. For example, a JDBC operator might perform all operations within a
> transaction that commences with the first write in a window and endWindow
> will commit the transaction. The engine provides the callbacks, the
> operator implements the transaction based on the capabilities of the
> external system. Note that this does not imply batching, it merely speaks
> to transaction demarcation.
>
> But this is just part of the work needed to make the operator
> "transactional". Windows can be reprocessed based on the processing
> semantics. When a container goes down, the operator will reset to the
> recovery checkpoint and reprocess the windows from the checkpoint till the
> point where the failure occurred. Unless the processing done by the
> operator is idempotent, this would lead to incorrect results. For example,
> if the operation was "UPDATE sometable SET count = count + 1", we would
> double count.
>
> One technique to deal with this is to maintain the windowId as part of the
> state that gets committed to the external system. Now we can skip the
> processing if we find that the window was already processed. Of course,
> this requires that the upstream operators also deliver the tuples in an
> idempotent manner on a window replay.
>
> Thomas

Re: Window Transactions

Posted by Thomas Weise <th...@datatorrent.com>.
Atri,

A concept of "transactional window" is needed for some applications that
interact with external systems. A number of Malhar operators support it
today. For example, a JDBC operator might perform all operations within a
transaction that commences with the first write in a window and endWindow
will commit the transaction. The engine provides the callbacks, the
operator implements the transaction based on the capabilities of the
external system. Note that this does not imply batching; it merely speaks
to transaction demarcation.

But this is just part of the work needed to make the operator
"transactional". Windows can be reprocessed based on the processing
semantics. When a container goes down, the operator will reset to the
recovery checkpoint and reprocess the windows from the checkpoint till the
point where the failure occurred. Unless the processing done by the
operator is idempotent, this would lead to incorrect results. For example,
if the operation was "UPDATE sometable SET count = count + 1", we would
double count.
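A minimal Java sketch of the double count, assuming a hypothetical transactional `CounterStore` in place of a real JDBC connection. The `beginWindow`/`process`/`endWindow` method names only mirror the engine callbacks; the transaction is opened on the first write in a window and committed in `endWindow`, yet replaying a window that already committed still applies its increments twice.

```java
import java.util.List;

// Hypothetical transactional store with one counter, standing in for
// "sometable.count" behind a JDBC connection.
final class CounterStore {
    long count;              // committed value, visible to readers
    private long pending;    // increments staged in the open transaction
    private boolean open;

    void begin() { open = true; pending = 0; }
    void increment() { pending++; }               // UPDATE sometable SET count = count + 1
    void commit() { count += pending; pending = 0; open = false; }
    boolean inTransaction() { return open; }
}

// Operator-like class; method names mirror the engine's window callbacks.
final class CountingOperator {
    private final CounterStore store;

    CountingOperator(CounterStore store) { this.store = store; }

    void beginWindow(long windowId) {
        // Nothing yet: the transaction starts lazily with the first write.
    }

    void process(String tuple) {
        if (!store.inTransaction()) {
            store.begin();                        // first write in this window
        }
        store.increment();
    }

    void endWindow() {
        if (store.inTransaction()) {
            store.commit();                       // window boundary = commit point
        }
    }
}

final class DoubleCountDemo {
    static long run() {
        CounterStore db = new CounterStore();
        List<String> window7 = List.of("a", "b", "c");

        CountingOperator op = new CountingOperator(db);
        op.beginWindow(7);
        window7.forEach(op::process);
        op.endWindow();                           // committed: count = 3

        // The container dies before window 7 is checkpointed, so the operator
        // is restored from the previous checkpoint and window 7 is replayed.
        CountingOperator restored = new CountingOperator(db);
        restored.beginWindow(7);
        window7.forEach(restored::process);
        restored.endWindow();
        return db.count;
    }

    public static void main(String[] args) {
        System.out.println(run()); // 6, although window 7 held only 3 tuples
    }
}
```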

One technique to deal with this is to maintain the windowId as part of the
state that gets committed to the external system. Now we can skip the
processing if we find that the window was already processed. Of course,
this requires that the upstream operators also deliver the tuples in an
idempotent manner on a window replay.
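The windowId technique can be added to the same kind of sketch. Here the hypothetical `WindowAwareStore` commits the counter and the last processed window id in one atomic step, and the operator skips any window whose id is at or below the committed one. It assumes window ids increase monotonically and that a replayed window carries the same tuples as the original (the upstream requirement mentioned above).

```java
import java.util.List;

// Hypothetical store: the counter and the id of the last applied window are
// updated in the same transaction, so they always change together.
final class WindowAwareStore {
    long count;
    long committedWindowId = -1;   // -1 means no window committed yet
    private long pending;
    private boolean open;

    void begin() { open = true; pending = 0; }
    void increment() { pending++; }
    void commit(long windowId) {
        count += pending;
        committedWindowId = windowId;
        pending = 0;
        open = false;
    }
    boolean inTransaction() { return open; }
}

final class IdempotentCountingOperator {
    private final WindowAwareStore store;
    private long currentWindowId;
    private boolean alreadyApplied;

    IdempotentCountingOperator(WindowAwareStore store) { this.store = store; }

    void beginWindow(long windowId) {
        currentWindowId = windowId;
        // Assumes window ids only increase, so any id at or below the committed
        // one has already been applied to the external system.
        alreadyApplied = windowId <= store.committedWindowId;
    }

    void process(String tuple) {
        if (alreadyApplied) {
            return;                               // replayed window: skip
        }
        if (!store.inTransaction()) {
            store.begin();
        }
        store.increment();
    }

    void endWindow() {
        if (!alreadyApplied && store.inTransaction()) {
            store.commit(currentWindowId);        // count and windowId together
        }
    }
}

final class ReplaySafeDemo {
    static long run() {
        WindowAwareStore db = new WindowAwareStore();
        List<String> window7 = List.of("a", "b", "c");

        IdempotentCountingOperator op = new IdempotentCountingOperator(db);
        op.beginWindow(7);
        window7.forEach(op::process);
        op.endWindow();                           // count = 3, committed id = 7

        // Same failure as before: the restored operator replays window 7 ...
        IdempotentCountingOperator restored = new IdempotentCountingOperator(db);
        restored.beginWindow(7);
        window7.forEach(restored::process);       // skipped
        restored.endWindow();

        // ... and then continues with new data.
        restored.beginWindow(8);
        List.of("d", "e").forEach(restored::process);
        restored.endWindow();
        return db.count;
    }

    public static void main(String[] args) {
        System.out.println(run()); // 5
    }
}
```

Putting the windowId in the same transaction as the data is the essential part: if they were committed separately, the two writes could disagree after a failure, just as in the per-tuple recording scenarios.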

Thomas

On Fri, Aug 28, 2015 at 2:14 PM, Chetan Narsude <ch...@datatorrent.com>
wrote:

> Atri,
>
>   The BEGIN_WINDOW and END_WINDOW control events demarcate the
> transaction. We do not hold the first event after BEGIN_WINDOW hostage
> until the END_WINDOW is received. This allows us to provide almost zero
> latency at the per-tuple level, which is one of the differentiating
> paradigms of Apex.
>
>   If we did otherwise, the platform would degrade to micro-batch
> processing mode. More details about it here:
>
>
> https://www.datatorrent.com/real-time-event-stream-processing-what-are-your-choices/
>
>
>  Let me know if this answers your question or I misunderstood the question.
>
> --
> Chetan

Re: Window Transactions

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Atri,

What is the use case you have in mind? If you are thinking about rolling
back a transaction when some operations fail during processing, that can
be achieved within the current paradigm.

Thanks

On Fri, Aug 28, 2015 at 2:14 PM, Chetan Narsude <ch...@datatorrent.com>
wrote:

> Atri,
>
>   The BEGIN_WINDOW and END_WINDOW control events demarcate the
> transaction. We do not hold the first event after BEGIN_WINDOW hostage
> until the END_WINDOW is received. This allows us to provide almost zero
> latency at the per-tuple level, which is one of the differentiating
> paradigms of Apex.
>
>   If we did otherwise, the platform would degrade to micro-batch
> processing mode. More details about it here:
>
>
> https://www.datatorrent.com/real-time-event-stream-processing-what-are-your-choices/
>
>
>  Let me know if this answers your question or I misunderstood the question.
>
> --
> Chetan

Re: Window Transactions

Posted by Chetan Narsude <ch...@datatorrent.com>.
Atri,

  The BEGIN_WINDOW and END_WINDOW control events demarcate the
transaction. We do not hold the first event after BEGIN_WINDOW hostage
until the END_WINDOW is received. This allows us to provide almost zero
latency at the per-tuple level, which is one of the differentiating
paradigms of Apex.

  If we did otherwise, the platform would degrade to micro-batch processing
mode. More details about it here:

https://www.datatorrent.com/real-time-event-stream-processing-what-are-your-choices/
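To make the latency difference concrete, here is a toy Java model (not Apex code). "BEGIN" and "END" are hypothetical stand-ins for the BEGIN_WINDOW and END_WINDOW control tuples, and `emitPositions` records at which input event each data tuple is passed downstream: immediately on arrival, or only once END_WINDOW arrives, as a micro-batch engine would do.

```java
import java.util.ArrayList;
import java.util.List;

final class EmitTimingDemo {
    // For each data tuple, returns the index of the input event at which it
    // is handed downstream.
    static List<Integer> emitPositions(List<String> events, boolean holdUntilEndWindow) {
        List<Integer> positions = new ArrayList<>();
        int buffered = 0;
        for (int i = 0; i < events.size(); i++) {
            String e = events.get(i);
            if (e.equals("BEGIN")) {
                continue;                               // marks the window start only
            } else if (e.equals("END")) {
                // micro-batch: the whole window is released only now
                for (int k = 0; k < buffered; k++) {
                    positions.add(i);
                }
                buffered = 0;
            } else if (holdUntilEndWindow) {
                buffered++;                             // held hostage until END
            } else {
                positions.add(i);                       // streaming: forwarded at once
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        List<String> window = List.of("BEGIN", "t1", "t2", "t3", "END");
        System.out.println(emitPositions(window, false)); // [1, 2, 3]
        System.out.println(emitPositions(window, true));  // [4, 4, 4]
    }
}
```

In both modes the window boundaries are the same, so an operator can still use END_WINDOW as its commit point; only the per-tuple latency differs.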


 Let me know if this answers your question or if I misunderstood the question.

--
Chetan



On Fri, Aug 28, 2015 at 1:37 PM, Atri Sharma <at...@apache.org> wrote:

> Team,
>
> Does it make sense to have an all-or-nothing transactional system for
> windows? With the future functionality of dynamic operators, I feel it
> makes sense to allow either the data from an entire window to be
> processed or none of the data to be sent.
>
> I am not sure if window batching in its current form is a logical
> implementation of this feature.
>
> Thoughts?
>
> Regards,
>
> Atri
>