Posted to dev@apex.apache.org by Sandesh Hegde <sa...@datatorrent.com> on 2016/12/01 00:12:41 UTC

Re: [DISCUSSION] Custom Control Tuples

I am interested in working on the following subtask:

https://issues.apache.org/jira/browse/APEXCORE-581

Thanks


On Wed, Nov 30, 2016 at 2:07 PM David Yan <da...@datatorrent.com> wrote:

> I have created an umbrella ticket for control tuple support:
>
> https://issues.apache.org/jira/browse/APEXCORE-579
>
> Currently it has two subtasks. Please have a look at them and see whether
> I'm missing anything or if you have anything to add. You are welcome to add
> more subtasks or comment on the existing subtasks.
>
> We would like to kick start the implementation soon.
>
> Thanks!
>
> David
>
> On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda <bh...@datatorrent.com>
> wrote:
>
> > +1 for the plan.
> >
> > I would be interested in contributing to this feature.
> >
> > ~ Bhupesh
> >
> > On Nov 29, 2016 03:26, "Sandesh Hegde" <sa...@datatorrent.com> wrote:
> >
> > > I am interested in contributing to this feature.
> > >
> > > > On Mon, Nov 28, 2016 at 1:54 PM David Yan <da...@datatorrent.com> wrote:
> > > >
> > > > > I think we should probably go ahead with option 1 since this works with
> > > > > most use cases and prevents developers from shooting themselves in the
> > > > > foot in terms of idempotency.
> > > > >
> > > > > We can have a configuration property that enables option 2 later if we
> > > > > have concrete use cases that call for it.
> > > > >
> > > > > Please share your thoughts if you disagree with this plan. Also, please
> > > > > indicate if you're interested in contributing to this feature.
> > > >
> > > > David
> > > >
> > > > > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > It appears that option 1 is favored because no use case has been
> > > > > > presented that could use option 2.
> > > > > >
> > > > > > However, option 2 is problematic in specific cases, for example when an
> > > > > > operator has multiple input ports. In a linear DAG where control tuples
> > > > > > flow in order with the data tuples, it should not be difficult to
> > > > > > guarantee idempotency. For example, where an operator's behavior could
> > > > > > change multiple times during a single window, it should not have to
> > > > > > wait until end window for these changes to take effect. Since we don't
> > > > > > have a concrete use case right now, perhaps we do not want to go down
> > > > > > that road. This feature should be available through a platform
> > > > > > attribute (maybe at a later point in time) where the default is
> > > > > > option 1.
> > > > > >
> > > > > > I think option 1 is a suitable starting point for the implementation of
> > > > > > this feature and we should proceed with it.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > >
> > > > >
> > > > > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <david@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Good question Tushar. The callback should be called only once.
> > > > > > > The way to implement this is to keep a list of control tuple hashes
> > > > > > > for the given streaming window and only do the callback when the
> > > > > > > operator has not seen it before.
> > > > > >
> > > > > > Other thoughts?
> > > > > >
> > > > > > David
> > > > > >
> > > > > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tushar@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi David,
> > > > > > > >
> > > > > > > > What would be the behaviour in the case where we have a DAG with the
> > > > > > > > following operators? The number in brackets is the number of
> > > > > > > > partitions, and X is NxM partitioning.
> > > > > > > > A(1) X B(4) X C(2)
> > > > > > > >
> > > > > > > > If A sends a control tuple, it will be sent to all 4 partitions of B,
> > > > > > > > and from each partition of B it goes to C, i.e. each partition of C
> > > > > > > > will receive the same control tuple originating from A multiple times
> > > > > > > > (the number of upstream partitions of C). In this case, will the
> > > > > > > > callback function get called multiple times or just once?
> > > > > > >
> > > > > > > -Tushar.
> > > > > > >
> > > > > > >
> > > > > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <david@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > > > Hi Bhupesh,
> > > > > > > > >
> > > > > > > > > Since each input port has its own incoming control tuple, I would
> > > > > > > > > imagine there would be an additional DefaultInputPort.processControl
> > > > > > > > > method that operator developers can override.
> > > > > > > > > If we go for option 1, my thinking is that the control tuples would
> > > > > > > > > always be delivered at the next window boundary, even if the emit
> > > > > > > > > method is called within a window.
> > > > > > > >
> > > > > > > > David
> > > > > > > >
> > > > > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> I have a question regarding the callback for a control tuple. Will it
> > > > > > > > >> be similar to the InputPort::process() method? Something like
> > > > > > > > >> InputPort::processControlTuple(t)? Or will it be a method of the
> > > > > > > > >> operator similar to beginWindow()?
> > > > > > > > >>
> > > > > > > > >> When we say that the control tuple will be delivered at the window
> > > > > > > > >> boundary, does that mean all control tuples emitted in that window
> > > > > > > > >> will be processed together at the end of the window? This would imply
> > > > > > > > >> that there is no ordering among regular tuples and control tuples.
> > > > > > > > >>
> > > > > > > > >> I think we should get started with option 1 - control tuples at
> > > > > > > > >> window boundary, which seems to handle most of the use cases. For some
> > > > > > > > >> cases which require option 2, we can always build on this.
> > > > > > > >>
> > > > > > > >> ~ Bhupesh
> > > > > > > >>
> > > > > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <thw@apache.org> wrote:
> > > > > > > > >>
> > > > > > > > >> > I don't see how that would work. Suppose you have a file splitter
> > > > > > > > >> > and multiple partitions of block readers. The "end of file" event
> > > > > > > > >> > cannot be processed downstream until all block readers are done. I
> > > > > > > > >> > also think that this is related to the batch demarcation discussion
> > > > > > > > >> > and there should be a single generalized mechanism to support this.
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Suppose I am processing data in a file and I want to do something at
> > > > > > > > >> > > the end of a file at the output operator. I would send an end-of-file
> > > > > > > > >> > > control tuple and act on it when I receive it at the output. In a
> > > > > > > > >> > > single window I may end up processing multiple files, and if I don't
> > > > > > > > >> > > have multiple ports and logical paths through the DAG (multiple
> > > > > > > > >> > > partitions are OK), I can process the end of each file immediately
> > > > > > > > >> > > and also know which file was closed without sending extra
> > > > > > > > >> > > identification information in the end-of-file tuple, which I would
> > > > > > > > >> > > need if I were collecting all of them and processing them at the end
> > > > > > > > >> > > of the window.
> > > > > > > >> > >
> > > > > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <thw@apache.org> wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > The use cases listed in the original discussion don't call for
> > > > > > > > >> > > > option 2. It seems to come with additional complexity and
> > > > > > > > >> > > > implementation cost.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Can those in favor of option 2 please also provide the use case
> > > > > > > > >> > > > for it?
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks,
> > > > > > > >> > > > Thomas
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siyuan@datatorrent.com>
> > > > > > > > >> > > > wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > I will vote for approach 1.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > First of all, that one sounds easier to do to me. And I think
> > > > > > > > >> > > > > idempotency is important. It may come at the cost of higher
> > > > > > > > >> > > > > latency, but I think that is OK.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > In addition, if users do need real-time control tuple processing
> > > > > > > > >> > > > > in the future, we can always add the option on top of it.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > So I vote for 1.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks,
> > > > > > > >> > > > > Siyuan
> > > > > > > >> > > > >
> > > > > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <prad@apache.org>
> > > > > > > > >> > > > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > As a rule of thumb in any real-time operating system, control
> > > > > > > > >> > > > > > tuples should always be handled using priority queues.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > We may try to control priorities by defining levels, and control
> > > > > > > > >> > > > > > tuples shall not be delivered at window boundaries.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > In short, control tuples shall never be treated like any other
> > > > > > > > >> > > > > > tuples in real-time systems.
> > > > > > > >> > > > > >
> > > > > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <david@datatorrent.com>
> > > > > > > > >> > > > > > wrote:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > > Hi all,
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > I would like to renew the discussion of control tuples.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Last time, we were in a debate about whether:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > 1) the platform should enforce that control tuples are delivered
> > > > > > > > >> > > > > > > at window boundaries only
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > or:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > 2) the platform should deliver control tuples just as other
> > > > > > > > >> > > > > > > tuples and it's the operator developers' choice whether to handle
> > > > > > > > >> > > > > > > the control tuples as they arrive or delay the processing till the
> > > > > > > > >> > > > > > > next window boundary.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > To summarize the pros and cons:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Approach 1: If processing control tuples results in changes to
> > > > > > > > >> > > > > > > the behavior of the operator and idempotency needs to be
> > > > > > > > >> > > > > > > preserved, the processing must be done at window boundaries. This
> > > > > > > > >> > > > > > > approach will save operator developers the headache of ensuring
> > > > > > > > >> > > > > > > that. However, this will take away the choice from operator
> > > > > > > > >> > > > > > > developers who just need to process the control tuples as soon as
> > > > > > > > >> > > > > > > possible.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Approach 2: The operator has a chance to immediately process
> > > > > > > > >> > > > > > > control tuples. This would be useful if latency is valued more
> > > > > > > > >> > > > > > > than correctness. However, this would open the possibility for
> > > > > > > > >> > > > > > > operator developers to shoot themselves in the foot. This is
> > > > > > > > >> > > > > > > especially true if there are multiple input ports, as there is no
> > > > > > > > >> > > > > > > easy way to guarantee processing order for multiple input ports.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > We would like to arrive at a consensus and close this discussion
> > > > > > > > >> > > > > > > soon this time so we can start the work on this important
> > > > > > > > >> > > > > > > feature.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Thanks!
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > David
> > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > > > > > > > >> > > > > > > wrote:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > > It is not clear how an operator will emit custom control tuples
> > > > > > > > >> > > > > > > > at window boundaries. One way is to cache/accumulate control
> > > > > > > > >> > > > > > > > tuples in the operator output port till the window closes
> > > > > > > > >> > > > > > > > (END_WINDOW is inserted into the output sink); another is to
> > > > > > > > >> > > > > > > > only allow an operator to emit control tuples inside
> > > > > > > > >> > > > > > > > endWindow(). The latter is a slight variation of the operator
> > > > > > > > >> > > > > > > > output port caching behavior with the only difference that now
> > > > > > > > >> > > > > > > > the operator itself is responsible for caching/accumulating
> > > > > > > > >> > > > > > > > control tuples. Note that in many cases it will be necessary to
> > > > > > > > >> > > > > > > > postpone emitting payload tuples that logically come after the
> > > > > > > > >> > > > > > > > custom control tuple till the next window begins.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > IMO, that is too restrictive. In a case where an input operator
> > > > > > > > >> > > > > > > > uses push instead of poll (for example, it provides an endpoint
> > > > > > > > >> > > > > > > > where remote agents may connect and publish/push data), control
> > > > > > > > >> > > > > > > > tuples may be used for connect/disconnect/watermark broadcast
> > > > > > > > >> > > > > > > > to (partitioned) downstream operators. In this case the
> > > > > > > > >> > > > > > > > platform just needs to guarantee an order barrier (any tuple
> > > > > > > > >> > > > > > > > emitted prior to a control tuple needs to be delivered prior to
> > > > > > > > >> > > > > > > > the control tuple).
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Thank you,
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Vlad
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >> I agree with David. Allowing control tuples within a window
> > > > > > > > >> > > > > > > >> (along with data tuples) creates a very dangerous situation
> > > > > > > > >> > > > > > > >> where guarantees are impacted. It is much safer to enable
> > > > > > > > >> > > > > > > >> control tuples (send/receive) at window boundaries (after
> > > > > > > > >> > > > > > > >> END_WINDOW of window N, and before BEGIN_WINDOW for window
> > > > > > > > >> > > > > > > >> N+1). My take on David's list is:
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue
> > > > > > > > >> > > > > > > >> with guarantees for operators with multiple ports. (see
> > > > > > > > >> > > > > > > >> Thomas's response)
> > > > > > > > >> > > > > > > >> 2. -> All downstream windows -> +1, but there are situations; a
> > > > > > > > >> > > > > > > >> caveat could be "only to operators that implement control tuple
> > > > > > > > >> > > > > > > >> interface/listeners", which could effectively translate to "all
> > > > > > > > >> > > > > > > >> interested downstream operators"
> > > > > > > > >> > > > > > > >> 3. Only Input operator can create control tuples -> -1; is
> > > > > > > > >> > > > > > > >> restrictive even though most likely 95% of the time it will be
> > > > > > > > >> > > > > > > >> input operators
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> Thks,
> > > > > > > >> > > > > > > >> Amol
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <thomas@datatorrent.com>
> > > > > > > > >> > > > > > > >> wrote:
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>> The windowing we discuss here is in general event-time based;
> > > > > > > > >> > > > > > > >>> arrival time is a special case of it.
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>> I don't think state changes can be made independent of the
> > > > > > > > >> > > > > > > >>> streaming window boundary as it would prevent idempotent
> > > > > > > > >> > > > > > > >>> processing and transitively exactly-once. For that to work,
> > > > > > > > >> > > > > > > >>> tuples need to be presented to the operator in a guaranteed
> > > > > > > > >> > > > > > > >>> order *within* the streaming window, which is not possible
> > > > > > > > >> > > > > > > >>> with multiple ports (and partitions).
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>> Thomas
> > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <david@datatorrent.com>
> > > > > > > > >> > > > > > > >>> wrote:
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> I think for session tracking, if the session boundaries are
> > > > > > > > >> > > > > > > >>>> allowed to be not aligned with the streaming window
> > > > > > > > >> > > > > > > >>>> boundaries, the user will have a much bigger problem with
> > > > > > > > >> > > > > > > >>>> idempotency. And in most cases, session tracking is event
> > > > > > > > >> > > > > > > >>>> time based, not ingression time or processing time based, so
> > > > > > > > >> > > > > > > >>>> this may never be a problem. But if that ever happens, the
> > > > > > > > >> > > > > > > >>>> user can always alter the default 500ms width.
> > > > > > > >> > > > > > > >>>>
> > > > > > > >> > > > > > > >>>> David
> > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.rozov@datatorrent.com>
> > > > > > > > >> > > > > > > >>>> wrote:
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> The ability to send custom control tuples within a window
> > > > > > > > >> > > > > > > >>>>> may be useful, for example, for session tracking, where
> > > > > > > > >> > > > > > > >>>>> session boundaries are not aligned with window boundaries
> > > > > > > > >> > > > > > > >>>>> and 500 ms latency is not acceptable for an application.
> > > > > > > >> > > > > > > >>>>>
> > > > > > > >> > > > > > > >>>>> Thank you,
> > > > > > > >> > > > > > > >>>>>
> > > > > > > >> > > > > > > >>>>> Vlad
> > > > > > > >> > > > > > > >>>>>
> > > > > > > >> > > > > > > >>>>>
> > > > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > > > > > >> > > > > > > >>>>>
> > > > > > > > >> > > > > > > >>>>>> It should not matter from where the control tuple is
> > > > > > > > >> > > > > > > >>>>>> triggered. It will be good to have a generic mechanism to
> > > > > > > > >> > > > > > > >>>>>> propagate it and other things can be accomplished outside
> > > > > > > > >> > > > > > > >>>>>> the engine. For example, the new comprehensive support for
> > > > > > > > >> > > > > > > >>>>>> windowing will all be in Malhar; the engine needs to know
> > > > > > > > >> > > > > > > >>>>>> nothing about it except that we need the control tuple for
> > > > > > > > >> > > > > > > >>>>>> watermark propagation and idempotent processing.
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> I also think the main difference to other tuples is the
> > > > > > > > >> > > > > > > >>>>>> need to send it to all partitions, which is similar to
> > > > > > > > >> > > > > > > >>>>>> checkpoint window tuples, but not the same. Here, we
> > > > > > > > >> > > > > > > >>>>>> probably also need the ability for the user to control
> > > > > > > > >> > > > > > > >>>>>> whether such a tuple should traverse the entire DAG or not.
> > > > > > > > >> > > > > > > >>>>>> For a batch use case, for example, we may want to send the
> > > > > > > > >> > > > > > > >>>>>> end of file to the next operator, but not beyond, if the
> > > > > > > > >> > > > > > > >>>>>> operator has asynchronous processing logic in it.
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to
> > > > > > > > >> > > > > > > >>>>>> be processed at a window boundary. Receiving the control
> > > > > > > > >> > > > > > > >>>>>> tuple in the window callback would avoid having to track
> > > > > > > > >> > > > > > > >>>>>> extra state in the operator. I don't think that's a major
> > > > > > > > >> > > > > > > >>>>>> issue, but what is the use case for processing a control
> > > > > > > > >> > > > > > > >>>>>> tuple within the window?
> > > > > > > >> > > > > > > >>>>>>
> > > > > > > >> > > > > > > >>>>>> Thomas
> > > > > > > >> > > > > > > >>>>>>
> > > > > > > >> > > > > > > >>>>>>
> > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni
> > > > > > > > >> > > > > > > >>>>>> <pramod@datatorrent.com> wrote:
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are
> > > > > > > > >> > > > > > > >>>>>>> more likely to be controlled directly by the application,
> > > > > > > > >> > > > > > > >>>>>>> 3) and 4) are more likely going to be triggered externally
> > > > > > > > >> > > > > > > >>>>>>> and directly handled by the engine and 3) is already being
> > > > > > > > >> > > > > > > >>>>>>> implemented that way (apexcore-163).
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> The control tuples emitted by an operator would be sent to
> > > > > > > > >> > > > > > > >>>>>>> all downstream partitions, wouldn't they? That would be
> > > > > > > > >> > > > > > > >>>>>>> the chief distinction compared to data (apart from the
> > > > > > > > >> > > > > > > >>>>>>> payload), which would get partitioned under normal
> > > > > > > > >> > > > > > > >>>>>>> circumstances. It would also be guaranteed that downstream
> > > > > > > > >> > > > > > > >>>>>>> partitions will receive control tuples only after the data
> > > > > > > > >> > > > > > > >>>>>>> that was sent before them, so we could send a control
> > > > > > > > >> > > > > > > >>>>>>> tuple immediately when it is emitted as opposed to at
> > > > > > > > >> > > > > > > >>>>>>> window boundaries.
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> However, during unification it is important to know if
> > > > > > > > >> > > > > > > >>>>>>> these control tuples have been received from all upstream
> > > > > > > > >> > > > > > > >>>>>>> partitions before proceeding with a control operation. One
> > > > > > > > >> > > > > > > >>>>>>> could wait till the end of the window, but that introduces
> > > > > > > > >> > > > > > > >>>>>>> a delay, however small. I would like to add to the
> > > > > > > > >> > > > > > > >>>>>>> proposal that the platform only hand over the control
> > > > > > > > >> > > > > > > >>>>>>> tuple to the unifier when it has been received from all
> > > > > > > > >> > > > > > > >>>>>>> upstream partitions, much like how end window is
> > > > > > > > >> > > > > > > >>>>>>> processed, but not wait till the actual end of the window.
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> Regd your concern about idempotency, we
> > > > > typically
> > > > > > > care
> > > > > > > >> > > about
> > > > > > > >> > > > > > > >>>>>>> idempotency at a window level and doing
> > the
> > > > > above
> > > > > > > will
> > > > > > > >> > > still
> > > > > > > >> > > > > > allow
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>> the
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> operators to preserve that easily.
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> Thanks
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan
> <
> > > > > > > > >> > > > david@datatorrent.com>
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>> wrote:
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> Hi all,
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> I would like to propose a new feature
> to
> > > the
> > > > > Apex
> > > > > > > core
> > > > > > > >> > > > engine
> > > > > > > >> > > > > --
> > > > > > > >> > > > > > > the
> > > > > > > >> > > > > > > >>>>>>>> support of custom control tuples.
> > > Currently,
> > > > we
> > > > > > > have
> > > > > > > >> > > control
> > > > > > > >> > > > > > > tuples
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> such
> > > > > > > >> > > > > > > >>>>
> > > > > > > >> > > > > > > >>>>> as
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT,
> and
> > so
> > > > on,
> > > > > > > but we
> > > > > > > >> > > don't
> > > > > > > >> > > > > > have
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> the
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> support for applications to insert their
> own
> > > > > control
> > > > > > > >> tuples.
> > > > > > > >> > > The
> > > > > > > >> > > > > way
> > > > > > > >> > > > > > > >>>>>>>> currently to get around this is to use
> > data
> > > > > > tuples
> > > > > > > and
> > > > > > > >> > > have
> > > > > > > >> > > > a
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> separate
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> port
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> for such tuples that sends tuples to all
> > > > > > partitions
> > > > > > > of
> > > > > > > >> > the
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> downstream
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> operators, which is not exactly developer
> > > > friendly.
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> We have already seen a number of use
> > cases
> > > > that
> > > > > > can
> > > > > > > >> use
> > > > > > > >> > > this
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> feature:
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> 1) Batch support: We need to tell all
> > operators
> > > > of
> > > > > > the
> > > > > > > >> > > physical
> > > > > > > >> > > > > DAG
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> when
> > > > > > > >> > > > > > > >>>>
> > > > > > > >> > > > > > > >>>>> a
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> batch starts and ends, so the operators
> > can
> > > do
> > > > > > > whatever
> > > > > > > >> > > that
> > > > > > > >> > > > is
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> needed
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> upon
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> the start or the end of a batch.
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> 2) Watermark: To support the concepts
> of
> > > > event
> > > > > > time
> > > > > > > >> > > > windowing,
> > > > > > > >> > > > > > the
> > > > > > > >> > > > > > > >>>>>>>> watermark control tuple is needed to
> tell
> > > > which
> > > > > > > >> windows
> > > > > > > >> > > > should
> > > > > > > >> > > > > > be
> > > > > > > >> > > > > > > >>>>>>>> considered late.
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> 3) Changing operator properties: We do
> > have
> > > > the
> > > > > > > >> support
> > > > > > > >> > of
> > > > > > > >> > > > > > > changing
> > > > > > > >> > > > > > > >>>>>>>> operator properties on the fly, but
> with
> > a
> > > > > custom
> > > > > > > >> > control
> > > > > > > >> > > > > tuple,
> > > > > > > >> > > > > > > the
> > > > > > > >> > > > > > > >>>>>>>> command to change operator properties
> can
> > > be
> > > > > > window
> > > > > > > >> > > aligned
> > > > > > > >> > > > > for
> > > > > > > >> > > > > > > all
> > > > > > > >> > > > > > > >>>>>>>> partitions and also across the DAG.
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing
> > operator
> > > > > > > >> properties,
> > > > > > > >> > we
> > > > > > > >> > > > do
> > > > > > > >> > > > > > have
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> this
> > > > > > > >> > > > > > > >>>>
> > > > > > > >> > > > > > > >>>>> support now but only at the individual
> > > physical
> > > > > > > operator
> > > > > > > >> > > level,
> > > > > > > >> > > > > and
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> without
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> control of which window to record tuples
> > > for.
> > > > > > With a
> > > > > > > >> > custom
> > > > > > > >> > > > > > control
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> tuple,
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> because a control tuple must belong to a
> > > > window,
> > > > > > all
> > > > > > > >> > > > operators
> > > > > > > >> > > > > in
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> the
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> DAG
> > > > > > > >> > > > > > > >>>>>>>> can start (and stop) recording for the
> > same
> > > > > > > windows.
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> I can think of two options to achieve
> > this:
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> 1) new custom control tuple type that
> > takes
> > > > > > user's
> > > > > > > >> > > > > serializable
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> object.
> > > > > > > >> > > > > > > >>>>
> > > > > > > >> > > > > > > >>>>> 2) piggy back the current BEGIN_WINDOW and
> > > > > > END_WINDOW
> > > > > > > >> > control
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>> tuples.
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> Please provide your feedback. Thank you.
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>> David
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSSION] Custom Control Tuples

Posted by Pramod Immaneni <pr...@datatorrent.com>.
There have been some recent developments and discussions on the schema side
(link below) that warrant a reconsideration of how control tuples get
delivered.

http://apache.markmail.org/search/?q=apex+list%3Aorg.apache.apex.dev+schema+discovery+support#query:apex%20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:oaji26y3xfozap5v+state:results

What I would like to suggest is that we allow two delivery options for
control tuples, configurable on a per control tuple basis. The first
is to deliver the control tuple to the operator when the first instance of
the tuple arrives from any path. The second is to deliver the control tuple
when the last instance of the tuple has arrived from all the paths, or at the
end window if it is going to be difficult to determine the last arrival. The
developer can choose the delivery option for the control tuple, preferably
when the tuple is created. The first option will be useful for scenarios
like schema propagation or begin file in batch use cases. The second
option will be useful for tuples like end file or end batch in batch use
cases.
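
As a rough illustration of the two delivery options, here is a minimal Java
sketch. It is not existing Apex API: DeliveryType, ControlTupleTracker,
onArrival and the string tuple ids are all hypothetical names chosen for the
example, and the number of upstream paths is assumed to be known up front.
Tuples marked LAST_ARRIVAL that have not been seen on every path fall back
to delivery at end window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical delivery options; not part of the Apex API.
enum DeliveryType { FIRST_ARRIVAL, LAST_ARRIVAL }

// Hypothetical per-operator tracker for control tuples within one window.
class ControlTupleTracker {
  private final int upstreamPaths;
  // tuple id -> upstream paths on which the tuple has arrived this window
  private final Map<String, Set<Integer>> seen = new HashMap<>();
  // LAST_ARRIVAL tuples that have not yet arrived on every path
  private final Set<String> pendingLast = new LinkedHashSet<>();
  // tuple ids handed to the operator, in delivery order
  final List<String> delivered = new ArrayList<>();

  ControlTupleTracker(int upstreamPaths) {
    this.upstreamPaths = upstreamPaths;
  }

  void onArrival(String tupleId, DeliveryType type, int path) {
    Set<Integer> paths = seen.computeIfAbsent(tupleId, k -> new HashSet<>());
    if (!paths.add(path)) {
      return; // duplicate from the same path, ignore
    }
    if (type == DeliveryType.FIRST_ARRIVAL) {
      if (paths.size() == 1) {
        delivered.add(tupleId); // option 1: first instance from any path
      }
      return;
    }
    pendingLast.add(tupleId);
    if (paths.size() == upstreamPaths) {
      pendingLast.remove(tupleId);
      delivered.add(tupleId); // option 2: last instance from all paths
    }
  }

  // Fallback: deliver still-pending LAST_ARRIVAL tuples at end window.
  void endWindow() {
    delivered.addAll(pendingLast);
    pendingLast.clear();
    seen.clear();
  }
}
```

With three upstream paths, a FIRST_ARRIVAL tuple such as a schema change is
handed over as soon as one path delivers it, while a LAST_ARRIVAL tuple such
as end file waits for all three paths or for endWindow(), whichever comes
first.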

Thanks

On Sat, Dec 3, 2016 at 1:12 AM, David Yan <da...@datatorrent.com> wrote:

> Bhupesh, Sandesh, Tushar:
>
> Thanks for volunteering. This task probably needs all three of you to work
> closely together.
>
> The subtasks so far are:
>
> https://issues.apache.org/jira/browse/APEXCORE-580
> https://issues.apache.org/jira/browse/APEXCORE-581
>
> Please first review the subtasks and see whether anything is missing and
> add your thoughts to the tickets if you have any preliminary idea how to
> implement them.
>
> By the way, I think APEXCORE-581 is more involving and it might be a good
> idea to split that up further. It also makes sense since there are three of
> you.
>
> David
>
>
> On Thu, Dec 1, 2016 at 3:41 AM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
>
> > I am also interested working on this feature.
> >
> > - Tushar.
> >
> >
> > On Thu, Dec 1, 2016 at 10:27 AM, Bhupesh Chawda <bhupesh@datatorrent.com
> >
> > wrote:
> > > I would like to work on https://issues.apache.org/
> > jira/browse/APEXCORE-580.
> > >
> > > ~ Bhupesh
> > >
> > > On Thu, Dec 1, 2016 at 5:42 AM, Sandesh Hegde <sandesh@datatorrent.com
> >
> > > wrote:
> > >
> > >> I am interested in working on the following subtask
> > >>
> > >> https://issues.apache.org/jira/browse/APEXCORE-581
> > >>
> > >> Thanks
> > >>
> > >>
> > >> On Wed, Nov 30, 2016 at 2:07 PM David Yan <da...@datatorrent.com>
> > wrote:
> > >>
> > >> > I have created an umbrella ticket for control tuple support:
> > >> >
> > >> > https://issues.apache.org/jira/browse/APEXCORE-579
> > >> >
> > >> > Currently it has two subtasks. Please have a look at them and see
> > whether
> > >> > I'm missing anything or if you have anything to add. You are welcome
> > to
> > >> add
> > >> > more subtasks or comment on the existing subtasks.
> > >> >
> > >> > We would like to kick start the implementation soon.
> > >> >
> > >> > Thanks!
> > >> >
> > >> > David
> > >> >
> > >> > On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda <
> > bhupesh@datatorrent.com
> > >> >
> > >> > wrote:
> > >> >
> > >> > > +1 for the plan.
> > >> > >
> > >> > > I would be interested in contributing to this feature.
> > >> > >
> > >> > > ~ Bhupesh
> > >> > >
> > >> > > On Nov 29, 2016 03:26, "Sandesh Hegde" <sa...@datatorrent.com>
> > >> wrote:
> > >> > >
> > >> > > > I am interested in contributing to this feature.
> > >> > > >
> > >> > > > On Mon, Nov 28, 2016 at 1:54 PM David Yan <
> david@datatorrent.com>
> > >> > wrote:
> > >> > > >
> > >> > > > > I think we should probably go ahead with option 1 since this
> > works
> > >> > with
> > >> > > > > most use cases and prevents developers from shooting
> themselves
> > in
> > >> > the
> > >> > > > foot
> > >> > > > > in terms of idempotency.
> > >> > > > >
> > >> > > > > We can have a configuration property that enables option 2
> > later if
> > >> > we
> > >> > > > have
> > >> > > > > concrete use cases that call for it.
> > >> > > > >
> > >> > > > > Please share your thoughts if you think you don't agree with
> > this
> > >> > plan.
> > >> > > > > Also, please indicate if you're interested in contributing to
> > this
> > >> > > > feature.
> > >> > > > >
> > >> > > > > David
> > >> > > > >
> > >> > > > > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <
> > >> > > bhupesh@datatorrent.com
> > >> > > > >
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > It appears that option 1 is more favored due to
> unavailability
> > >> of a
> > >> > > use
> > >> > > > > > case which could use option 2.
> > >> > > > > >
> > >> > > > > > However, option 2 is problematic in specific cases, like
> > presence
> > >> > of
> > >> > > > > > multiple input ports for example. In case of a linear DAG
> > where
> > >> > > control
> > >> > > > > > tuples are flowing in order with the data tuples, it should
> > not
> > >> be
> > >> > > > > > difficult to guarantee idempotency. For example, cases where
> > >> there
> > >> > > > could
> > >> > > > > be
> > >> > > > > > multiple changes in behavior of an operator during a single
> > >> window,
> > >> > > it
> > >> > > > > > should not wait until end window for these changes to take
> > >> effect.
> > >> > > > Since,
> > >> > > > > > we don't have a concrete use case right now, perhaps we do
> not
> > >> want
> > >> > > to
> > >> > > > go
> > >> > > > > > that road. This feature should be available through a
> platform
> > >> > > > attribute
> > >> > > > > > (may be at a later point in time) where the default is
> option
> > 1.
> > >> > > > > >
> > >> > > > > > I think option 1 is suitable for a starting point in the
> > >> > > implementation
> > >> > > > > of
> > >> > > > > > this feature and we should proceed with it.
> > >> > > > > >
> > >> > > > > > ~ Bhupesh
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <
> > >> david@datatorrent.com
> > >> > >
> > >> > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Good question Tushar. The callback should be called only
> > once.
> > >> > > > > > > The way to implement this is to keep a list of control
> tuple
> > >> > hashes
> > >> > > > for
> > >> > > > > > the
> > >> > > > > > > given streaming window and only do the callback when the
> > >> operator
> > >> > > has
> > >> > > > > not
> > >> > > > > > > seen it before.
> > >> > > > > > >
> > >> > > > > > > Other thoughts?
> > >> > > > > > >
> > >> > > > > > > David
> > >> > > > > > >
> > >> > > > > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <
> > >> > > > tushar@datatorrent.com
> > >> > > > > >
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Hi David,
> > >> > > > > > > >
> > >> > > > > > > > What would be the behaviour in case where we have a DAG
> > with
> > >> > > > > following
> > >> > > > > > > > operators, the number in bracket is number of
> partitions,
> > X
> > >> is
> > >> > > NxM
> > >> > > > > > > > partitioning.
> > >> > > > > > > > A(1) X B(4) X C(2)
> > >> > > > > > > >
> > >> > > > > > > > If A sends a control tuple, it will be sent to all 4
> > >> partition
> > >> > of
> > >> > > > B,
> > >> > > > > > > > and from each partition from B it goes to C, i.e each
> > >> partition
> > >> > > of
> > >> > > > C
> > >> > > > > > > > will receive same control tuple originated from A
> multiple
> > >> > times
> > >> > > > > > > > (number of upstream partitions of C). In this case will
> > the
> > >> > > > callback
> > >> > > > > > > > function get called multiple times or just once.
> > >> > > > > > > >
> > >> > > > > > > > -Tushar.
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <
> > >> > > david@datatorrent.com>
> > >> > > > > > > wrote:
> > >> > > > > > > > > Hi Bhupesh,
> > >> > > > > > > > >
> > >> > > > > > > > > Since each input port has its own incoming control
> > tuple, I
> > >> > > would
> > >> > > > > > > imagine
> > >> > > > > > > > > there would be an additional DefaultInputPort.
> > >> processControl
> > >> > > > method
> > >> > > > > > > that
> > >> > > > > > > > > operator developers can override.
> > >> > > > > > > > > If we go for option 1, my thinking is that the control
> > >> tuples
> > >> > > > would
> > >> > > > > > > > always
> > >> > > > > > > > > be delivered at the next window boundary, even if the
> > emit
> > >> > > method
> > >> > > > > is
> > >> > > > > > > > called
> > >> > > > > > > > > within a window.
> > >> > > > > > > > >
> > >> > > > > > > > > David
> > >> > > > > > > > >
> > >> > > > > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <
> > >> > > > > > > bhupesh@datatorrent.com>
> > >> > > > > > > > > wrote:
> > >> > > > > > > > >
> > >> > > > > > > > >> I have a question regarding the callback for a
> control
> > >> > tuple.
> > >> > > > Will
> > >> > > > > > it
> > >> > > > > > > be
> > >> > > > > > > > >> similar to InputPort::process() method? Something
> like
> > >> > > > > > > > >> InputPort::processControlTuple(t)
> > >> > > > > > > > >> ? Or will it be a method of the operator similar to
> > >> > > > beginWindow()?
> > >> > > > > > > > >>
> > >> > > > > > > > >> When we say that the control tuple will be delivered
> at
> > >> > window
> > >> > > > > > > boundary,
> > >> > > > > > > > >> does that mean all control tuples emitted in that
> > window
> > >> > will
> > >> > > be
> > >> > > > > > > > processed
> > >> > > > > > > > >> together at the end of the window? This would imply
> > that
> > >> > there
> > >> > > > is
> > >> > > > > no
> > >> > > > > > > > >> ordering among regular tuples and control tuples.
> > >> > > > > > > > >>
> > >> > > > > > > > >> I think we should get started with the option 1 -
> > control
> > >> > > tuples
> > >> > > > > at
> > >> > > > > > > > window
> > >> > > > > > > > >> boundary, which seems to handle most of the use
> cases.
> > For
> > >> > > some
> > >> > > > > > cases
> > >> > > > > > > > which
> > >> > > > > > > > >> require option 2, we can always build on this.
> > >> > > > > > > > >>
> > >> > > > > > > > >> ~ Bhupesh
> > >> > > > > > > > >>
> > >> > > > > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <
> > >> > thw@apache.org>
> > >> > > > > > wrote:
> > >> > > > > > > > >>
> > >> > > > > > > > >> > I don't see how that would work. Suppose you have a
> > file
> > >> > > > > splitter
> > >> > > > > > > and
> > >> > > > > > > > >> > multiple partitions of block readers. The "end of
> > file"
> > >> > > event
> > >> > > > > > cannot
> > >> > > > > > > > be
> > >> > > > > > > > >> > processed downstream until all block readers are
> > done. I
> > >> > > also
> > >> > > > > > think
> > >> > > > > > > > that
> > >> > > > > > > > >> > this is related to the batch demarcation discussion
> > and
> > >> > > there
> > >> > > > > > should
> > >> > > > > > > > be a
> > >> > > > > > > > >> > single generalized mechanism to support this.
> > >> > > > > > > > >> >
> > >> > > > > > > > >> >
> > >> > > > > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <
> > >> > > > > > > > pramod@datatorrent.com
> > >> > > > > > > > >> >
> > >> > > > > > > > >> > wrote:
> > >> > > > > > > > >> >
> > >> > > > > > > > >> > > Suppose I am processing data in a file and I want
> > to
> > >> do
> > >> > > > > > something
> > >> > > > > > > at
> > >> > > > > > > > >> the
> > >> > > > > > > > >> > > end of a file at the output operator, I would
> send
> > an
> > >> > end
> > >> > > > file
> > >> > > > > > > > control
> > >> > > > > > > > >> > > tuple and act on it when I receive it at the
> > output.
> > >> In
> > >> > a
> > >> > > > > single
> > >> > > > > > > > >> window I
> > >> > > > > > > > >> > > may end up processing multiple files and if I
> don't
> > >> have
> > >> > > > > > multiple
> > >> > > > > > > > ports
> > >> > > > > > > > >> > and
> > >> > > > > > > > >> > > logical paths through the DAG (multiple
> partitions
> > are
> > >> > > ok).
> > >> > > > I
> > >> > > > > > can
> > >> > > > > > > > >> process
> > >> > > > > > > > >> > > end of each file immediately and also know what
> > file
> > >> was
> > >> > > > > closed
> > >> > > > > > > > without
> > >> > > > > > > > >> > > sending extra identification information in the
> end
> > >> file
> > >> > > > > which I
> > >> > > > > > > > would
> > >> > > > > > > > >> > need
> > >> > > > > > > > >> > > if I am collecting all of them and processing at
> > the
> > >> end
> > >> > > of
> > >> > > > > the
> > >> > > > > > > > window.
> > >> > > > > > > > >> > >
> > >> > > > > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <
> > >> > > > thw@apache.org>
> > >> > > > > > > > wrote:
> > >> > > > > > > > >> > >
> > >> > > > > > > > >> > > > The use cases listed in the original discussion
> > >> don't
> > >> > > call
> > >> > > > > for
> > >> > > > > > > > option
> > >> > > > > > > > >> > 2.
> > >> > > > > > > > >> > > It
> > >> > > > > > > > >> > > > seems to come with additional complexity and
> > >> > > > implementation
> > >> > > > > > > cost.
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > Can those in favor of option 2 please also
> > provide
> > >> the
> > >> > > use
> > >> > > > > > case
> > >> > > > > > > > for
> > >> > > > > > > > >> it.
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > Thanks,
> > >> > > > > > > > >> > > > Thomas
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <
> > >> > > > > > > > siyuan@datatorrent.com>
> > >> > > > > > > > >> > > > wrote:
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > > I will vote for approach 1.
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > First of all that one sounds easier to do to
> > me.
> > >> > And I
> > >> > > > > think
> > >> > > > > > > > >> > > idempotency
> > >> > > > > > > > >> > > > is
> > >> > > > > > > > >> > > > > important. It may run at the cost of higher
> > >> latency
> > >> > > but
> > >> > > > I
> > >> > > > > > > think
> > >> > > > > > > > it
> > >> > > > > > > > >> is
> > >> > > > > > > > >> > > ok
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > And in addition, when in the future if users
> do
> > >> need
> > >> > > > > > realtime
> > >> > > > > > > > >> control
> > >> > > > > > > > >> > > > tuple
> > >> > > > > > > > >> > > > > processing, we can always add the option on
> > top of
> > >> > it.
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > So I vote for 1
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > Thanks,
> > >> > > > > > > > >> > > > > Siyuan
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A.
> > Dalvi <
> > >> > > > > > > > prad@apache.org>
> > >> > > > > > > > >> > > > wrote:
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > > As a rule of thumb in any real time
> operating
> > >> > > system,
> > >> > > > > > > control
> > >> > > > > > > > >> > tuples
> > >> > > > > > > > >> > > > > should
> > >> > > > > > > > >> > > > > > always be handled using Priority Queues.
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > We may try to control priorities by
> defining
> > >> > levels.
> > >> > > > And
> > >> > > > > > > shall
> > >> > > > > > > > >> not
> > >> > > > > > > > >> > > > > > be delivered at window boundaries.
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > In short, control tuples shall never be
> > treated
> > >> as
> > >> > > any
> > >> > > > > > other
> > >> > > > > > > > >> tuples
> > >> > > > > > > > >> > > in
> > >> > > > > > > > >> > > > > real
> > >> > > > > > > > >> > > > > > time systems.
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <
> > >> > > > > > > > david@datatorrent.com>
> > >> > > > > > > > >> > > > wrote:
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > > Hi all,
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > I would like to renew the discussion of
> > >> control
> > >> > > > > tuples.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Last time, we were in a debate about
> > whether:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > 1) the platform should enforce that
> control
> > >> > tuples
> > >> > > > are
> > >> > > > > > > > >> delivered
> > >> > > > > > > > >> > at
> > >> > > > > > > > >> > > > > > window
> > >> > > > > > > > >> > > > > > > boundaries only
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > or:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > 2) the platform should deliver control
> > tuples
> > >> > just
> > >> > > > as
> > >> > > > > > > other
> > >> > > > > > > > >> > tuples
> > >> > > > > > > > >> > > > and
> > >> > > > > > > > >> > > > > > it's
> > >> > > > > > > > >> > > > > > > the operator developers' choice whether
> to
> > >> > handle
> > >> > > > the
> > >> > > > > > > > control
> > >> > > > > > > > >> > > tuples
> > >> > > > > > > > >> > > > as
> > >> > > > > > > > >> > > > > > > they arrive or delay the processing till
> > the
> > >> > next
> > >> > > > > window
> > >> > > > > > > > >> > boundary.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > To summarize the pros and cons:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Approach 1: If processing control tuples
> > >> results
> > >> > > in
> > >> > > > > > > changes
> > >> > > > > > > > of
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > > behavior
> > >> > > > > > > > >> > > > > > > of the operator, if idempotency needs to
> be
> > >> > > > preserved,
> > >> > > > > > the
> > >> > > > > > > > >> > > processing
> > >> > > > > > > > >> > > > > > must
> > >> > > > > > > > >> > > > > > > be done at window boundaries. This
> approach
> > >> will
> > >> > > > save
> > >> > > > > > the
> > >> > > > > > > > >> > operator
> > >> > > > > > > > >> > > > > > > developers headache to ensure that.
> > However,
> > >> > this
> > >> > > > will
> > >> > > > > > > take
> > >> > > > > > > > >> away
> > >> > > > > > > > >> > > the
> > >> > > > > > > > >> > > > > > > choices from the operator developer if
> they
> > >> just
> > >> > > > need
> > >> > > > > to
> > >> > > > > > > > >> process
> > >> > > > > > > > >> > > the
> > >> > > > > > > > >> > > > > > > control tuples as soon as possible.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Approach 2: The operator has a chance to
> > >> > > immediately
> > >> > > > > > > process
> > >> > > > > > > > >> > > control
> > >> > > > > > > > >> > > > > > > tuples. This would be useful if latency
> is
> > >> more
> > >> > > > valued
> > >> > > > > > > than
> > >> > > > > > > > >> > > > > correctness.
> > >> > > > > > > > >> > > > > > > However, if this would open the
> possibility
> > >> for
> > >> > > > > operator
> > >> > > > > > > > >> > developers
> > >> > > > > > > > >> > > > to
> > >> > > > > > > > >> > > > > > > shoot themselves in the foot. This is
> > >> especially
> > >> > > > true
> > >> > > > > if
> > >> > > > > > > > there
> > >> > > > > > > > >> > are
> > >> > > > > > > > >> > > > > > multiple
> > >> > > > > > > > >> > > > > > > input ports. as there is no easy way to
> > >> > guarantee
> > >> > > > > > > processing
> > >> > > > > > > > >> > order
> > >> > > > > > > > >> > > > for
> > >> > > > > > > > >> > > > > > > multiple input ports.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > We would like to arrive to a consensus
> and
> > >> close
> > >> > > > this
> > >> > > > > > > > >> discussion
> > >> > > > > > > > >> > > soon
> > >> > > > > > > > >> > > > > > this
> > >> > > > > > > > >> > > > > > > time so we can start the work on this
> > >> important
> > >> > > > > feature.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Thanks!
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > David
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad
> > Rozov <
> > >> > > > > > > > >> > > > v.rozov@datatorrent.com>
> > >> > > > > > > > >> > > > > > > wrote:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > > It is not clear how operator will emit
> > >> custom
> > >> > > > > control
> > >> > > > > > > > tuple
> > >> > > > > > > > >> at
> > >> > > > > > > > >> > > > window
> > >> > > > > > > > >> > > > > > > > boundaries. One way is to
> > cache/accumulate
> > >> > > control
> > >> > > > > > > tuples
> > >> > > > > > > > in
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > > operator
> > >> > > > > > > > >> > > > > > > > output port till window closes
> > (END_WINDOW
> > >> is
> > >> > > > > inserted
> > >> > > > > > > > into
> > >> > > > > > > > >> the
> > >> > > > > > > > >> > > > > output
> > >> > > > > > > > >> > > > > > > > sink) or only allow an operator to emit
> > >> > control
> > >> > > > > tuples
> > >> > > > > > > > inside
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > > > > endWindow(). The later is a slight
> > variation
> > >> > of
> > >> > > > the
> > >> > > > > > > > operator
> > >> > > > > > > > >> > > output
> > >> > > > > > > > >> > > > > > port
> > >> > > > > > > > >> > > > > > > > caching behavior with the only
> difference
> > >> that
> > >> > > now
> > >> > > > > the
> > >> > > > > > > > >> operator
> > >> > > > > > > > >> > > > > itself
> > >> > > > > > > > >> > > > > > is
> > >> > > > > > > > >> > > > > > > > responsible for caching/accumulating
> > control
> > >> > > > tuples.
> > >> > > > > > > Note
> > >> > > > > > > > >> that
> > >> > > > > > > > >> > in
> > >> > > > > > > > >> > > > > many
> > >> > > > > > > > >> > > > > > > > cases it will be necessary to postpone
> > >> > emitting
> > >> > > > > > payload
> > >> > > > > > > > >> tuples
> > >> > > > > > > > >> > > that
> > >> > > > > > > > >> > > > > > > > logically come after the custom control
> > >> tuple
> > >> > > till
> > >> > > > > the
> > >> > > > > > > > next
> > >> > > > > > > > >> > > window
> > >> > > > > > > > >> > > > > > > begins.
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > IMO, that too restrictive and in a case
> > >> where
> > >> > > > input
> > >> > > > > > > > operator
> > >> > > > > > > > >> > > uses a
> > >> > > > > > > > >> > > > > > push
> > >> > > > > > > > >> > > > > > > > instead of a poll (for example, it
> > provides
> > >> an
> > >> > > end
> > >> > > > > > point
> > >> > > > > > > > >> where
> > >> > > > > > > > >> > > > remote
> > >> > > > > > > > >> > > > > > > > agents may connect and publish/push
> > data),
> > >> > > control
> > >> > > > > > > tuples
> > >> > > > > > > > may
> > >> > > > > > > > >> > be
> > >> > > > > > > > >> > > > used
> > >> > > > > > > > >> > > > > > for
> > >> > > > > > > > >> > > > > > > > connect/disconnect/watermark broadcast
> to
> > >> > > > > > (partitioned)
> > >> > > > > > > > >> > > downstream
> > >> > > > > > > > >> > > > > > > > operators. In this case the platform
> just
> > >> need
> > >> > > to
> > >> > > > > > > > guarantee
> > >> > > > > > > > >> > order
> > >> > > > > > > > >> > > > > > barrier
> > >> > > > > > > > >> > > > > > > > (any tuple emitted prior to a control
> > tuple
> > >> > > needs
> > >> > > > to
> > >> > > > > > be
> > >> > > > > > > > >> > delivered
> > >> > > > > > > > >> > > > > prior
> > >> > > > > > > > >> > > > > > > to
> > >> > > > > > > > >> > > > > > > > the control tuple).
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > Thank you,
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > Vlad
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > >> I agree with David. Allowing control
> > tuples
> > >> > > > within
> > >> > > > > a
> > >> > > > > > > > window
> > >> > > > > > > > >> > > (along
> > >> > > > > > > > >> > > > > > with
> > >> > > > > > > > >> > > > > > > >> data tuples) creates very dangerous
> > >> situation
> > >> > > > where
> > >> > > > > > > > >> guarantees
> > >> > > > > > > > >> > > are
> > >> > > > > > > > >> > > > > > > >> impacted. It is much safer to enable
> > >> control
> > >> > > > tuples
> > >> > > > > > > > >> > > (send/receive)
> > >> > > > > > > > >> > > > > at
> > >> > > > > > > > >> > > > > > > >> window boundaries (after END_WINDOW of
> > >> window
> > >> > > N,
> > >> > > > > and
> > >> > > > > > > > before
> > >> > > > > > > > >> > > > > > BEGIN_WINDOW
> > >> > > > > > > > >> > > > > > > >> for window N+1). My take on David's
> > list is
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1;
> > there
> > >> > > will
> > >> > > > > be a
> > >> > > > > > > big
> > >> > > > > > > > >> > issue
> > >> > > > > > > > >> > > > with
> > >> > > > > > > > >> > > > > > > >> guarantees for operators with multiple
> > >> ports.
> > >> > > > (see
> > >> > > > > > > > Thomas's
> > >> > > > > > > > >> > > > > response)
> > >> > > > > > > > >> > > > > > > >> 2. -> All downstream windows -> +1,
> but
> > >> there
> > >> > > are
> > >> > > > > > > > >> situations;
> > >> > > > > > > > >> > a
> > >> > > > > > > > >> > > > > caveat
> > >> > > > > > > > >> > > > > > > >> could be "only to operators that
> > implement
> > >> > > > control
> > >> > > > > > > tuple
> > >> > > > > > > > >> > > > > > > >> interface/listeners", which could
> > >> effectively
> > >> > > > > > > translates
> > >> > > > > > > > to
> > >> > > > > > > > >> > "all
> > >> > > > > > > > >> > > > > > > >> interested
> > >> > > > > > > > >> > > > > > > >> downstream operators"
> > >> > > > > > > > >> > > > > > > >> 3. Only Input operator can create
> > control
> > >> > > tuples
> > >> > > > ->
> > >> > > > > > -1;
> > >> > > > > > > > is
> > >> > > > > > > > >> > > > > restrictive
> > >> > > > > > > > >> > > > > > > >> even
> > >> > > > > > > > >> > > > > > > >> though most likely 95% of the time it
> > will
> > >> be
> > >> > > > input
> > >> > > > > > > > >> operators
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> Thks,
> > >> > > > > > > > >> > > > > > > >> Amol
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM,
> Thomas
> > >> > Weise <
> > >> > > > > > > > >> > > > > thomas@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >> wrote:
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> The windowing we discuss here is in
> > general
> > >> > > event
> > >> > > > > > time
> > >> > > > > > > > >> based,
> > >> > > > > > > > >> > > > > arrival
> > >> > > > > > > > >> > > > > > > time
> > >> > > > > > > > >> > > > > > > >>> is a special case of it.
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> I don't think state changes can be
> made
> > >> > > > > independent
> > >> > > > > > of
> > >> > > > > > > > the
> > >> > > > > > > > >> > > > > streaming
> > >> > > > > > > > >> > > > > > > >>> window
> > >> > > > > > > > >> > > > > > > >>> boundary as it would prevent
> idempotent
> > >> > > > processing
> > >> > > > > > and
> > >> > > > > > > > >> > > > transitively
> > >> > > > > > > > >> > > > > > > >>> exactly
> > >> > > > > > > > >> > > > > > > >>> once. For that to work, tuples need
> to
> > be
> > >> > > > > presented
> > >> > > > > > to
> > >> > > > > > > > the
> > >> > > > > > > > >> > > > operator
> > >> > > > > > > > >> > > > > > in
> > >> > > > > > > > >> > > > > > > a
> > >> > > > > > > > >> > > > > > > >>> guaranteed order *within* the
> streaming
> > >> > > window,
> > >> > > > > > which
> > >> > > > > > > is
> > >> > > > > > > > >> not
> > >> > > > > > > > >> > > > > possible
> > >> > > > > > > > >> > > > > > > >>> with
> > >> > > > > > > > >> > > > > > > >>> multiple ports (and partitions).
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> Thomas
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM,
> David
> > >> Yan <
> > >> > > > > > > > >> > > > david@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >>> wrote:
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> I think for session tracking, if the
> > >> session
> > >> > > > > > > boundaries
> > >> > > > > > > > are
> > >> > > > > > > > >> > > > allowed
> > >> > > > > > > > >> > > > > > to
> > >> > > > > > > > >> > > > > > > be
> > >> > > > > > > > >> > > > > > > >>>> not aligned with the streaming
> window
> > >> > > > boundaries,
> > >> > > > > > the
> > >> > > > > > > > user
> > >> > > > > > > > >> > > will
> > >> > > > > > > > >> > > > > > have a
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>> much
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> bigger problem with idempotency. And
> > in
> > >> > most
> > >> > > > > cases,
> > >> > > > > > > > >> session
> > >> > > > > > > > >> > > > > tracking
> > >> > > > > > > > >> > > > > > > is
> > >> > > > > > > > >> > > > > > > >>>> event time based, not ingression
> time
> > or
> > >> > > > > processing
> > >> > > > > > > > time
> > >> > > > > > > > >> > > based,
> > >> > > > > > > > >> > > > so
> > >> > > > > > > > >> > > > > > > this
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>> may
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> never be a problem. But if that ever
> > >> > happens,
> > >> > > > the
> > >> > > > > > > user
> > >> > > > > > > > can
> > >> > > > > > > > >> > > > always
> > >> > > > > > > > >> > > > > > > alter
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>> the
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> default 500ms width.
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>> David
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM,
> Vlad
> > >> > Rozov <
> > >> > > > > > > > >> > > > > > v.rozov@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >>>> wrote:
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>> Ability to send custom control
> tuples
> > >> > within
> > >> > > > > window
> > >> > > > > > > > may be
> > >> > > > > > > > >> > > > useful,
> > >> > > > > > > > >> > > > > > for
> > >> > > > > > > > >> > > > > > > >>>>> example, for sessions tracking,
> where
> > >> > > session
> > >> > > > > > > > boundaries
> > >> > > > > > > > >> > are
> > >> > > > > > > > >> > > > not
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>> aligned
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> with window boundaries and 500 ms
> > latency
> > >> > is
> > >> > > > not
> > >> > > > > > > > >> acceptable
> > >> > > > > > > > >> > > for
> > >> > > > > > > > >> > > > an
> > >> > > > > > > > >> > > > > > > >>>>> application.
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>> Thank you,
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>> Vlad
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise
> wrote:
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>> It should not matter from where the
> > >> > control
> > >> > > > > tuple
> > >> > > > > > is
> > >> > > > > > > > >> > > triggered.
> > >> > > > > > > > >> > > > > It
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> will
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> be
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> good to have a generic mechanism to
> > >> > > propagate
> > >> > > > it
> > >> > > > > > and
> > >> > > > > > > > >> other
> > >> > > > > > > > >> > > > things
> > >> > > > > > > > >> > > > > > can
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> be
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> accomplished outside the engine. For
> > >> > example,
> > >> > > > the
> > >> > > > > > new
> > >> > > > > > > > >> > > > > comprehensive
> > >> > > > > > > > >> > > > > > > >>>>>> support
> > >> > > > > > > > >> > > > > > > >>>>>> for windowing will all be in
> Malhar,
> > >> > > nothing
> > >> > > > > that
> > >> > > > > > > the
> > >> > > > > > > > >> > engine
> > >> > > > > > > > >> > > > > needs
> > >> > > > > > > > >> > > > > > > to
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> know
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> about it except that we need the
> > control
> > >> > > tuple
> > >> > > > > for
> > >> > > > > > > > >> > watermark
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> propagation
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> and idempotent processing.
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> I also think the main difference
> to
> > >> other
> > >> > > > > tuples
> > >> > > > > > is
> > >> > > > > > > > the
> > >> > > > > > > > >> > need
> > >> > > > > > > > >> > > > to
> > >> > > > > > > > >> > > > > > send
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> it
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> to
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> all partitions. Which is similar to
> > >> > > checkpoint
> > >> > > > > > > window
> > >> > > > > > > > >> > tuples,
> > >> > > > > > > > >> > > > but
> > >> > > > > > > > >> > > > > > not
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> the
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> same. Here, we probably also need
> the
> > >> > > ability
> > >> > > > > for
> > >> > > > > > > the
> > >> > > > > > > > >> user
> > >> > > > > > > > >> > to
> > >> > > > > > > > >> > > > > > control
> > >> > > > > > > > >> > > > > > > >>>>>> whether such tuple should traverse
> > the
> > >> > > entire
> > >> > > > > DAG
> > >> > > > > > > or
> > >> > > > > > > > >> not.
> > >> > > > > > > > >> > > For
> > >> > > > > > > > >> > > > a
> > >> > > > > > > > >> > > > > > > batch
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> use
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> case, for example, we may want to
> > send
> > >> the
> > >> > > end
> > >> > > > > of
> > >> > > > > > > > file to
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > next
> > >> > > > > > > > >> > > > > > > >>>>>> operator, but not beyond, if the
> > >> operator
> > >> > > has
> > >> > > > > > > > >> asynchronous
> > >> > > > > > > > >> > > > > > > processing
> > >> > > > > > > > >> > > > > > > >>>>>> logic
> > >> > > > > > > > >> > > > > > > >>>>>> in it.
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> For any logic to be idempotent,
> the
> > >> > control
> > >> > > > > tuple
> > >> > > > > > > > needs
> > >> > > > > > > > >> to
> > >> > > > > > > > >> > > be
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> processed
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> at
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> a window boundary. Receiving the
> > control
> > >> > > tuple
> > >> > > > > in
> > >> > > > > > > the
> > >> > > > > > > > >> > window
> > >> > > > > > > > >> > > > > > callback
> > >> > > > > > > > >> > > > > > > >>>>>> would
> > >> > > > > > > > >> > > > > > > >>>>>> avoid having to track extra state
> in
> > >> the
> > >> > > > > > operator.
> > >> > > > > > > I
> > >> > > > > > > > >> don't
> > >> > > > > > > > >> > > > think
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> that's
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> a
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> major issue, but what is the use
> case
> > >> for
> > >> > > > > > > processing a
> > >> > > > > > > > >> > > control
> > >> > > > > > > > >> > > > > > tuple
> > >> > > > > > > > >> > > > > > > >>>>>> within
> > >> > > > > > > > >> > > > > > > >>>>>> the window?
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> Thomas
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM,
> > Pramod
> > >> > > > > Immaneni
> > >> > > > > > <
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>> pramod@datatorrent.com
> > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> wrote:
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> For the use cases you mentioned, I
> > >> think
> > >> > 1)
> > >> > > > and
> > >> > > > > > 2)
> > >> > > > > > > > are
> > >> > > > > > > > >> > more
> > >> > > > > > > > >> > > > > likely
> > >> > > > > > > > >> > > > > > > to
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> be controlled directly by the
> > >> > application,
> > >> > > > 3)
> > >> > > > > > and
> > >> > > > > > > 4)
> > >> > > > > > > > >> are
> > >> > > > > > > > >> > > more
> > >> > > > > > > > >> > > > > > > likely
> > >> > > > > > > > >> > > > > > > >>>>>>> going to be triggered externally
> > and
> > >> > > > directly
> > >> > > > > > > > handled
> > >> > > > > > > > >> by
> > >> > > > > > > > >> > > the
> > >> > > > > > > > >> > > > > > engine
> > >> > > > > > > > >> > > > > > > >>>>>>> and 3) is already being
> implemented
> > >> that
> > >> > > way
> > >> > > > > > > > >> > > (apexcore-163).
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> The control tuples emitted by an
> > >> > operator
> > >> > > > > would
> > >> > > > > > be
> > >> > > > > > > > sent
> > >> > > > > > > > >> > to
> > >> > > > > > > > >> > > > all
> > >> > > > > > > > >> > > > > > > >>>>>>> downstream partitions isn't it
> and
> > >> that
> > >> > > > would
> > >> > > > > be
> > >> > > > > > > the
> > >> > > > > > > > >> > chief
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> distinction
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> compared to data (apart from the
> > payload)
> > >> > > which
> > >> > > > > > would
> > >> > > > > > > > get
> > >> > > > > > > > >> > > > > > partitioned
> > >> > > > > > > > >> > > > > > > >>>>>>> under normal circumstances. It
> > would
> > >> > also
> > >> > > be
> > >> > > > > > > > guaranteed
> > >> > > > > > > > >> > > that
> > >> > > > > > > > >> > > > > > > >>>>>>> downstream partitions will
> receive
> > >> > control
> > >> > > > > > tuples
> > >> > > > > > > > only
> > >> > > > > > > > >> > > after
> > >> > > > > > > > >> > > > > the
> > >> > > > > > > > >> > > > > > > data
> > >> > > > > > > > >> > > > > > > >>>>>>> that was sent before it so we
> could
> > >> send
> > >> > > it
> > >> > > > > > > > immediately
> > >> > > > > > > > >> > > when
> > >> > > > > > > > >> > > > it
> > >> > > > > > > > >> > > > > > is
> > >> > > > > > > > >> > > > > > > >>>>>>> emitted as opposed to window
> > >> boundaries.
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> However during unification it is
> > >> > important
> > >> > > > to
> > >> > > > > > know
> > >> > > > > > > > if
> > >> > > > > > > > >> > these
> > >> > > > > > > > >> > > > > > control
> > >> > > > > > > > >> > > > > > > >>>>>>> tuples have been received from
> all
> > >> > > upstream
> > >> > > > > > > > partitions
> > >> > > > > > > > >> > > before
> > >> > > > > > > > >> > > > > > > >>>>>>> proceeding with a control
> > operation.
> > >> One
> > >> > > > could
> > >> > > > > > > wait
> > >> > > > > > > > >> till
> > >> > > > > > > > >> > > end
> > >> > > > > > > > >> > > > of
> > >> > > > > > > > >> > > > > > the
> > >> > > > > > > > >> > > > > > > >>>>>>> window but that introduces a
> delay
> > >> > however
> > >> > > > > > small,
> > >> > > > > > > I
> > >> > > > > > > > >> would
> > >> > > > > > > > >> > > > like
> > >> > > > > > > > >> > > > > to
> > >> > > > > > > > >> > > > > > > add
> > >> > > > > > > > >> > > > > > > >>>>>>> to the proposal that the platform
> > only
> > >> > > hand
> > >> > > > > over
> > >> > > > > > > the
> > >> > > > > > > > >> > > control
> > >> > > > > > > > >> > > > > > tuple
> > >> > > > > > > > >> > > > > > > to
> > >> > > > > > > > >> > > > > > > >>>>>>> the unifier when it has been
> > received
> > >> > from
> > >> > > > all
> > >> > > > > > > > upstream
> > >> > > > > > > > >> > > > > > partitions
> > >> > > > > > > > >> > > > > > > >>>>>>> much like how end window is
> > processed
> > >> > but
> > >> > > > not
> > >> > > > > > wait
> > >> > > > > > > > till
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > > actual
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> end
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> of the window.
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> Regd your concern about
> > idempotency,
> > >> we
> > >> > > > > > typically
> > >> > > > > > > > care
> > >> > > > > > > > >> > > about
> > >> > > > > > > > >> > > > > > > >>>>>>> idempotency at a window level and
> > >> doing
> > >> > > the
> > >> > > > > > above
> > >> > > > > > > > will
> > >> > > > > > > > >> > > still
> > >> > > > > > > > >> > > > > > allow
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> the
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> operators to preserve that easily.
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> Thanks
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM,
> David
> > >> Yan
> > >> > <
> > >> > > > > > > > >> > > > david@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> wrote:
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> Hi all,
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> I would like to propose a new
> > feature
> > >> > to
> > >> > > > the
> > >> > > > > > Apex
> > >> > > > > > > > core
> > >> > > > > > > > >> > > > engine
> > >> > > > > > > > >> > > > > --
> > >> > > > > > > > >> > > > > > > the
> > >> > > > > > > > >> > > > > > > >>>>>>>> support of custom control
> tuples.
> > >> > > > Currently,
> > >> > > > > we
> > >> > > > > > > > have
> > >> > > > > > > > >> > > control
> > >> > > > > > > > >> > > > > > > tuples
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> such
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> as
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW,
> > CHECKPOINT,
> > >> > and
> > >> > > so
> > >> > > > > on,
> > >> > > > > > > > but we
> > >> > > > > > > > >> > > don't
> > >> > > > > > > > >> > > > > > have
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> the
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> support for applications to insert
> > their
> > >> > own
> > >> > > > > > control
> > >> > > > > > > > >> tuples.
> > >> > > > > > > > >> > > The
> > >> > > > > > > > >> > > > > way
> > >> > > > > > > > >> > > > > > > >>>>>>>> currently to get around this is
> to
> > >> use
> > >> > > data
> > >> > > > > > > tuples
> > >> > > > > > > > and
> > >> > > > > > > > >> > > have
> > >> > > > > > > > >> > > > a
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> separate
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> port
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> for such tuples that sends tuples
> > to
> > >> all
> > >> > > > > > > partitions
> > >> > > > > > > > of
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> downstream
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> operators, which is not exactly
> > developer
> > >> > > > > friendly.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> We have already seen a number of
> > use
> > >> > > cases
> > >> > > > > that
> > >> > > > > > > can
> > >> > > > > > > > >> use
> > >> > > > > > > > >> > > this
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> feature:
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> 1) Batch support: We need to tell
> all
> > >> > > operators
> > >> > > > > of
> > >> > > > > > > the
> > >> > > > > > > > >> > > physical
> > >> > > > > > > > >> > > > > DAG
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> when
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> a
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> batch starts and ends, so the
> > >> operators
> > >> > > can
> > >> > > > do
> > >> > > > > > > > whatever
> > >> > > > > > > > >> > > that
> > >> > > > > > > > >> > > > is
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> needed
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> upon
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> the start or the end of a batch.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 2) Watermark: To support the
> > concepts
> > >> > of
> > >> > > > > event
> > >> > > > > > > time
> > >> > > > > > > > >> > > > windowing,
> > >> > > > > > > > >> > > > > > the
> > >> > > > > > > > >> > > > > > > >>>>>>>> watermark control tuple is
> needed
> > to
> > >> > tell
> > >> > > > > which
> > >> > > > > > > > >> windows
> > >> > > > > > > > >> > > > should
> > >> > > > > > > > >> > > > > > be
> > >> > > > > > > > >> > > > > > > >>>>>>>> considered late.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 3) Changing operator properties:
> > We
> > >> do
> > >> > > have
> > >> > > > > the
> > >> > > > > > > > >> support
> > >> > > > > > > > >> > of
> > >> > > > > > > > >> > > > > > > changing
> > >> > > > > > > > >> > > > > > > >>>>>>>> operator properties on the fly,
> > but
> > >> > with
> > >> > > a
> > >> > > > > > custom
> > >> > > > > > > > >> > control
> > >> > > > > > > > >> > > > > tuple,
> > >> > > > > > > > >> > > > > > > the
> > >> > > > > > > > >> > > > > > > >>>>>>>> command to change operator
> > properties
> > >> > can
> > >> > > > be
> > >> > > > > > > window
> > >> > > > > > > > >> > > aligned
> > >> > > > > > > > >> > > > > for
> > >> > > > > > > > >> > > > > > > all
> > >> > > > > > > > >> > > > > > > >>>>>>>> partitions and also across the
> > DAG.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 4) Recording tuples: Like
> changing
> > >> > > operator
> > >> > > > > > > > >> properties,
> > >> > > > > > > > >> > we
> > >> > > > > > > > >> > > > do
> > >> > > > > > > > >> > > > > > have
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> this
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> support now but only at the
> > individual
> > >> > > > physical
> > >> > > > > > > > operator
> > >> > > > > > > > >> > > level,
> > >> > > > > > > > >> > > > > and
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> without
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> control of which window to record
> > >> tuples
> > >> > > > for.
> > >> > > > > > > With a
> > >> > > > > > > > >> > custom
> > >> > > > > > > > >> > > > > > control
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> tuple,
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> because a control tuple must
> belong
> > >> to a
> > >> > > > > window,
> > >> > > > > > > all
> > >> > > > > > > > >> > > > operators
> > >> > > > > > > > >> > > > > in
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> the
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> DAG
> > >> > > > > > > > >> > > > > > > >>>>>>>> can start (and stop) recording
> for
> > >> the
> > >> > > same
> > >> > > > > > > > windows.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> I can think of two options to
> > achieve
> > >> > > this:
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 1) new custom control tuple type
> > that
> > >> > > takes
> > >> > > > > > > user's
> > >> > > > > > > > >> > > > > serializable
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> object.
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>>> 2) piggy back the current
> > BEGIN_WINDOW
> > >> and
> > >> > > > > > > END_WINDOW
> > >> > > > > > > > >> > control
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> tuples.
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> Please provide your feedback. Thank
> > you.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> David
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > >
> > >> > > > > > > > >> >
> > >> > > > > > > > >>
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> >
>

Re: [DISCUSSION] Custom Control Tuples

Posted by David Yan <da...@datatorrent.com>.
Bhupesh, Sandesh, Tushar:

Thanks for volunteering. This task probably needs all three of you to work
closely together.

The subtasks so far are:

https://issues.apache.org/jira/browse/APEXCORE-580
https://issues.apache.org/jira/browse/APEXCORE-581

Please first review the subtasks to see whether anything is missing, and
add your thoughts to the tickets if you have any preliminary ideas on how
to implement them.

By the way, I think APEXCORE-581 is more involved, and it might be a good
idea to split it up further. That also makes sense since there are three of
you.

David


On Thu, Dec 1, 2016 at 3:41 AM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> I am also interested working on this feature.
>
> - Tushar.
>
>
> On Thu, Dec 1, 2016 at 10:27 AM, Bhupesh Chawda <bh...@datatorrent.com>
> wrote:
> > I would like to work on
> > https://issues.apache.org/jira/browse/APEXCORE-580.
> >
> > ~ Bhupesh
> >
> > On Thu, Dec 1, 2016 at 5:42 AM, Sandesh Hegde <sa...@datatorrent.com>
> > wrote:
> >
> >> I am interested in working on the following subtask
> >>
> >> https://issues.apache.org/jira/browse/APEXCORE-581
> >>
> >> Thanks
> >>
> >>
> >> On Wed, Nov 30, 2016 at 2:07 PM David Yan <da...@datatorrent.com> wrote:
> >>
> >> > I have created an umbrella ticket for control tuple support:
> >> >
> >> > https://issues.apache.org/jira/browse/APEXCORE-579
> >> >
> >> > Currently it has two subtasks. Please have a look at them and see
> >> > whether I'm missing anything or if you have anything to add. You are
> >> > welcome to add more subtasks or comment on the existing subtasks.
> >> >
> >> > We would like to kick start the implementation soon.
> >> >
> >> > Thanks!
> >> >
> >> > David
> >> >
> >> > On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda
> >> > <bhupesh@datatorrent.com> wrote:
> >> >
> >> > > +1 for the plan.
> >> > >
> >> > > I would be interested in contributing to this feature.
> >> > >
> >> > > ~ Bhupesh
> >> > >
> >> > > On Nov 29, 2016 03:26, "Sandesh Hegde" <sa...@datatorrent.com>
> >> > > wrote:
> >> > >
> >> > > > I am interested in contributing to this feature.
> >> > > >
> >> > > > On Mon, Nov 28, 2016 at 1:54 PM David Yan <da...@datatorrent.com>
> >> > > > wrote:
> >> > > >
> >> > > > > I think we should probably go ahead with option 1 since this
> >> > > > > works with most use cases and prevents developers from
> >> > > > > shooting themselves in the foot in terms of idempotency.
> >> > > > >
> >> > > > > We can have a configuration property that enables option 2
> >> > > > > later if we have concrete use cases that call for it.
> >> > > > >
> >> > > > > Please share your thoughts if you think you don't agree with
> >> > > > > this plan. Also, please indicate if you're interested in
> >> > > > > contributing to this feature.
> >> > > > >
> >> > > > > David
> >> > > > >
> >> > > > > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda
> >> > > > > <bhupesh@datatorrent.com> wrote:
> >> > > > >
> >> > > > > > It appears that option 1 is more favored due to the
> >> > > > > > unavailability of a use case which could use option 2.
> >> > > > > >
> >> > > > > > However, option 2 is problematic in specific cases, like the
> >> > > > > > presence of multiple input ports, for example. In the case
> >> > > > > > of a linear DAG where control tuples are flowing in order
> >> > > > > > with the data tuples, it should not be difficult to
> >> > > > > > guarantee idempotency. For example, where the behavior of an
> >> > > > > > operator could change multiple times during a single window,
> >> > > > > > the operator should not have to wait until end window for
> >> > > > > > these changes to take effect. Since we don't have a concrete
> >> > > > > > use case right now, perhaps we do not want to go down that
> >> > > > > > road. This feature should be available through a platform
> >> > > > > > attribute (maybe at a later point in time) where the default
> >> > > > > > is option 1.
> >> > > > > >
> >> > > > > > I think option 1 is suitable as a starting point for the
> >> > > > > > implementation of this feature and we should proceed with it.
> >> > > > > >
> >> > > > > > ~ Bhupesh
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <
> >> david@datatorrent.com
> >> > >
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Good question Tushar. The callback should be called only
> once.
> >> > > > > > > The way to implement this is to keep a list of control tuple
> >> > hashes
> >> > > > for
> >> > > > > > the
> >> > > > > > > given streaming window and only do the callback when the
> >> operator
> >> > > has
> >> > > > > not
> >> > > > > > > seen it before.
> >> > > > > > >
> >> > > > > > > Other thoughts?
> >> > > > > > >
> >> > > > > > > David
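
The deduplication idea described above (keep a list of control tuple hashes for the current streaming window and fire the callback only for unseen ones) can be sketched as follows. This is a minimal illustration, not Apex code: the class name, the method names, and the choice of SHA-256 over a byte payload are all assumptions made for the example.

```python
import hashlib


class ControlTupleDeduplicator:
    """Hypothetical helper: fires the callback once per distinct control tuple per window."""

    def __init__(self, callback):
        self._callback = callback
        self._seen = set()  # hashes of control tuples seen in the current window

    def begin_window(self, window_id):
        # Each streaming window starts with an empty set of seen hashes.
        self._seen.clear()

    def on_control_tuple(self, payload: bytes) -> bool:
        """Return True if the callback was invoked, False for a duplicate."""
        digest = hashlib.sha256(payload).hexdigest()
        if digest in self._seen:
            return False
        self._seen.add(digest)
        self._callback(payload)
        return True


# One partition of C receives the same tuple from all 4 partitions of B.
received = []
dedup = ControlTupleDeduplicator(received.append)
dedup.begin_window(1)
for _ in range(4):
    dedup.on_control_tuple(b"END_OF_FILE:a.txt")
```

Here `received` ends up holding a single entry even though the tuple arrived four times. Clearing the set in `begin_window` keeps the tracked state bounded to one window.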
> >> > > > > > >
> >> > > > > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <
> >> > > > tushar@datatorrent.com
> >> > > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi David,
> >> > > > > > > >
> >> > > > > > > > What would be the behaviour in case where we have a DAG
> with
> >> > > > > following
> >> > > > > > > > operators, the number in bracket is number of partitions,
> X
> >> is
> >> > > NxM
> >> > > > > > > > partitioning.
> >> > > > > > > > A(1) X B(4) X C(2)
> >> > > > > > > >
> >> > > > > > > > If A sends a control tuple, it will be sent to all 4
> >> partitions
> >> > of
> >> > > > B,
> >> > > > > > > > and from each partition of B it goes to C, i.e., each
> >> partition
> >> > > of
> >> > > > C
> >> > > > > > > > will receive the same control tuple originating from A multiple
> >> > times
> >> > > > > > > > (number of upstream partitions of C). In this case will
> the
> >> > > > callback
> >> > > > > > > > function get called multiple times or just once?
> >> > > > > > > >
> >> > > > > > > > -Tushar.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <
> >> > > david@datatorrent.com>
> >> > > > > > > wrote:
> >> > > > > > > > > Hi Bhupesh,
> >> > > > > > > > >
> >> > > > > > > > > Since each input port has its own incoming control
> tuple, I
> >> > > would
> >> > > > > > > imagine
> >> > > > > > > > > there would be an additional DefaultInputPort.
> >> processControl
> >> > > > method
> >> > > > > > > that
> >> > > > > > > > > operator developers can override.
> >> > > > > > > > > If we go for option 1, my thinking is that the control
> >> tuples
> >> > > > would
> >> > > > > > > > always
> >> > > > > > > > > be delivered at the next window boundary, even if the
> emit
> >> > > method
> >> > > > > is
> >> > > > > > > > called
> >> > > > > > > > > within a window.
> >> > > > > > > > >
> >> > > > > > > > > David
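
The option 1 behavior described above (a control tuple emitted mid-window is held and delivered at the window boundary) can be sketched roughly as below. The class and method names are hypothetical and only loosely mirror the `process` / `processControl` split discussed in the thread; this is not the Apex API.

```python
class WindowBoundaryPort:
    """Hypothetical input port: data tuples pass through, control tuples wait for end of window."""

    def __init__(self, process, process_control):
        self._process = process                  # callback for regular data tuples
        self._process_control = process_control  # callback for control tuples
        self._pending = []                       # control tuples held until the boundary

    def put(self, data_tuple):
        # Data tuples are processed as soon as they arrive.
        self._process(data_tuple)

    def put_control(self, control_tuple):
        # Control tuples are buffered, even when they arrive mid-window.
        self._pending.append(control_tuple)

    def end_window(self):
        # Deliver buffered control tuples in arrival order at the window boundary.
        pending, self._pending = self._pending, []
        for control_tuple in pending:
            self._process_control(control_tuple)


log = []
port = WindowBoundaryPort(
    process=lambda t: log.append(("data", t)),
    process_control=lambda c: log.append(("control", c)),
)
port.put(1)
port.put_control("flush")  # arrives between the two data tuples
port.put(2)
port.end_window()
```

After `end_window()`, the log shows both data tuples before the control tuple, which is the loss of ordering between regular and control tuples that Bhupesh asks about.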
> >> > > > > > > > >
> >> > > > > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <
> >> > > > > > > bhupesh@datatorrent.com>
> >> > > > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > >> I have a question regarding the callback for a control
> >> > tuple.
> >> > > > Will
> >> > > > > > it
> >> > > > > > > be
> >> > > > > > > > >> similar to InputPort::process() method? Something like
> >> > > > > > > > >> InputPort::processControlTuple(t)
> >> > > > > > > > >> ? Or will it be a method of the operator similar to
> >> > > > beginWindow()?
> >> > > > > > > > >>
> >> > > > > > > > >> When we say that the control tuple will be delivered at
> >> > window
> >> > > > > > > boundary,
> >> > > > > > > > >> does that mean all control tuples emitted in that
> window
> >> > will
> >> > > be
> >> > > > > > > > processed
> >> > > > > > > > >> together at the end of the window? This would imply
> that
> >> > there
> >> > > > is
> >> > > > > no
> >> > > > > > > > >> ordering among regular tuples and control tuples.
> >> > > > > > > > >>
> >> > > > > > > > >> I think we should get started with the option 1 -
> control
> >> > > tuples
> >> > > > > at
> >> > > > > > > > window
> >> > > > > > > > >> boundary, which seems to handle most of the use cases.
> For
> >> > > some
> >> > > > > > cases
> >> > > > > > > > which
> >> > > > > > > > >> require option 2, we can always build on this.
> >> > > > > > > > >>
> >> > > > > > > > >> ~ Bhupesh
> >> > > > > > > > >>
> >> > > > > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <
> >> > thw@apache.org>
> >> > > > > > wrote:
> >> > > > > > > > >>
> >> > > > > > > > >> > I don't see how that would work. Suppose you have a
> file
> >> > > > > splitter
> >> > > > > > > and
> >> > > > > > > > >> > multiple partitions of block readers. The "end of
> file"
> >> > > event
> >> > > > > > cannot
> >> > > > > > > > be
> >> > > > > > > > >> > processed downstream until all block readers are
> done. I
> >> > > also
> >> > > > > > think
> >> > > > > > > > that
> >> > > > > > > > >> > this is related to the batch demarcation discussion
> and
> >> > > there
> >> > > > > > should
> >> > > > > > > > be a
> >> > > > > > > > >> > single generalized mechanism to support this.
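
The constraint described above (an "end of file" event cannot be acted on downstream until every block reader partition has delivered it) can be sketched as a small barrier. The class name, partition labels, and control identifiers are illustrative assumptions, not part of any proposed Apex interface.

```python
class ControlTupleBarrier:
    """Hypothetical barrier: releases a control tuple once all upstream partitions sent it."""

    def __init__(self, upstream_partitions):
        self._expected = frozenset(upstream_partitions)
        self._arrivals = {}  # control id -> set of partitions that delivered it

    def receive(self, partition, control_id) -> bool:
        """Record one arrival; return True when the control tuple may be processed."""
        if partition not in self._expected:
            raise ValueError(f"unknown upstream partition: {partition!r}")
        arrived = self._arrivals.setdefault(control_id, set())
        arrived.add(partition)
        if arrived == self._expected:
            del self._arrivals[control_id]  # complete, so drop the tracking state
            return True
        return False


readers = ["reader-0", "reader-1", "reader-2"]
barrier = ControlTupleBarrier(readers)
results = [barrier.receive(r, "EOF:a.txt") for r in readers]
```

Only the last arrival returns `True`, so the downstream operator would act on the end-of-file event once, after all block readers are done.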
> >> > > > > > > > >> >
> >> > > > > > > > >> >
> >> > > > > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <
> >> > > > > > > > pramod@datatorrent.com
> >> > > > > > > > >> >
> >> > > > > > > > >> > wrote:
> >> > > > > > > > >> >
> >> > > > > > > > >> > > Suppose I am processing data in a file and I want
> to
> >> do
> >> > > > > > something
> >> > > > > > > at
> >> > > > > > > > >> the
> >> > > > > > > > >> > > end of a file at the output operator, I would send
> an
> >> > end
> >> > > > file
> >> > > > > > > > control
> >> > > > > > > > >> > > tuple and act on it when I receive it at the
> output.
> >> In
> >> > a
> >> > > > > single
> >> > > > > > > > >> window I
> >> > > > > > > > >> > > may end up processing multiple files and if I don't
> >> have
> >> > > > > > multiple
> >> > > > > > > > ports
> >> > > > > > > > >> > and
> >> > > > > > > > >> > > logical paths through the DAG (multiple partitions
> are
> >> > > ok),
> >> > > > I
> >> > > > > > can
> >> > > > > > > > >> process
> >> > > > > > > > >> > > end of each file immediately and also know what
> file
> >> was
> >> > > > > closed
> >> > > > > > > > without
> >> > > > > > > > >> > > sending extra identification information in the end
> >> file
> >> > > > > which I
> >> > > > > > > > would
> >> > > > > > > > >> > need
> >> > > > > > > > >> > > if I am collecting all of them and processing at
> the
> >> end
> >> > > of
> >> > > > > the
> >> > > > > > > > window.
> >> > > > > > > > >> > >
> >> > > > > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <
> >> > > > thw@apache.org>
> >> > > > > > > > wrote:
> >> > > > > > > > >> > >
> >> > > > > > > > >> > > > The use cases listed in the original discussion
> >> don't
> >> > > call
> >> > > > > for
> >> > > > > > > > option
> >> > > > > > > > >> > 2.
> >> > > > > > > > >> > > It
> >> > > > > > > > >> > > > seems to come with additional complexity and
> >> > > > implementation
> >> > > > > > > cost.
> >> > > > > > > > >> > > >
> >> > > > > > > > >> > > > Can those in favor of option 2 please also
> provide
> >> the
> >> > > use
> >> > > > > > case
> >> > > > > > > > for
> >> > > > > > > > >> it?
> >> > > > > > > > >> > > >
> >> > > > > > > > >> > > > Thanks,
> >> > > > > > > > >> > > > Thomas
> >> > > > > > > > >> > > >
> >> > > > > > > > >> > > >
> >> > > > > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <
> >> > > > > > > > siyuan@datatorrent.com>
> >> > > > > > > > >> > > > wrote:
> >> > > > > > > > >> > > >
> >> > > > > > > > >> > > > > I will vote for approach 1.
> >> > > > > > > > >> > > > >
> >> > > > > > > > >> > > > > First of all that one sounds easier to do to
> me.
> >> > And I
> >> > > > > think
> >> > > > > > > > >> > > idempotency
> >> > > > > > > > >> > > > is
> >> > > > > > > > >> > > > > important. It may run at the cost of higher
> >> latency
> >> > > but
> >> > > > I
> >> > > > > > > think
> >> > > > > > > > it
> >> > > > > > > > >> is
> >> > > > > > > > >> > > ok.
> >> > > > > > > > >> > > > >
> >> > > > > > > > >> > > > > And in addition, if in the future users do
> >> need
> >> > > > > > realtime
> >> > > > > > > > >> control
> >> > > > > > > > >> > > > tuple
> >> > > > > > > > >> > > > > processing, we can always add the option on
> top of
> >> > it.
> >> > > > > > > > >> > > > >
> >> > > > > > > > >> > > > > So I vote for 1
> >> > > > > > > > >> > > > >
> >> > > > > > > > >> > > > > Thanks,
> >> > > > > > > > >> > > > > Siyuan
> >> > > > > > > > >> > > > >
> >> > > > > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A.
> Dalvi <
> >> > > > > > > > prad@apache.org>
> >> > > > > > > > >> > > > wrote:
> >> > > > > > > > >> > > > >
> >> > > > > > > > >> > > > > > As a rule of thumb in any real time operating
> >> > > system,
> >> > > > > > > control
> >> > > > > > > > >> > tuples
> >> > > > > > > > >> > > > > should
> >> > > > > > > > >> > > > > > always be handled using Priority Queues.
> >> > > > > > > > >> > > > > >
> >> > > > > > > > >> > > > > > We may try to control priorities by defining
> >> > levels.
> >> > > > And
> >> > > > > > > shall
> >> > > > > > > > >> not
> >> > > > > > > > >> > > > > > be delivered at window boundaries.
> >> > > > > > > > >> > > > > >
> >> > > > > > > > >> > > > > > In short, control tuples shall never be
> treated
> >> as
> >> > > any
> >> > > > > > other
> >> > > > > > > > >> tuples
> >> > > > > > > > >> > > in
> >> > > > > > > > >> > > > > real
> >> > > > > > > > >> > > > > > time systems.
> >> > > > > > > > >> > > > > >
> >> > > > > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <
> >> > > > > > > > david@datatorrent.com>
> >> > > > > > > > >> > > > wrote:
> >> > > > > > > > >> > > > > >
> >> > > > > > > > >> > > > > > > Hi all,
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > I would like to renew the discussion of
> >> control
> >> > > > > tuples.
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > Last time, we were in a debate about
> whether:
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > 1) the platform should enforce that control
> >> > tuples
> >> > > > are
> >> > > > > > > > >> delivered
> >> > > > > > > > >> > at
> >> > > > > > > > >> > > > > > window
> >> > > > > > > > >> > > > > > > boundaries only
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > or:
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > 2) the platform should deliver control
> tuples
> >> > just
> >> > > > as
> >> > > > > > > other
> >> > > > > > > > >> > tuples
> >> > > > > > > > >> > > > and
> >> > > > > > > > >> > > > > > it's
> >> > > > > > > > >> > > > > > > the operator developers' choice whether to
> >> > handle
> >> > > > the
> >> > > > > > > > control
> >> > > > > > > > >> > > tuples
> >> > > > > > > > >> > > > as
> >> > > > > > > > >> > > > > > > they arrive or delay the processing till
> the
> >> > next
> >> > > > > window
> >> > > > > > > > >> > boundary.
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > To summarize the pros and cons:
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > Approach 1: If processing control tuples
> >> results
> >> > > in
> >> > > > > > > changes
> >> > > > > > > > of
> >> > > > > > > > >> > the
> >> > > > > > > > >> > > > > > behavior
> >> > > > > > > > >> > > > > > > of the operator, if idempotency needs to be
> >> > > > preserved,
> >> > > > > > the
> >> > > > > > > > >> > > processing
> >> > > > > > > > >> > > > > > must
> >> > > > > > > > >> > > > > > > be done at window boundaries. This approach
> >> will
> >> > > > save
> >> > > > > > the
> >> > > > > > > > >> > operator
> >> > > > > > > > >> > > > > > > developers headache to ensure that.
> However,
> >> > this
> >> > > > will
> >> > > > > > > take
> >> > > > > > > > >> away
> >> > > > > > > > >> > > the
> >> > > > > > > > >> > > > > > > choices from the operator developer if they
> >> just
> >> > > > need
> >> > > > > to
> >> > > > > > > > >> process
> >> > > > > > > > >> > > the
> >> > > > > > > > >> > > > > > > control tuples as soon as possible.
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > Approach 2: The operator has a chance to
> >> > > immediately
> >> > > > > > > process
> >> > > > > > > > >> > > control
> >> > > > > > > > >> > > > > > > tuples. This would be useful if latency is
> >> more
> >> > > > valued
> >> > > > > > > than
> >> > > > > > > > >> > > > > correctness.
> >> > > > > > > > >> > > > > > > However, this would open the possibility
> >> for
> >> > > > > operator
> >> > > > > > > > >> > developers
> >> > > > > > > > >> > > > to
> >> > > > > > > > >> > > > > > > shoot themselves in the foot. This is
> >> especially
> >> > > > true
> >> > > > > if
> >> > > > > > > > there
> >> > > > > > > > >> > are
> >> > > > > > > > >> > > > > > multiple
> >> > > > > > > > >> > > > > > > input ports, as there is no easy way to
> >> > guarantee
> >> > > > > > > processing
> >> > > > > > > > >> > order
> >> > > > > > > > >> > > > for
> >> > > > > > > > >> > > > > > > multiple input ports.
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > We would like to arrive at a consensus and
> >> close
> >> > > > this
> >> > > > > > > > >> discussion
> >> > > > > > > > >> > > soon
> >> > > > > > > > >> > > > > > this
> >> > > > > > > > >> > > > > > > time so we can start the work on this
> >> important
> >> > > > > feature.
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > Thanks!
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > David
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad
> Rozov <
> >> > > > > > > > >> > > > v.rozov@datatorrent.com>
> >> > > > > > > > >> > > > > > > wrote:
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > > > > It is not clear how an operator will emit
> >> a custom
> >> > > > > control
> >> > > > > > > > tuple
> >> > > > > > > > >> at
> >> > > > > > > > >> > > > window
> >> > > > > > > > >> > > > > > > > boundaries. One way is to
> cache/accumulate
> >> > > control
> >> > > > > > > tuples
> >> > > > > > > > in
> >> > > > > > > > >> > the
> >> > > > > > > > >> > > > > > operator
> >> > > > > > > > >> > > > > > > > output port till window closes
> (END_WINDOW
> >> is
> >> > > > > inserted
> >> > > > > > > > into
> >> > > > > > > > >> the
> >> > > > > > > > >> > > > > output
> >> > > > > > > > >> > > > > > > > sink) or only allow an operator to emit
> >> > control
> >> > > > > tuples
> >> > > > > > > > inside
> >> > > > > > > > >> > the
> >> > > > > > > > >> > > > > > > > endWindow(). The latter is a slight
> variation
> >> > of
> >> > > > the
> >> > > > > > > > operator
> >> > > > > > > > >> > > output
> >> > > > > > > > >> > > > > > port
> >> > > > > > > > >> > > > > > > > caching behavior with the only difference
> >> that
> >> > > now
> >> > > > > the
> >> > > > > > > > >> operator
> >> > > > > > > > >> > > > > itself
> >> > > > > > > > >> > > > > > is
> >> > > > > > > > >> > > > > > > > responsible for caching/accumulating
> control
> >> > > > tuples.
> >> > > > > > > Note
> >> > > > > > > > >> that
> >> > > > > > > > >> > in
> >> > > > > > > > >> > > > > many
> >> > > > > > > > >> > > > > > > > cases it will be necessary to postpone
> >> > emitting
> >> > > > > > payload
> >> > > > > > > > >> tuples
> >> > > > > > > > >> > > that
> >> > > > > > > > >> > > > > > > > logically come after the custom control
> >> tuple
> >> > > till
> >> > > > > the
> >> > > > > > > > next
> >> > > > > > > > >> > > window
> >> > > > > > > > >> > > > > > > begins.
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > > IMO, that is too restrictive, and in a case
> >> where
> >> > > > input
> >> > > > > > > > operator
> >> > > > > > > > >> > > uses a
> >> > > > > > > > >> > > > > > push
> >> > > > > > > > >> > > > > > > > instead of a poll (for example, it
> provides
> >> an
> >> > > end
> >> > > > > > point
> >> > > > > > > > >> where
> >> > > > > > > > >> > > > remote
> >> > > > > > > > >> > > > > > > > agents may connect and publish/push
> data),
> >> > > control
> >> > > > > > > tuples
> >> > > > > > > > may
> >> > > > > > > > >> > be
> >> > > > > > > > >> > > > used
> >> > > > > > > > >> > > > > > for
> >> > > > > > > > >> > > > > > > > connect/disconnect/watermark broadcast to
> >> > > > > > (partitioned)
> >> > > > > > > > >> > > downstream
> >> > > > > > > > >> > > > > > > > operators. In this case the platform just
> >> needs
> >> > > to
> >> > > > > > > > guarantee
> >> > > > > > > > >> > order
> >> > > > > > > > >> > > > > > barrier
> >> > > > > > > > >> > > > > > > > (any tuple emitted prior to a control
> tuple
> >> > > needs
> >> > > > to
> >> > > > > > be
> >> > > > > > > > >> > delivered
> >> > > > > > > > >> > > > > prior
> >> > > > > > > > >> > > > > > > to
> >> > > > > > > > >> > > > > > > > the control tuple).
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > > Thank you,
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > > Vlad
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > >> I agree with David. Allowing control
> tuples
> >> > > > within
> >> > > > > a
> >> > > > > > > > window
> >> > > > > > > > >> > > (along
> >> > > > > > > > >> > > > > > with
> >> > > > > > > > >> > > > > > > >> data tuples) creates a very dangerous
> >> situation
> >> > > > where
> >> > > > > > > > >> guarantees
> >> > > > > > > > >> > > are
> >> > > > > > > > >> > > > > > > >> impacted. It is much safer to enable
> >> control
> >> > > > tuples
> >> > > > > > > > >> > > (send/receive)
> >> > > > > > > > >> > > > > at
> >> > > > > > > > >> > > > > > > >> window boundaries (after END_WINDOW of
> >> window
> >> > > N,
> >> > > > > and
> >> > > > > > > > before
> >> > > > > > > > >> > > > > > BEGIN_WINDOW
> >> > > > > > > > >> > > > > > > >> for window N+1). My take on David's
> list is
> >> > > > > > > > >> > > > > > > >>
> >> > > > > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1;
> there
> >> > > will
> >> > > > > be a
> >> > > > > > > big
> >> > > > > > > > >> > issue
> >> > > > > > > > >> > > > with
> >> > > > > > > > >> > > > > > > >> guarantees for operators with multiple
> >> ports.
> >> > > > (see
> >> > > > > > > > Thomas's
> >> > > > > > > > >> > > > > response)
> >> > > > > > > > >> > > > > > > >> 2. -> All downstream windows -> +1, but
> >> there
> >> > > are
> >> > > > > > > > >> situations;
> >> > > > > > > > >> > a
> >> > > > > > > > >> > > > > caveat
> >> > > > > > > > >> > > > > > > >> could be "only to operators that
> implement
> >> > > > control
> >> > > > > > > tuple
> >> > > > > > > > >> > > > > > > >> interface/listeners", which could
> >> effectively
> >> > > > > > > translate
> >> > > > > > > > to
> >> > > > > > > > >> > "all
> >> > > > > > > > >> > > > > > > >> interested
> >> > > > > > > > >> > > > > > > >> downstream operators"
> >> > > > > > > > >> > > > > > > >> 3. Only Input operator can create
> control
> >> > > tuples
> >> > > > ->
> >> > > > > > -1;
> >> > > > > > > > is
> >> > > > > > > > >> > > > > restrictive
> >> > > > > > > > >> > > > > > > >> even
> >> > > > > > > > >> > > > > > > >> though most likely 95% of the time it
> will
> >> be
> >> > > > input
> >> > > > > > > > >> operators
> >> > > > > > > > >> > > > > > > >>
> >> > > > > > > > >> > > > > > > >> Thks,
> >> > > > > > > > >> > > > > > > >> Amol
> >> > > > > > > > >> > > > > > > >>
> >> > > > > > > > >> > > > > > > >>
> >> > > > > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas
> >> > Weise <
> >> > > > > > > > >> > > > > thomas@datatorrent.com>
> >> > > > > > > > >> > > > > > > >> wrote:
> >> > > > > > > > >> > > > > > > >>
> >> > > > > > > > >> > > > > > > >> The windowing we discuss here is in
> general
> >> > > event
> >> > > > > > time
> >> > > > > > > > >> based,
> >> > > > > > > > >> > > > > arrival
> >> > > > > > > > >> > > > > > > time
> >> > > > > > > > >> > > > > > > >>> is a special case of it.
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>> I don't think state changes can be made
> >> > > > > independent
> >> > > > > > of
> >> > > > > > > > the
> >> > > > > > > > >> > > > > streaming
> >> > > > > > > > >> > > > > > > >>> window
> >> > > > > > > > >> > > > > > > >>> boundary as it would prevent idempotent
> >> > > > processing
> >> > > > > > and
> >> > > > > > > > >> > > > transitively
> >> > > > > > > > >> > > > > > > >>> exactly
> >> > > > > > > > >> > > > > > > >>> once. For that to work, tuples need to
> be
> >> > > > > presented
> >> > > > > > to
> >> > > > > > > > the
> >> > > > > > > > >> > > > operator
> >> > > > > > > > >> > > > > > in
> >> > > > > > > > >> > > > > > > a
> >> > > > > > > > >> > > > > > > >>> guaranteed order *within* the streaming
> >> > > window,
> >> > > > > > which
> >> > > > > > > is
> >> > > > > > > > >> not
> >> > > > > > > > >> > > > > possible
> >> > > > > > > > >> > > > > > > >>> with
> >> > > > > > > > >> > > > > > > >>> multiple ports (and partitions).
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>> Thomas
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David
> >> Yan <
> >> > > > > > > > >> > > > david@datatorrent.com>
> >> > > > > > > > >> > > > > > > >>> wrote:
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>> I think for session tracking, if the
> >> session
> >> > > > > > > boundaries
> >> > > > > > > > are
> >> > > > > > > > >> > > > allowed
> >> > > > > > > > >> > > > > > to
> >> > > > > > > > >> > > > > > > be
> >> > > > > > > > >> > > > > > > >>>> not aligned with the streaming window
> >> > > > boundaries,
> >> > > > > > the
> >> > > > > > > > user
> >> > > > > > > > >> > > will
> >> > > > > > > > >> > > > > > have a
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>> much
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> bigger problem with idempotency. And
> in
> >> > most
> >> > > > > cases,
> >> > > > > > > > >> session
> >> > > > > > > > >> > > > > tracking
> >> > > > > > > > >> > > > > > > is
> >> > > > > > > > >> > > > > > > >>>> event time based, not ingression time
> or
> >> > > > > processing
> >> > > > > > > > time
> >> > > > > > > > >> > > based,
> >> > > > > > > > >> > > > so
> >> > > > > > > > >> > > > > > > this
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>> may
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> never be a problem. But if that ever
> >> > happens,
> >> > > > the
> >> > > > > > > user
> >> > > > > > > > can
> >> > > > > > > > >> > > > always
> >> > > > > > > > >> > > > > > > alter
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>> the
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> default 500ms width.
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>> David
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad
> >> > Rozov <
> >> > > > > > > > >> > > > > > v.rozov@datatorrent.com>
> >> > > > > > > > >> > > > > > > >>>> wrote:
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>> Ability to send custom control tuples
> >> > within
> >> > > > > window
> >> > > > > > > > may be
> >> > > > > > > > >> > > > useful,
> >> > > > > > > > >> > > > > > for
> >> > > > > > > > >> > > > > > > >>>>> example, for sessions tracking, where
> >> > > session
> >> > > > > > > > boundaries
> >> > > > > > > > >> > are
> >> > > > > > > > >> > > > not
> >> > > > > > > > >> > > > > > > >>>>>
> >> > > > > > > > >> > > > > > > >>>> aligned
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> with window boundaries and 500 ms
> latency
> >> > is
> >> > > > not
> >> > > > > > > > >> acceptable
> >> > > > > > > > >> > > for
> >> > > > > > > > >> > > > an
> >> > > > > > > > >> > > > > > > >>>>> application.
> >> > > > > > > > >> > > > > > > >>>>>
> >> > > > > > > > >> > > > > > > >>>>> Thank you,
> >> > > > > > > > >> > > > > > > >>>>>
> >> > > > > > > > >> > > > > > > >>>>> Vlad
> >> > > > > > > > >> > > > > > > >>>>>
> >> > > > > > > > >> > > > > > > >>>>>
> >> > > > > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> >> > > > > > > > >> > > > > > > >>>>>
> >> > > > > > > > >> > > > > > > >>>>> It should not matter from where the
> >> > control
> >> > > > > tuple
> >> > > > > > is
> >> > > > > > > > >> > > triggered.
> >> > > > > > > > >> > > > > It
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> will
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> be
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> good to have a generic mechanism to
> >> > > propagate
> >> > > > it
> >> > > > > > and
> >> > > > > > > > >> other
> >> > > > > > > > >> > > > things
> >> > > > > > > > >> > > > > > can
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> be
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> accomplished outside the engine. For
> >> > example,
> >> > > > the
> >> > > > > > new
> >> > > > > > > > >> > > > > comprehensive
> >> > > > > > > > >> > > > > > > >>>>>> support
> >> > > > > > > > >> > > > > > > >>>>>> for windowing will all be in Malhar,
> >> > > nothing
> >> > > > > that
> >> > > > > > > the
> >> > > > > > > > >> > engine
> >> > > > > > > > >> > > > > needs
> >> > > > > > > > >> > > > > > > to
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> know
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> about it except that we need the
> control
> >> > > tuple
> >> > > > > for
> >> > > > > > > > >> > watermark
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> propagation
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> and idempotent processing.
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> I also think the main difference to
> >> other
> >> > > > > tuples
> >> > > > > > is
> >> > > > > > > > the
> >> > > > > > > > >> > need
> >> > > > > > > > >> > > > to
> >> > > > > > > > >> > > > > > send
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> it
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> to
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> all partitions. Which is similar to
> >> > > checkpoint
> >> > > > > > > window
> >> > > > > > > > >> > tuples,
> >> > > > > > > > >> > > > but
> >> > > > > > > > >> > > > > > not
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> the
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> same. Here, we probably also need the
> >> > > ability
> >> > > > > for
> >> > > > > > > the
> >> > > > > > > > >> user
> >> > > > > > > > >> > to
> >> > > > > > > > >> > > > > > control
> >> > > > > > > > >> > > > > > > >>>>>> whether such tuple should traverse
> the
> >> > > entire
> >> > > > > DAG
> >> > > > > > > or
> >> > > > > > > > >> not.
> >> > > > > > > > >> > > For
> >> > > > > > > > >> > > > a
> >> > > > > > > > >> > > > > > > batch
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> use
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> case, for example, we may want to
> send
> >> the
> >> > > end
> >> > > > > of
> >> > > > > > > > file to
> >> > > > > > > > >> > the
> >> > > > > > > > >> > > > > next
> >> > > > > > > > >> > > > > > > >>>>>> operator, but not beyond, if the
> >> operator
> >> > > has
> >> > > > > > > > >> asynchronous
> >> > > > > > > > >> > > > > > > processing
> >> > > > > > > > >> > > > > > > >>>>>> logic
> >> > > > > > > > >> > > > > > > >>>>>> in it.
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> For any logic to be idempotent, the
> >> > control
> >> > > > > tuple
> >> > > > > > > > needs
> >> > > > > > > > >> to
> >> > > > > > > > >> > > be
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> processed
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> at
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> a window boundary. Receiving the
> control
> >> > > tuple
> >> > > > > in
> >> > > > > > > the
> >> > > > > > > > >> > window
> >> > > > > > > > >> > > > > > callback
> >> > > > > > > > >> > > > > > > >>>>>> would
> >> > > > > > > > >> > > > > > > >>>>>> avoid having to track extra state in
> >> the
> >> > > > > > operator.
> >> > > > > > > I
> >> > > > > > > > >> don't
> >> > > > > > > > >> > > > think
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> that's
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> a
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> major issue, but what is the use case
> >> for
> >> > > > > > > processing a
> >> > > > > > > > >> > > control
> >> > > > > > > > >> > > > > > tuple
> >> > > > > > > > >> > > > > > > >>>>>> within
> >> > > > > > > > >> > > > > > > >>>>>> the window?
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> Thomas
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM,
> Pramod
> >> > > > > Immaneni
> >> > > > > > <
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>> pramod@datatorrent.com>
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> wrote:
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> For the use cases you mentioned, I
> >> think
> >> > 1)
> >> > > > and
> >> > > > > > 2)
> >> > > > > > > > are
> >> > > > > > > > >> > more
> >> > > > > > > > >> > > > > likely
> >> > > > > > > > >> > > > > > > to
> >> > > > > > > > >> > > > > > > >>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> be controlled directly by the
> >> > application,
> >> > > > 3)
> >> > > > > > and
> >> > > > > > > 4)
> >> > > > > > > > >> are
> >> > > > > > > > >> > > more
> >> > > > > > > > >> > > > > > > likely
> >> > > > > > > > >> > > > > > > >>>>>>> going to be triggered externally
> and
> >> > > > directly
> >> > > > > > > > handled
> >> > > > > > > > >> by
> >> > > > > > > > >> > > the
> >> > > > > > > > >> > > > > > engine
> >> > > > > > > > >> > > > > > > >>>>>>> and 3) is already being implemented
> >> that
> >> > > way
> >> > > > > > > > >> > > (apexcore-163).
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> The control tuples emitted by an
> >> > operator
> >> > > > > would
> >> > > > > > be
> >> > > > > > > > sent
> >> > > > > > > > >> > to
> >> > > > > > > > >> > > > all
> >> > > > > > > > >> > > > > > > >>>>>>> downstream partitions isn't it and
> >> that
> >> > > > would
> >> > > > > be
> >> > > > > > > the
> >> > > > > > > > >> > chief
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> distinction
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> compared to data (apart from the
> payload)
> >> > > which
> >> > > > > > would
> >> > > > > > > > get
> >> > > > > > > > >> > > > > > partitioned
> >> > > > > > > > >> > > > > > > >>>>>>> under normal circumstances. It
> would
> >> > also
> >> > > be
> >> > > > > > > > guaranteed
> >> > > > > > > > >> > > that
> >> > > > > > > > >> > > > > > > >>>>>>> downstream partitions will receive
> >> > control
> >> > > > > > tuples
> >> > > > > > > > only
> >> > > > > > > > >> > > after
> >> > > > > > > > >> > > > > the
> >> > > > > > > > >> > > > > > > data
> >> > > > > > > > >> > > > > > > >>>>>>> that was sent before it so we could
> >> send
> >> > > it
> >> > > > > > > > immediately
> >> > > > > > > > >> > > when
> >> > > > > > > > >> > > > it
> >> > > > > > > > >> > > > > > is
> >> > > > > > > > >> > > > > > > >>>>>>> emitted as opposed to window
> >> boundaries.
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> However during unification it is
> >> > important
> >> > > > to
> >> > > > > > know
> >> > > > > > > > if
> >> > > > > > > > >> > these
> >> > > > > > > > >> > > > > > control
> >> > > > > > > > >> > > > > > > >>>>>>> tuples have been received from all
> >> > > upstream
> >> > > > > > > > partitions
> >> > > > > > > > >> > > before
> >> > > > > > > > >> > > > > > > >>>>>>> proceeding with a control
> operation.
> >> One
> >> > > > could
> >> > > > > > > wait
> >> > > > > > > > >> till
> >> > > > > > > > >> > > end
> >> > > > > > > > >> > > > of
> >> > > > > > > > >> > > > > > the
> >> > > > > > > > >> > > > > > > >>>>>>> window but that introduces a delay
> >> > however
> >> > > > > > small,
> >> > > > > > > I
> >> > > > > > > > >> would
> >> > > > > > > > >> > > > like
> >> > > > > > > > >> > > > > to
> >> > > > > > > > >> > > > > > > add
> >> > > > > > > > >> > > > > > > >>>>>>> to the proposal that the platform
> only
> >> > > hand
> >> > > > > over
> >> > > > > > > the
> >> > > > > > > > >> > > control
> >> > > > > > > > >> > > > > > tuple
> >> > > > > > > > >> > > > > > > to
> >> > > > > > > > >> > > > > > > >>>>>>> the unifier when it has been
> received
> >> > from
> >> > > > all
> >> > > > > > > > upstream
> >> > > > > > > > >> > > > > > partitions
> >> > > > > > > > >> > > > > > > >>>>>>> much like how end window is
> processed
> >> > but
> >> > > > not
> >> > > > > > wait
> >> > > > > > > > till
> >> > > > > > > > >> > the
> >> > > > > > > > >> > > > > > actual
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> end
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> of the window.
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> Regarding your concern about
> idempotency,
> >> we
> >> > > > > > typically
> >> > > > > > > > care
> >> > > > > > > > >> > > about
> >> > > > > > > > >> > > > > > > >>>>>>> idempotency at a window level and
> >> doing
> >> > > the
> >> > > > > > above
> >> > > > > > > > will
> >> > > > > > > > >> > > still
> >> > > > > > > > >> > > > > > allow
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> the
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> operators to preserve that easily.
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> Thanks
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David
> >> Yan
> >> > <
> >> > > > > > > > >> > > > david@datatorrent.com
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>> wrote:
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> Hi all,
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> I would like to propose a new
> feature
> >> > to
> >> > > > the
> >> > > > > > Apex
> >> > > > > > > > core
> >> > > > > > > > >> > > > engine
> >> > > > > > > > >> > > > > --
> >> > > > > > > > >> > > > > > > the
> >> > > > > > > > >> > > > > > > >>>>>>>> support of custom control tuples.
> >> > > > Currently,
> >> > > > > we
> >> > > > > > > > have
> >> > > > > > > > >> > > control
> >> > > > > > > > >> > > > > > > tuples
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> such
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> as
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW,
> CHECKPOINT,
> >> > and
> >> > > so
> >> > > > > on,
> >> > > > > > > > but we
> >> > > > > > > > >> > > don't
> >> > > > > > > > >> > > > > > have
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> the
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> support for applications to insert
> their
> >> > own
> >> > > > > > control
> >> > > > > > > > >> tuples.
> >> > > > > > > > >> > > The
> >> > > > > > > > >> > > > > way
> >> > > > > > > > >> > > > > > > >>>>>>>> currently to get around this is to
> >> use
> >> > > data
> >> > > > > > > tuples
> >> > > > > > > > and
> >> > > > > > > > >> > > have
> >> > > > > > > > >> > > > a
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> separate
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> port
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> for such tuples that sends tuples
> to
> >> all
> >> > > > > > > partitions
> >> > > > > > > > of
> >> > > > > > > > >> > the
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> downstream
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> operators, which is not exactly
> developer
> >> > > > > friendly.
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> We have already seen a number of
> use
> >> > > cases
> >> > > > > that
> >> > > > > > > can
> >> > > > > > > > >> use
> >> > > > > > > > >> > > this
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> feature:
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> 1) Batch support: We need to tell all
> >> > > operators
> >> > > > > of
> >> > > > > > > the
> >> > > > > > > > >> > > physical
> >> > > > > > > > >> > > > > DAG
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> when
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> a
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> batch starts and ends, so the
> >> operators
> >> > > can
> >> > > > do
> >> > > > > > > > whatever
> >> > > > > > > > >> > > that
> >> > > > > > > > >> > > > is
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> needed
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> upon
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> the start or the end of a batch.
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> 2) Watermark: To support the
> concepts
> >> > of
> >> > > > > event
> >> > > > > > > time
> >> > > > > > > > >> > > > windowing,
> >> > > > > > > > >> > > > > > the
> >> > > > > > > > >> > > > > > > >>>>>>>> watermark control tuple is needed
> to
> >> > tell
> >> > > > > which
> >> > > > > > > > >> windows
> >> > > > > > > > >> > > > should
> >> > > > > > > > >> > > > > > be
> >> > > > > > > > >> > > > > > > >>>>>>>> considered late.
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> 3) Changing operator properties:
> We
> >> do
> >> > > have
> >> > > > > the
> >> > > > > > > > >> support
> >> > > > > > > > >> > of
> >> > > > > > > > >> > > > > > > changing
> >> > > > > > > > >> > > > > > > >>>>>>>> operator properties on the fly,
> but
> >> > with
> >> > > a
> >> > > > > > custom
> >> > > > > > > > >> > control
> >> > > > > > > > >> > > > > tuple,
> >> > > > > > > > >> > > > > > > the
> >> > > > > > > > >> > > > > > > >>>>>>>> command to change operator
> properties
> >> > can
> >> > > > be
> >> > > > > > > window
> >> > > > > > > > >> > > aligned
> >> > > > > > > > >> > > > > for
> >> > > > > > > > >> > > > > > > all
> >> > > > > > > > >> > > > > > > >>>>>>>> partitions and also across the
> DAG.
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing
> >> > > operator
> >> > > > > > > > >> properties,
> >> > > > > > > > >> > we
> >> > > > > > > > >> > > > do
> >> > > > > > > > >> > > > > > have
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> this
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> support now but only at the
> individual
> >> > > > physical
> >> > > > > > > > operator
> >> > > > > > > > >> > > level,
> >> > > > > > > > >> > > > > and
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> without
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> control of which window to record
> >> tuples
> >> > > > for.
> >> > > > > > > With a
> >> > > > > > > > >> > custom
> >> > > > > > > > >> > > > > > control
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> tuple,
> >> > > > > > > > >> > > > > > > >>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> because a control tuple must belong
> >> to a
> >> > > > > window,
> >> > > > > > > all
> >> > > > > > > > >> > > > operators
> >> > > > > > > > >> > > > > in
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> the
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> DAG
> >> > > > > > > > >> > > > > > > >>>>>>>> can start (and stop) recording for
> >> the
> >> > > same
> >> > > > > > > > windows.
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> I can think of two options to
> achieve
> >> > > this:
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> 1) new custom control tuple type
> that
> >> > > takes
> >> > > > > > > user's
> >> > > > > > > > >> > > > > serializable
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> object.
> >> > > > > > > > >> > > > > > > >>>>
> >> > > > > > > > >> > > > > > > >>>>> 2) piggy back the current
> BEGIN_WINDOW
> >> and
> >> > > > > > > END_WINDOW
> >> > > > > > > > >> > control
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>> tuples.
> >> > > > > > > > >> > > > > > > >>>
> >> > > > > > > > >> > > > > > > >>>> Please provide your feedback. Thank
> you.
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>> David
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >>>>>>>>
> >> > > > > > > > >> > > > > > > >
> >> > > > > > > > >> > > > > > >
> >> > > > > > > > >> > > > > >
> >> > > > > > > > >> > > > >
> >> > > > > > > > >> > > >
> >> > > > > > > > >> > >
> >> > > > > > > > >> >
> >> > > > > > > > >>
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
>
>
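
The suggestion quoted above, that the platform hand a control tuple to the unifier only once it has been received from all upstream partitions, could be sketched roughly as follows. This is only an illustration under stated assumptions, not the Apex implementation: ControlTupleBarrier, offer and pending are hypothetical names that do not exist in the Apex API, upstream partitions are assumed to be identified by integer ids, and control tuples are assumed to implement equals() and hashCode().

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Apex API.
final class ControlTupleBarrier<T> {
    private final int upstreamCount;
    // For each pending control tuple, the upstream partition ids that have delivered it so far.
    private final Map<T, Set<Integer>> seenFrom = new HashMap<>();

    ControlTupleBarrier(int upstreamCount) {
        if (upstreamCount <= 0) {
            throw new IllegalArgumentException("upstreamCount must be positive");
        }
        this.upstreamCount = upstreamCount;
    }

    // Records one arrival. Returns true exactly once per control tuple:
    // when the last missing upstream partition delivers it.
    boolean offer(int partitionId, T controlTuple) {
        Set<Integer> partitions = seenFrom.computeIfAbsent(controlTuple, k -> new HashSet<>());
        partitions.add(partitionId); // a repeat from the same partition is a no-op
        if (partitions.size() == upstreamCount) {
            seenFrom.remove(controlTuple); // release the tuple and forget it
            return true;
        }
        return false;
    }

    // Number of control tuples still waiting for at least one upstream partition.
    int pending() {
        return seenFrom.size();
    }
}
```

With three upstream partitions, offer(0, t) and offer(2, t) would return false and offer(1, t) would return true, at which point the unifier could act on t without waiting for the end of the window. Clearing any leftover entries at END_WINDOW is left out of the sketch.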

Re: [DISCUSSION] Custom Control Tuples

Posted by Tushar Gosavi <tu...@datatorrent.com>.
I am also interested in working on this feature.

- Tushar.


On Thu, Dec 1, 2016 at 10:27 AM, Bhupesh Chawda <bh...@datatorrent.com> wrote:
> I would like to work on https://issues.apache.org/jira/browse/APEXCORE-580.
>
> ~ Bhupesh
>
> On Thu, Dec 1, 2016 at 5:42 AM, Sandesh Hegde <sa...@datatorrent.com>
> wrote:
>
>> I am interested in working on the following subtask
>>
>> https://issues.apache.org/jira/browse/APEXCORE-581
>>
>> Thanks
>>
>>
>> On Wed, Nov 30, 2016 at 2:07 PM David Yan <da...@datatorrent.com> wrote:
>>
>> > I have created an umbrella ticket for control tuple support:
>> >
>> > https://issues.apache.org/jira/browse/APEXCORE-579
>> >
>> > Currently it has two subtasks. Please have a look at them and see whether
>> > I'm missing anything or if you have anything to add. You are welcome to
>> add
>> > more subtasks or comment on the existing subtasks.
>> >
>> > We would like to kick start the implementation soon.
>> >
>> > Thanks!
>> >
>> > David
>> >
>> > On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda <bhupesh@datatorrent.com
>> >
>> > wrote:
>> >
>> > > +1 for the plan.
>> > >
>> > > I would be interested in contributing to this feature.
>> > >
>> > > ~ Bhupesh
>> > >
>> > > On Nov 29, 2016 03:26, "Sandesh Hegde" <sa...@datatorrent.com>
>> wrote:
>> > >
>> > > > I am interested in contributing to this feature.
>> > > >
>> > > > On Mon, Nov 28, 2016 at 1:54 PM David Yan <da...@datatorrent.com>
>> > wrote:
>> > > >
>> > > > > I think we should probably go ahead with option 1 since this works
>> > with
>> > > > > most use cases and prevents developers from shooting themselves in
>> > the
>> > > > foot
>> > > > > in terms of idempotency.
>> > > > >
>> > > > > We can have a configuration property that enables option 2 later if
>> > we
>> > > > have
>> > > > > concrete use cases that call for it.
>> > > > >
>> > > > > Please share your thoughts if you think you don't agree with this
>> > plan.
>> > > > > Also, please indicate if you're interested in contributing to this
>> > > > feature.
>> > > > >
>> > > > > David
>> > > > >
>> > > > > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <
>> > > bhupesh@datatorrent.com
>> > > > >
>> > > > > wrote:
>> > > > >
>> > > > > > It appears that option 1 is more favored due to unavailability
>> of a
>> > > use
>> > > > > > case which could use option 2.
>> > > > > >
>> > > > > > However, option 2 is problematic in specific cases, like presence
>> > of
>> > > > > > multiple input ports for example. In case of a linear DAG where
>> > > control
>> > > > > > tuples are flowing in order with the data tuples, it should not
>> be
>> > > > > > difficult to guarantee idempotency. For example, cases where
>> there
>> > > > could
>> > > > > be
>> > > > > > multiple changes in behavior of an operator during a single
>> window,
>> > > it
>> > > > > > should not wait until end window for these changes to take
>> effect.
>> > > > Since,
>> > > > > > we don't have a concrete use case right now, perhaps we do not
>> want
>> > > to
>> > > > go
>> > > > > > that road. This feature should be available through a platform
>> > > > attribute
>> > > > > > (maybe at a later point in time), where the default is option 1.
>> > > > > >
>> > > > > > I think option 1 is suitable for a starting point in the
>> > > implementation
>> > > > > of
>> > > > > > this feature and we should proceed with it.
>> > > > > >
>> > > > > > ~ Bhupesh
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <
>> david@datatorrent.com
>> > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > Good question Tushar. The callback should be called only once.
>> > > > > > > The way to implement this is to keep a list of control tuple
>> > hashes
>> > > > for
>> > > > > > the
>> > > > > > > given streaming window and only do the callback when the
>> operator
>> > > has
>> > > > > not
>> > > > > > > seen it before.
>> > > > > > >
>> > > > > > > Other thoughts?
>> > > > > > >
>> > > > > > > David
>> > > > > > >
>> > > > > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <
>> > > > tushar@datatorrent.com
>> > > > > >
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi David,
>> > > > > > > >
>> > > > > > > > What would be the behaviour in the case where we have a DAG with
>> > > > > following
>> > > > > > > > operators, the number in bracket is number of partitions, X
>> is
>> > > NxM
>> > > > > > > > partitioning.
>> > > > > > > > A(1) X B(4) X C(2)
>> > > > > > > >
>> > > > > > > > If A sends a control tuple, it will be sent to all 4
>> partitions
>> > of
>> > > > B,
>> > > > > > > > and from each partition of B it goes to C, i.e., each
>> partition
>> > > of
>> > > > C
>> > > > > > > > will receive same control tuple originated from A multiple
>> > times
>> > > > > > > > (number of upstream partitions of C). In this case will the
>> > > > callback
>> > > > > > > > function get called multiple times or just once?
>> > > > > > > >
>> > > > > > > > -Tushar.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <
>> > > david@datatorrent.com>
>> > > > > > > wrote:
>> > > > > > > > > Hi Bhupesh,
>> > > > > > > > >
>> > > > > > > > > Since each input port has its own incoming control tuple, I
>> > > would
>> > > > > > > imagine
>> > > > > > > > > there would be an additional DefaultInputPort.
>> processControl
>> > > > method
>> > > > > > > that
>> > > > > > > > > operator developers can override.
>> > > > > > > > > If we go for option 1, my thinking is that the control
>> tuples
>> > > > would
>> > > > > > > > always
>> > > > > > > > > be delivered at the next window boundary, even if the emit
>> > > method
>> > > > > is
>> > > > > > > > called
>> > > > > > > > > within a window.
>> > > > > > > > >
>> > > > > > > > > David
>> > > > > > > > >
>> > > > > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <
>> > > > > > > bhupesh@datatorrent.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > >> I have a question regarding the callback for a control
>> > tuple.
>> > > > Will
>> > > > > > it
>> > > > > > > be
>> > > > > > > > >> similar to InputPort::process() method? Something like
>> > > > > > > > >> InputPort::processControlTuple(t)
>> > > > > > > > >> ? Or will it be a method of the operator similar to
>> > > > beginWindow()?
>> > > > > > > > >>
>> > > > > > > > >> When we say that the control tuple will be delivered at
>> > window
>> > > > > > > boundary,
>> > > > > > > > >> does that mean all control tuples emitted in that window
>> > will
>> > > be
>> > > > > > > > processed
>> > > > > > > > >> together at the end of the window? This would imply that
>> > there
>> > > > is
>> > > > > no
>> > > > > > > > >> ordering among regular tuples and control tuples.
>> > > > > > > > >>
>> > > > > > > > >> I think we should get started with the option 1 - control
>> > > tuples
>> > > > > at
>> > > > > > > > window
>> > > > > > > > >> boundary, which seems to handle most of the use cases. For
>> > > some
>> > > > > > cases
>> > > > > > > > which
>> > > > > > > > >> require option 2, we can always build on this.
>> > > > > > > > >>
>> > > > > > > > >> ~ Bhupesh
>> > > > > > > > >>
>> > > > > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <
>> > thw@apache.org>
>> > > > > > wrote:
>> > > > > > > > >>
>> > > > > > > > >> > I don't see how that would work. Suppose you have a file
>> > > > > splitter
>> > > > > > > and
>> > > > > > > > >> > multiple partitions of block readers. The "end of file"
>> > > event
>> > > > > > cannot
>> > > > > > > > be
>> > > > > > > > >> > processed downstream until all block readers are done. I
>> > > also
>> > > > > > think
>> > > > > > > > that
>> > > > > > > > >> > this is related to the batch demarcation discussion and
>> > > there
>> > > > > > should
>> > > > > > > > be a
>> > > > > > > > >> > single generalized mechanism to support this.
>> > > > > > > > >> >
>> > > > > > > > >> >
>> > > > > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <
>> > > > > > > > pramod@datatorrent.com
>> > > > > > > > >> >
>> > > > > > > > >> > wrote:
>> > > > > > > > >> >
>> > > > > > > > >> > > Suppose I am processing data in a file and I want to
>> do
>> > > > > > something
>> > > > > > > at
>> > > > > > > > >> the
>> > > > > > > > >> > > end of a file at the output operator, I would send an
>> > end
>> > > > file
>> > > > > > > > control
>> > > > > > > > >> > > tuple and act on it when I receive it at the output.
>> In
>> > a
>> > > > > single
>> > > > > > > > >> window I
>> > > > > > > > >> > > may end up processing multiple files and if I don't
>> have
>> > > > > > multiple
>> > > > > > > > ports
>> > > > > > > > >> > and
>> > > > > > > > >> > > logical paths through the DAG (multiple partitions are
>> > > ok).
>> > > > I
>> > > > > > can
>> > > > > > > > >> process
>> > > > > > > > >> > > end of each file immediately and also know what file
>> was
>> > > > > closed
>> > > > > > > > without
>> > > > > > > > >> > > sending extra identification information in the end
>> file
>> > > > > which I
>> > > > > > > > would
>> > > > > > > > >> > need
>> > > > > > > > >> > > if I am collecting all of them and processing at the
>> end
>> > > of
>> > > > > the
>> > > > > > > > window.
>> > > > > > > > >> > >
>> > > > > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <
>> > > > thw@apache.org>
>> > > > > > > > wrote:
>> > > > > > > > >> > >
>> > > > > > > > >> > > > The use cases listed in the original discussion
>> don't
>> > > call
>> > > > > for
>> > > > > > > > option
>> > > > > > > > >> > 2.
>> > > > > > > > >> > > It
>> > > > > > > > >> > > > seems to come with additional complexity and
>> > > > implementation
>> > > > > > > cost.
>> > > > > > > > >> > > >
>> > > > > > > > >> > > > Can those in favor of option 2 please also provide
>> the
>> > > use
>> > > > > > case
>> > > > > > > > for
>> > > > > > > > >> it.
>> > > > > > > > >> > > >
>> > > > > > > > >> > > > Thanks,
>> > > > > > > > >> > > > Thomas
>> > > > > > > > >> > > >
>> > > > > > > > >> > > >
>> > > > > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <
>> > > > > > > > siyuan@datatorrent.com>
>> > > > > > > > >> > > > wrote:
>> > > > > > > > >> > > >
>> > > > > > > > >> > > > > I will vote for approach 1.
>> > > > > > > > >> > > > >
>> > > > > > > > >> > > > > First of all that one sounds easier to do to me.
>> > And I
>> > > > > think
>> > > > > > > > >> > > idempotency
>> > > > > > > > >> > > > is
>> > > > > > > > >> > > > > important. It may run at the cost of higher
>> latency
>> > > but
>> > > > I
>> > > > > > > think
>> > > > > > > > it
>> > > > > > > > >> is
>> > > > > > > > >> > > ok
>> > > > > > > > >> > > > >
>> > > > > > > > >> > > > > And in addition, when in the future if users do
>> need
>> > > > > > realtime
>> > > > > > > > >> control
>> > > > > > > > >> > > > tuple
>> > > > > > > > >> > > > > processing, we can always add the option on top of
>> > it.
>> > > > > > > > >> > > > >
>> > > > > > > > >> > > > > So I vote for 1
>> > > > > > > > >> > > > >
>> > > > > > > > >> > > > > Thanks,
>> > > > > > > > >> > > > > Siyuan
>> > > > > > > > >> > > > >
>> > > > > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <
>> > > > > > > > prad@apache.org>
>> > > > > > > > >> > > > wrote:
>> > > > > > > > >> > > > >
>> > > > > > > > >> > > > > > As a rule of thumb in any real time operating
>> > > system,
>> > > > > > > control
>> > > > > > > > >> > tuples
>> > > > > > > > >> > > > > should
>> > > > > > > > >> > > > > > always be handled using Priority Queues.
>> > > > > > > > >> > > > > >
>> > > > > > > > >> > > > > > We may try to control priorities by defining
>> > levels.
>> > > > And
>> > > > > > > shall
>> > > > > > > > >> not
>> > > > > > > > >> > > > > > be delivered at window boundaries.
>> > > > > > > > >> > > > > >
>> > > > > > > > >> > > > > > In short, control tuples shall never be treated
>> as
>> > > any
>> > > > > > other
>> > > > > > > > >> tuples
>> > > > > > > > >> > > in
>> > > > > > > > >> > > > > real
>> > > > > > > > >> > > > > > time systems.
>> > > > > > > > >> > > > > >
>> > > > > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <
>> > > > > > > > david@datatorrent.com>
>> > > > > > > > >> > > > wrote:
>> > > > > > > > >> > > > > >
>> > > > > > > > >> > > > > > > Hi all,
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > I would like to renew the discussion of
>> control
>> > > > > tuples.
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > Last time, we were in a debate about whether:
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > 1) the platform should enforce that control
>> > tuples
>> > > > are
>> > > > > > > > >> delivered
>> > > > > > > > >> > at
>> > > > > > > > >> > > > > > window
>> > > > > > > > >> > > > > > > boundaries only
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > or:
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > 2) the platform should deliver control tuples
>> > just
>> > > > as
>> > > > > > > other
>> > > > > > > > >> > tuples
>> > > > > > > > >> > > > and
>> > > > > > > > >> > > > > > it's
>> > > > > > > > >> > > > > > > the operator developers' choice whether to
>> > handle
>> > > > the
>> > > > > > > > control
>> > > > > > > > >> > > tuples
>> > > > > > > > >> > > > as
>> > > > > > > > >> > > > > > > they arrive or delay the processing till the
>> > next
>> > > > > window
>> > > > > > > > >> > boundary.
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > To summarize the pros and cons:
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > Approach 1: If processing control tuples
>> results
>> > > in
>> > > > > > > changes
>> > > > > > > > of
>> > > > > > > > >> > the
>> > > > > > > > >> > > > > > behavior
>> > > > > > > > >> > > > > > > of the operator, if idempotency needs to be
>> > > > preserved,
>> > > > > > the
>> > > > > > > > >> > > processing
>> > > > > > > > >> > > > > > must
>> > > > > > > > >> > > > > > > be done at window boundaries. This approach
>> will
>> > > > save
>> > > > > > the
>> > > > > > > > >> > operator
>> > > > > > > > >> > > > > > > developers the headache of ensuring that. However,
>> > this
>> > > > will
>> > > > > > > take
>> > > > > > > > >> away
>> > > > > > > > >> > > the
>> > > > > > > > >> > > > > > > choices from the operator developer if they
>> just
>> > > > need
>> > > > > to
>> > > > > > > > >> process
>> > > > > > > > >> > > the
>> > > > > > > > >> > > > > > > control tuples as soon as possible.
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > Approach 2: The operator has a chance to
>> > > immediately
>> > > > > > > process
>> > > > > > > > >> > > control
>> > > > > > > > >> > > > > > > tuples. This would be useful if latency is
>> more
>> > > > valued
>> > > > > > > than
>> > > > > > > > >> > > > > correctness.
>> > > > > > > > >> > > > > > > However, this would open the possibility
>> for
>> > > > > operator
>> > > > > > > > >> > developers
>> > > > > > > > >> > > > to
>> > > > > > > > >> > > > > > > shoot themselves in the foot. This is
>> especially
>> > > > true
>> > > > > if
>> > > > > > > > there
>> > > > > > > > >> > are
>> > > > > > > > >> > > > > > multiple
>> > > > > > > > >> > > > > > > input ports, as there is no easy way to
>> > guarantee
>> > > > > > > processing
>> > > > > > > > >> > order
>> > > > > > > > >> > > > for
>> > > > > > > > >> > > > > > > multiple input ports.
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > We would like to arrive at a consensus and
>> close
>> > > > this
>> > > > > > > > >> discussion
>> > > > > > > > >> > > soon
>> > > > > > > > >> > > > > > this
>> > > > > > > > >> > > > > > > time so we can start the work on this
>> important
>> > > > > feature.
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > Thanks!
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > David
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <
>> > > > > > > > >> > > > v.rozov@datatorrent.com
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > wrote:
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > > > > It is not clear how operator will emit
>> custom
>> > > > > control
>> > > > > > > > tuple
>> > > > > > > > >> at
>> > > > > > > > >> > > > window
>> > > > > > > > >> > > > > > > > boundaries. One way is to cache/accumulate
>> > > control
>> > > > > > > tuples
>> > > > > > > > in
>> > > > > > > > >> > the
>> > > > > > > > >> > > > > > operator
>> > > > > > > > >> > > > > > > > output port till window closes (END_WINDOW
>> is
>> > > > > inserted
>> > > > > > > > into
>> > > > > > > > >> the
>> > > > > > > > >> > > > > output
>> > > > > > > > >> > > > > > > > sink) or only allow an operator to emit
>> > control
>> > > > > tuples
>> > > > > > > > inside
>> > > > > > > > >> > the
>> > > > > > > > >> > > > > > > > endWindow(). The latter is a slight variation
>> > of
>> > > > the
>> > > > > > > > operator
>> > > > > > > > >> > > output
>> > > > > > > > >> > > > > > port
>> > > > > > > > >> > > > > > > > caching behavior with the only difference
>> that
>> > > now
>> > > > > the
>> > > > > > > > >> operator
>> > > > > > > > >> > > > > itself
>> > > > > > > > >> > > > > > is
>> > > > > > > > >> > > > > > > > responsible for caching/accumulating control
>> > > > tuples.
>> > > > > > > Note
>> > > > > > > > >> that
>> > > > > > > > >> > in
>> > > > > > > > >> > > > > many
>> > > > > > > > >> > > > > > > > cases it will be necessary to postpone
>> > emitting
>> > > > > > payload
>> > > > > > > > >> tuples
>> > > > > > > > >> > > that
>> > > > > > > > >> > > > > > > > logically come after the custom control
>> tuple
>> > > till
>> > > > > the
>> > > > > > > > next
>> > > > > > > > >> > > window
>> > > > > > > > >> > > > > > > begins.
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > > IMO, that is too restrictive and in a case
>> where
>> > > > input
>> > > > > > > > operator
>> > > > > > > > >> > > uses a
>> > > > > > > > >> > > > > > push
>> > > > > > > > >> > > > > > > > instead of a poll (for example, it provides
>> an
>> > > end
>> > > > > > point
>> > > > > > > > >> where
>> > > > > > > > >> > > > remote
>> > > > > > > > >> > > > > > > > agents may connect and publish/push data),
>> > > control
>> > > > > > > tuples
>> > > > > > > > may
>> > > > > > > > >> > be
>> > > > > > > > >> > > > used
>> > > > > > > > >> > > > > > for
>> > > > > > > > >> > > > > > > > connect/disconnect/watermark broadcast to
>> > > > > > (partitioned)
>> > > > > > > > >> > > downstream
>> > > > > > > > >> > > > > > > > operators. In this case the platform just
>> need
>> > > to
>> > > > > > > > guarantee
>> > > > > > > > >> > order
>> > > > > > > > >> > > > > > barrier
>> > > > > > > > >> > > > > > > > (any tuple emitted prior to a control tuple
>> > > needs
>> > > > to
>> > > > > > be
>> > > > > > > > >> > delivered
>> > > > > > > > >> > > > > prior
>> > > > > > > > >> > > > > > > to
>> > > > > > > > >> > > > > > > > the control tuple).
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > > Thank you,
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > > Vlad
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > > >> I agree with David. Allowing control tuples
>> > > > within
>> > > > > a
>> > > > > > > > window
>> > > > > > > > >> > > (along
>> > > > > > > > >> > > > > > with
>> > > > > > > > >> > > > > > > >> data tuples) creates a very dangerous
>> situation
>> > > > where
>> > > > > > > > >> guarantees
>> > > > > > > > >> > > are
>> > > > > > > > >> > > > > > > >> impacted. It is much safer to enable
>> control
>> > > > tuples
>> > > > > > > > >> > > (send/receive)
>> > > > > > > > >> > > > > at
>> > > > > > > > >> > > > > > > >> window boundaries (after END_WINDOW of
>> window
>> > > N,
>> > > > > and
>> > > > > > > > before
>> > > > > > > > >> > > > > > BEGIN_WINDOW
>> > > > > > > > >> > > > > > > >> for window N+1). My take on David's list is
>> > > > > > > > >> > > > > > > >>
>> > > > > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1; there
>> > > will
>> > > > > be a
>> > > > > > > big
>> > > > > > > > >> > issue
>> > > > > > > > >> > > > with
>> > > > > > > > >> > > > > > > >> guarantees for operators with multiple
>> ports.
>> > > > (see
>> > > > > > > > Thomas's
>> > > > > > > > >> > > > > response)
>> > > > > > > > >> > > > > > > >> 2. -> All downstream windows -> +1, but
>> there
>> > > are
>> > > > > > > > >> situations;
>> > > > > > > > >> > a
>> > > > > > > > >> > > > > caveat
>> > > > > > > > >> > > > > > > >> could be "only to operators that implement
>> > > > control
>> > > > > > > tuple
>> > > > > > > > >> > > > > > > >> interface/listeners", which could
>> effectively
>> > > > > > > translates
>> > > > > > > > to
>> > > > > > > > >> > "all
>> > > > > > > > >> > > > > > > >> interested
>> > > > > > > > >> > > > > > > >> downstream operators"
>> > > > > > > > >> > > > > > > >> 3. Only Input operator can create control
>> > > tuples
>> > > > ->
>> > > > > > -1;
>> > > > > > > > is
>> > > > > > > > >> > > > > restrictive
>> > > > > > > > >> > > > > > > >> even
>> > > > > > > > >> > > > > > > >> though most likely 95% of the time it will
>> be
>> > > > input
>> > > > > > > > >> operators
>> > > > > > > > >> > > > > > > >>
>> > > > > > > > >> > > > > > > >> Thks,
>> > > > > > > > >> > > > > > > >> Amol
>> > > > > > > > >> > > > > > > >>
>> > > > > > > > >> > > > > > > >>
>> > > > > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <thomas@datatorrent.com>
>> > > > > > > > >> > > > > > > >> wrote:
>> > > > > > > > >> > > > > > > >>
>> > > > > > > > >> > > > > > > >>> The windowing we discuss here is in general event time based, arrival
>> > > > > > > > >> > > > > > > >>> time is a special case of it.
>> > > > > > > > >> > > > > > > >>>
>> > > > > > > > >> > > > > > > >>> I don't think state changes can be made independent of the streaming
>> > > > > > > > >> > > > > > > >>> window boundary as it would prevent idempotent processing and
>> > > > > > > > >> > > > > > > >>> transitively exactly once. For that to work, tuples need to be
>> > > > > > > > >> > > > > > > >>> presented to the operator in a guaranteed order *within* the streaming
>> > > > > > > > >> > > > > > > >>> window, which is not possible with multiple ports (and partitions).
>> > > > > > > > >> > > > > > > >>>
>> > > > > > > > >> > > > > > > >>> Thomas
>> > > > > > > > >> > > > > > > >>>
>> > > > > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <david@datatorrent.com>
>> > > > > > > > >> > > > > > > >>> wrote:
>> > > > > > > > >> > > > > > > >>>
>> > > > > > > > >> > > > > > > >>>> I think for session tracking, if the session boundaries are allowed to
>> > > > > > > > >> > > > > > > >>>> be not aligned with the streaming window boundaries, the user will have
>> > > > > > > > >> > > > > > > >>>> a much bigger problem with idempotency. And in most cases, session
>> > > > > > > > >> > > > > > > >>>> tracking is event time based, not ingression time or processing time
>> > > > > > > > >> > > > > > > >>>> based, so this may never be a problem. But if that ever happens, the
>> > > > > > > > >> > > > > > > >>>> user can always alter the default 500ms width.
>> > > > > > > > >> > > > > > > >>>>
>> > > > > > > > >> > > > > > > >>>> David
>> > > > > > > > >> > > > > > > >>>>
>> > > > > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.rozov@datatorrent.com>
>> > > > > > > > >> > > > > > > >>>> wrote:
>> > > > > > > > >> > > > > > > >>>>
>> > > > > > > > >> > > > > > > >>>>> The ability to send custom control tuples within a window may be
>> > > > > > > > >> > > > > > > >>>>> useful, for example, for session tracking, where session boundaries
>> > > > > > > > >> > > > > > > >>>>> are not aligned with window boundaries and 500 ms latency is not
>> > > > > > > > >> > > > > > > >>>>> acceptable for an application.
>> > > > > > > > >> > > > > > > >>>>>
>> > > > > > > > >> > > > > > > >>>>> Thank you,
>> > > > > > > > >> > > > > > > >>>>>
>> > > > > > > > >> > > > > > > >>>>> Vlad
>> > > > > > > > >> > > > > > > >>>>>
>> > > > > > > > >> > > > > > > >>>>>
>> > > > > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
>> > > > > > > > >> > > > > > > >>>>>
>> > > > > > > > >> > > > > > > >>>>>> It should not matter from where the control tuple is triggered. It
>> > > > > > > > >> > > > > > > >>>>>> will be good to have a generic mechanism to propagate it and other
>> > > > > > > > >> > > > > > > >>>>>> things can be accomplished outside the engine. For example, the new
>> > > > > > > > >> > > > > > > >>>>>> comprehensive support for windowing will all be in Malhar, nothing
>> > > > > > > > >> > > > > > > >>>>>> that the engine needs to know about it except that we need the
>> > > > > > > > >> > > > > > > >>>>>> control tuple for watermark propagation and idempotent processing.
>> > > > > > > > >> > > > > > > >>>>>>
>> > > > > > > > >> > > > > > > >>>>>> I also think the main difference to other tuples is the need to send
>> > > > > > > > >> > > > > > > >>>>>> it to all partitions. Which is similar to checkpoint window tuples,
>> > > > > > > > >> > > > > > > >>>>>> but not the same. Here, we probably also need the ability for the
>> > > > > > > > >> > > > > > > >>>>>> user to control whether such tuple should traverse the entire DAG or
>> > > > > > > > >> > > > > > > >>>>>> not. For a batch use case, for example, we may want to send the end
>> > > > > > > > >> > > > > > > >>>>>> of file to the next operator, but not beyond, if the operator has
>> > > > > > > > >> > > > > > > >>>>>> asynchronous processing logic in it.
>> > > > > > > > >> > > > > > > >>>>>>
>> > > > > > > > >> > > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to be
>> > > > > > > > >> > > > > > > >>>>>> processed at a window boundary. Receiving the control tuple in the
>> > > > > > > > >> > > > > > > >>>>>> window callback would avoid having to track extra state in the
>> > > > > > > > >> > > > > > > >>>>>> operator. I don't think that's a major issue, but what is the use
>> > > > > > > > >> > > > > > > >>>>>> case for processing a control tuple within the window?
>> > > > > > > > >> > > > > > > >>>>>>
>> > > > > > > > >> > > > > > > >>>>>> Thomas
>> > > > > > > > >> > > > > > > >>>>>>
>> > > > > > > > >> > > > > > > >>>>>>
>> > > > > > > > >> > > > > > > >>>>>>
>> > > > > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni
>> > > > > > > > >> > > > > > > >>>>>> <pramod@datatorrent.com> wrote:
>> > > > > > > > >> > > > > > > >>>>>>
>> > > > > > > > >> > > > > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more likely
>> > > > > > > > >> > > > > > > >>>>>>> to be controlled directly by the application, 3) and 4) are more
>> > > > > > > > >> > > > > > > >>>>>>> likely going to be triggered externally and directly handled by the
>> > > > > > > > >> > > > > > > >>>>>>> engine and 3) is already being implemented that way (apexcore-163).
>> > > > > > > > >> > > > > > > >>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>> The control tuples emitted by an operator would be sent to all
>> > > > > > > > >> > > > > > > >>>>>>> downstream partitions, wouldn't they, and that would be the chief
>> > > > > > > > >> > > > > > > >>>>>>> distinction compared to data (apart from the payload), which would
>> > > > > > > > >> > > > > > > >>>>>>> get partitioned under normal circumstances. It would also be
>> > > > > > > > >> > > > > > > >>>>>>> guaranteed that downstream partitions will receive control tuples
>> > > > > > > > >> > > > > > > >>>>>>> only after the data that was sent before them, so we could send a
>> > > > > > > > >> > > > > > > >>>>>>> control tuple immediately when it is emitted as opposed to at
>> > > > > > > > >> > > > > > > >>>>>>> window boundaries.
>> > > > > > > > >> > > > > > > >>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>> However, during unification it is important to know if these control
>> > > > > > > > >> > > > > > > >>>>>>> tuples have been received from all upstream partitions before
>> > > > > > > > >> > > > > > > >>>>>>> proceeding with a control operation. One could wait till the end of
>> > > > > > > > >> > > > > > > >>>>>>> the window, but that introduces a delay, however small. I would like
>> > > > > > > > >> > > > > > > >>>>>>> to add to the proposal that the platform only hand over the control
>> > > > > > > > >> > > > > > > >>>>>>> tuple to the unifier when it has been received from all upstream
>> > > > > > > > >> > > > > > > >>>>>>> partitions, much like how end window is processed, but not wait
>> > > > > > > > >> > > > > > > >>>>>>> till the actual end of the window.
>> > > > > > > > >> > > > > > > >>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>> Regarding your concern about idempotency, we typically care about
>> > > > > > > > >> > > > > > > >>>>>>> idempotency at a window level and doing the above will still allow
>> > > > > > > > >> > > > > > > >>>>>>> the operators to preserve that easily.
>> > > > > > > > >> > > > > > > >>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>> Thanks
>> > > > > > > > >> > > > > > > >>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <david@datatorrent.com>
>> > > > > > > > >> > > > > > > >>>>>>> wrote:
>> > > > > > > > >> > > > > > > >>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> Hi all,
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> I would like to propose a new feature to the Apex core engine --
>> > > > > > > > >> > > > > > > >>>>>>>> the support of custom control tuples. Currently, we have control
>> > > > > > > > >> > > > > > > >>>>>>>> tuples such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on,
>> > > > > > > > >> > > > > > > >>>>>>>> but we don't have the support for applications to insert their
>> > > > > > > > >> > > > > > > >>>>>>>> own control tuples. The current way to get around this is to use
>> > > > > > > > >> > > > > > > >>>>>>>> data tuples and have a separate port for such tuples that sends
>> > > > > > > > >> > > > > > > >>>>>>>> tuples to all partitions of the downstream operators, which is
>> > > > > > > > >> > > > > > > >>>>>>>> not exactly developer friendly.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> We have already seen a number of use cases that can use this
>> > > > > > > > >> > > > > > > >>>>>>>> feature:
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> 1) Batch support: We need to tell all operators of the physical
>> > > > > > > > >> > > > > > > >>>>>>>> DAG when a batch starts and ends, so the operators can do whatever
>> > > > > > > > >> > > > > > > >>>>>>>> is needed upon the start or the end of a batch.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event time windowing, the
>> > > > > > > > >> > > > > > > >>>>>>>> watermark control tuple is needed to tell which windows should be
>> > > > > > > > >> > > > > > > >>>>>>>> considered late.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> 3) Changing operator properties: We do have the support of
>> > > > > > > > >> > > > > > > >>>>>>>> changing operator properties on the fly, but with a custom control
>> > > > > > > > >> > > > > > > >>>>>>>> tuple, the command to change operator properties can be window
>> > > > > > > > >> > > > > > > >>>>>>>> aligned for all partitions and also across the DAG.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we do have
>> > > > > > > > >> > > > > > > >>>>>>>> this support now but only at the individual physical operator
>> > > > > > > > >> > > > > > > >>>>>>>> level, and without control of which window to record tuples for.
>> > > > > > > > >> > > > > > > >>>>>>>> With a custom control tuple, because a control tuple must belong
>> > > > > > > > >> > > > > > > >>>>>>>> to a window, all operators in the DAG can start (and stop)
>> > > > > > > > >> > > > > > > >>>>>>>> recording for the same windows.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> I can think of two options to achieve this:
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> 1) new custom control tuple type that takes user's serializable
>> > > > > > > > >> > > > > > > >>>>>>>> object.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control
>> > > > > > > > >> > > > > > > >>>>>>>> tuples.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> Please provide your feedback. Thank you.
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>> David
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >>>>>>>>
>> > > > > > > > >> > > > > > > >
>> > > > > > > > >> > > > > > >
>> > > > > > > > >> > > > > >
>> > > > > > > > >> > > > >
>> > > > > > > > >> > > >
>> > > > > > > > >> > >
>> > > > > > > > >> >
>> > > > > > > > >>
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
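
Editor's sketch: the unification rule proposed in the quoted thread above (hand a control tuple to the unifier only once every upstream partition has delivered it, without waiting for the end of the window) can be illustrated roughly as follows. This is a minimal sketch under stated assumptions, not Apex code: the class ControlTupleBarrier, its offer method, and the use of a string id to identify a control tuple are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for illustration only; not part of the Apex API. */
final class ControlTupleBarrier {
  private final int upstreamPartitions;
  // For each pending control tuple id, the upstream partitions that have delivered it.
  private final Map<String, Set<Integer>> seenBy = new HashMap<>();

  ControlTupleBarrier(int upstreamPartitions) {
    this.upstreamPartitions = upstreamPartitions;
  }

  /**
   * Records that the given upstream partition delivered the control tuple.
   * Returns true when the last missing partition has delivered it, which is
   * the point at which the tuple could be handed to the unifier.
   */
  boolean offer(String tupleId, int partition) {
    Set<Integer> partitions = seenBy.computeIfAbsent(tupleId, k -> new HashSet<>());
    partitions.add(partition); // a repeat from the same partition changes nothing
    if (partitions.size() == upstreamPartitions) {
      seenBy.remove(tupleId); // release the bookkeeping once the tuple is handed over
      return true;
    }
    return false;
  }
}
```

With three upstream partitions, offer("eof", 0) and offer("eof", 1) return false and offer("eof", 2) returns true. The sketch does not validate partition numbers or bound the pending map, which a real engine would need to do.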

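Editor's sketch: option 1 as discussed in this thread (control tuples are held and delivered at the window boundary, and a tuple that arrives several times through NxM partitioning triggers the callback only once) could look roughly like the following. This is a sketch under assumptions, not the implementation tracked in APEXCORE-579: WindowControlBuffer and its methods are hypothetical names, and duplicate detection relies on the payload's equals/hashCode as a stand-in for the "list of control tuple hashes" suggested in the thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical per-operator buffer for illustration only; not part of the Apex API. */
final class WindowControlBuffer<T> {
  // Insertion-ordered set: keeps arrival order and drops repeats within the window.
  private final Set<T> pending = new LinkedHashSet<>();

  /** Called at BEGIN_WINDOW: start the window with no pending control tuples. */
  void beginWindow() {
    pending.clear();
  }

  /** Called for each arriving control tuple; returns false if it was already seen in this window. */
  boolean receive(T controlTuple) {
    return pending.add(controlTuple);
  }

  /** Called at END_WINDOW: returns each distinct control tuple once, in arrival order. */
  List<T> endWindow() {
    List<T> toDeliver = new ArrayList<>(pending);
    pending.clear();
    return toDeliver;
  }
}
```

In the A(1) X B(4) X C(2) example from the thread, each partition of C would call receive four times for the same tuple and still get a single entry back from endWindow.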

Re: [DISCUSSION] Custom Control Tuples

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
I would like to work on https://issues.apache.org/jira/browse/APEXCORE-580.

~ Bhupesh

On Thu, Dec 1, 2016 at 5:42 AM, Sandesh Hegde <sa...@datatorrent.com>
wrote:

> I am interested in working on the following subtask
>
> https://issues.apache.org/jira/browse/APEXCORE-581
>
> Thanks
>
>
> On Wed, Nov 30, 2016 at 2:07 PM David Yan <da...@datatorrent.com> wrote:
>
> > I have created an umbrella ticket for control tuple support:
> >
> > https://issues.apache.org/jira/browse/APEXCORE-579
> >
> > Currently it has two subtasks. Please have a look at them and see whether
> > I'm missing anything or if you have anything to add. You are welcome to add
> > more subtasks or comment on the existing subtasks.
> >
> > We would like to kick start the implementation soon.
> >
> > Thanks!
> >
> > David
> >
> > On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > wrote:
> >
> > > +1 for the plan.
> > >
> > > I would be interested in contributing to this feature.
> > >
> > > ~ Bhupesh
> > >
> > > On Nov 29, 2016 03:26, "Sandesh Hegde" <sa...@datatorrent.com>
> > > wrote:
> > >
> > > > I am interested in contributing to this feature.
> > > >
> > > > On Mon, Nov 28, 2016 at 1:54 PM David Yan <da...@datatorrent.com>
> > > > wrote:
> > > > >
> > > > > I think we should probably go ahead with option 1 since this works with
> > > > > most use cases and prevents developers from shooting themselves in the
> > > > > foot in terms of idempotency.
> > > > >
> > > > > We can have a configuration property that enables option 2 later if we
> > > > > have concrete use cases that call for it.
> > > > >
> > > > > Please share your thoughts if you think you don't agree with this plan.
> > > > > Also, please indicate if you're interested in contributing to this
> > > > > feature.
> > > > >
> > > > > David
> > > > >
> > > > > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > wrote:
> > > > > >
> > > > > > It appears that option 1 is more favored due to unavailability of a use
> > > > > > case which could use option 2.
> > > > > >
> > > > > > However, option 2 is problematic in specific cases, like presence of
> > > > > > multiple input ports for example. In case of a linear DAG where control
> > > > > > tuples are flowing in order with the data tuples, it should not be
> > > > > > difficult to guarantee idempotency. For example, in cases where there
> > > > > > could be multiple changes in behavior of an operator during a single
> > > > > > window, it should not wait until end window for these changes to take
> > > > > > effect. Since we don't have a concrete use case right now, perhaps we do
> > > > > > not want to go down that road. This feature should be available through
> > > > > > a platform attribute (maybe at a later point in time) where the default
> > > > > > is option 1.
> > > > > >
> > > > > > I think option 1 is suitable for a starting point in the implementation
> > > > > > of this feature and we should proceed with it.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <david@datatorrent.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Good question Tushar. The callback should be called only once.
> > > > > > > The way to implement this is to keep a list of control tuple hashes for
> > > > > > > the given streaming window and only do the callback when the operator
> > > > > > > has not seen it before.
> > > > > > >
> > > > > > > Other thoughts?
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tushar@datatorrent.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi David,
> > > > > > > >
> > > > > > > > What would be the behaviour in the case where we have a DAG with the
> > > > > > > > following operators? The number in brackets is the number of
> > > > > > > > partitions, and X is NxM partitioning.
> > > > > > > > A(1) X B(4) X C(2)
> > > > > > > >
> > > > > > > > If A sends a control tuple, it will be sent to all 4 partitions of B,
> > > > > > > > and from each partition of B it goes to C, i.e. each partition of C
> > > > > > > > will receive the same control tuple originated from A multiple times
> > > > > > > > (the number of upstream partitions of C). In this case, will the
> > > > > > > > callback function get called multiple times or just once?
> > > > > > > >
> > > > > > > > -Tushar.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <david@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > > > Hi Bhupesh,
> > > > > > > > >
> > > > > > > > > Since each input port has its own incoming control tuple, I would
> > > > > > > > > imagine there would be an additional DefaultInputPort.processControl
> > > > > > > > > method that operator developers can override.
> > > > > > > > > If we go for option 1, my thinking is that the control tuples would
> > > > > > > > > always be delivered at the next window boundary, even if the emit
> > > > > > > > > method is called within a window.
> > > > > > > > >
> > > > > > > > > David
> > > > > > > > >
> > > > > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> I have a question regarding the callback for a control tuple. Will it
> > > > > > > > > >> be similar to InputPort::process() method? Something like
> > > > > > > > > >> InputPort::processControlTuple(t)? Or will it be a method of the
> > > > > > > > > >> operator similar to beginWindow()?
> > > > > > > > > >>
> > > > > > > > > >> When we say that the control tuple will be delivered at window
> > > > > > > > > >> boundary, does that mean all control tuples emitted in that window
> > > > > > > > > >> will be processed together at the end of the window? This would imply
> > > > > > > > > >> that there is no ordering among regular tuples and control tuples.
> > > > > > > > > >>
> > > > > > > > > >> I think we should get started with the option 1 - control tuples at
> > > > > > > > > >> window boundary, which seems to handle most of the use cases. For some
> > > > > > > > > >> cases which require option 2, we can always build on this.
> > > > > > > > >>
> > > > > > > > >> ~ Bhupesh
> > > > > > > > >>
> > > > > > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <thw@apache.org> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > I don't see how that would work. Suppose you have a file splitter and
> > > > > > > > > >> > multiple partitions of block readers. The "end of file" event cannot
> > > > > > > > > >> > be processed downstream until all block readers are done. I also
> > > > > > > > > >> > think that this is related to the batch demarcation discussion and
> > > > > > > > > >> > there should be a single generalized mechanism to support this.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Suppose I am processing data in a file and I want to do something at
> > > > > > > > > >> > > the end of a file at the output operator. I would send an end file
> > > > > > > > > >> > > control tuple and act on it when I receive it at the output. In a
> > > > > > > > > >> > > single window I may end up processing multiple files, and if I don't
> > > > > > > > > >> > > have multiple ports and logical paths through the DAG (multiple
> > > > > > > > > >> > > partitions are ok), I can process the end of each file immediately
> > > > > > > > > >> > > and also know what file was closed without sending extra
> > > > > > > > > >> > > identification information in the end file, which I would need if I
> > > > > > > > > >> > > am collecting all of them and processing at the end of the window.
> > > > > > > > >> > >
> > > > > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <
> > > > thw@apache.org>
> > > > > > > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > The use cases listed in the original discussion
> don't
> > > call
> > > > > for
> > > > > > > > option
> > > > > > > > >> > 2.
> > > > > > > > >> > > It
> > > > > > > > >> > > > seems to come with additional complexity and
> > > > implementation
> > > > > > > cost.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Can those in favor of option 2 please also provide
> the
> > > use
> > > > > > case
> > > > > > > > for
> > > > > > > > >> it.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks,
> > > > > > > > >> > > > Thomas
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <
> > > > > > > > siyuan@datatorrent.com>
> > > > > > > > >> > > > wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > I will vote for approach 1.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > First of all that one sounds easier to do to me.
> > And I
> > > > > think
> > > > > > > > >> > > idempotency
> > > > > > > > >> > > > is
> > > > > > > > >> > > > > important. It may run at the cost of higher
> latency
> > > but
> > > > I
> > > > > > > think
> > > > > > > > it
> > > > > > > > >> is
> > > > > > > > >> > > ok
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > And in addition, when in the future if users do
> need
> > > > > > realtime
> > > > > > > > >> control
> > > > > > > > >> > > > tuple
> > > > > > > > >> > > > > processing, we can always add the option on top of
> > it.
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > So I vote for 1
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Thanks,
> > > > > > > > >> > > > > Siyuan
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <
> > > > > > > > prad@apache.org>
> > > > > > > > >> > > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > As a rule of thumb in any real time operating
> > > system,
> > > > > > > control
> > > > > > > > >> > tuples
> > > > > > > > >> > > > > should
> > > > > > > > >> > > > > > always be handled using Priority Queues.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > We may try to control priorities by defining
> > levels.
> > > > And
> > > > > > > shall
> > > > > > > > >> not
> > > > > > > > >> > > > > > be delivered at window boundaries.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > In short, control tuples shall never be treated
> as
> > > any
> > > > > > other
> > > > > > > > >> tuples
> > > > > > > > >> > > in
> > > > > > > > >> > > > > real
> > > > > > > > >> > > > > > time systems.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <
> > > > > > > > david@datatorrent.com>
> > > > > > > > >> > > > wrote:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > > Hi all,
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > I would like to renew the discussion of
> control
> > > > > tuples.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Last time, we were in a debate about whether:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > 1) the platform should enforce that control
> > tuples
> > > > are
> > > > > > > > >> delivered
> > > > > > > > >> > at
> > > > > > > > >> > > > > > window
> > > > > > > > >> > > > > > > boundaries only
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > or:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > 2) the platform should deliver control tuples
> > just
> > > > as
> > > > > > > other
> > > > > > > > >> > tuples
> > > > > > > > >> > > > and
> > > > > > > > >> > > > > > it's
> > > > > > > > >> > > > > > > the operator developers' choice whether to
> > handle
> > > > the
> > > > > > > > control
> > > > > > > > >> > > tuples
> > > > > > > > >> > > > as
> > > > > > > > >> > > > > > > they arrive or delay the processing till the
> > next
> > > > > window
> > > > > > > > >> > boundary.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > To summarize the pros and cons:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Approach 1: If processing control tuples
> results
> > > in
> > > > > > > changes
> > > > > > > > of
> > > > > > > > >> > the
> > > > > > > > >> > > > > > behavior
> > > > > > > > >> > > > > > > of the operator, if idempotency needs to be
> > > > preserved,
> > > > > > the
> > > > > > > > >> > > processing
> > > > > > > > >> > > > > > must
> > > > > > > > >> > > > > > > be done at window boundaries. This approach
> will
> > > > save
> > > > > > the
> > > > > > > > >> > operator
> > > > > > > > >> > > > > > > developers headache to ensure that. However,
> > this
> > > > will
> > > > > > > take
> > > > > > > > >> away
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > > choices from the operator developer if they
> just
> > > > need
> > > > > to
> > > > > > > > >> process
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > > control tuples as soon as possible.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Approach 2: The operator has a chance to
> > > immediately
> > > > > > > process
> > > > > > > > >> > > control
> > > > > > > > >> > > > > > > tuples. This would be useful if latency is
> more
> > > > valued
> > > > > > > than
> > > > > > > > >> > > > > correctness.
> > > > > > > > >> > > > > > > However, if this would open the possibility
> for
> > > > > operator
> > > > > > > > >> > developers
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > > > shoot themselves in the foot. This is
> especially
> > > > true
> > > > > if
> > > > > > > > there
> > > > > > > > >> > are
> > > > > > > > >> > > > > > multiple
> > > > > > > > >> > > > > > > input ports. as there is no easy way to
> > guarantee
> > > > > > > processing
> > > > > > > > >> > order
> > > > > > > > >> > > > for
> > > > > > > > >> > > > > > > multiple input ports.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > We would like to arrive to a consensus and
> close
> > > > this
> > > > > > > > >> discussion
> > > > > > > > >> > > soon
> > > > > > > > >> > > > > > this
> > > > > > > > >> > > > > > > time so we can start the work on this
> important
> > > > > feature.
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > Thanks!
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > David
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <
> > > > > > > > >> > > > v.rozov@datatorrent.com
> > > > > > > > >> > > > > > > <javascript:;>>
> > > > > > > > >> > > > > > > wrote:
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > > > > It is not clear how operator will emit
> custom
> > > > > control
> > > > > > > > tuple
> > > > > > > > >> at
> > > > > > > > >> > > > window
> > > > > > > > >> > > > > > > > boundaries. One way is to cache/accumulate
> > > control
> > > > > > > tuples
> > > > > > > > in
> > > > > > > > >> > the
> > > > > > > > >> > > > > > operator
> > > > > > > > >> > > > > > > > output port till window closes (END_WINDOW
> is
> > > > > inserted
> > > > > > > > into
> > > > > > > > >> the
> > > > > > > > >> > > > > output
> > > > > > > > >> > > > > > > > sink) or only allow an operator to emit
> > control
> > > > > tuples
> > > > > > > > inside
> > > > > > > > >> > the
> > > > > > > > >> > > > > > > > endWindow(). The later is a slight variation
> > of
> > > > the
> > > > > > > > operator
> > > > > > > > >> > > output
> > > > > > > > >> > > > > > port
> > > > > > > > >> > > > > > > > caching behavior with the only difference
> that
> > > now
> > > > > the
> > > > > > > > >> operator
> > > > > > > > >> > > > > itself
> > > > > > > > >> > > > > > is
> > > > > > > > >> > > > > > > > responsible for caching/accumulating control
> > > > tuples.
> > > > > > > Note
> > > > > > > > >> that
> > > > > > > > >> > in
> > > > > > > > >> > > > > many
> > > > > > > > >> > > > > > > > cases it will be necessary to postpone
> > emitting
> > > > > > payload
> > > > > > > > >> tuples
> > > > > > > > >> > > that
> > > > > > > > >> > > > > > > > logically come after the custom control
> tuple
> > > till
> > > > > the
> > > > > > > > next
> > > > > > > > >> > > window
> > > > > > > > >> > > > > > > begins.
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > IMO, that too restrictive and in a case
> where
> > > > input
> > > > > > > > operator
> > > > > > > > >> > > uses a
> > > > > > > > >> > > > > > push
> > > > > > > > >> > > > > > > > instead of a poll (for example, it provides
> an
> > > end
> > > > > > point
> > > > > > > > >> where
> > > > > > > > >> > > > remote
> > > > > > > > >> > > > > > > > agents may connect and publish/push data),
> > > control
> > > > > > > tuples
> > > > > > > > may
> > > > > > > > >> > be
> > > > > > > > >> > > > used
> > > > > > > > >> > > > > > for
> > > > > > > > >> > > > > > > > connect/disconnect/watermark broadcast to
> > > > > > (partitioned)
> > > > > > > > >> > > downstream
> > > > > > > > >> > > > > > > > operators. In this case the platform just
> need
> > > to
> > > > > > > > guarantee
> > > > > > > > >> > order
> > > > > > > > >> > > > > > barrier
> > > > > > > > >> > > > > > > > (any tuple emitted prior to a control tuple
> > > needs
> > > > to
> > > > > > be
> > > > > > > > >> > delivered
> > > > > > > > >> > > > > prior
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > > the control tuple).
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Thank you,
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > Vlad
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > > >> I agree with David. Allowing control tuples
> > > > within
> > > > > a
> > > > > > > > window
> > > > > > > > >> > > (along
> > > > > > > > >> > > > > > with
> > > > > > > > >> > > > > > > >> data tuples) creates very dangerous
> situation
> > > > where
> > > > > > > > >> guarantees
> > > > > > > > >> > > are
> > > > > > > > >> > > > > > > >> impacted. It is much safer to enable
> control
> > > > tuples
> > > > > > > > >> > > (send/receive)
> > > > > > > > >> > > > > at
> > > > > > > > >> > > > > > > >> window boundaries (after END_WINDOW of
> window
> > > N,
> > > > > and
> > > > > > > > before
> > > > > > > > >> > > > > > BEGIN_WINDOW
> > > > > > > > >> > > > > > > >> for window N+1). My take on David's list is
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1; there
> > > will
> > > > > be a
> > > > > > > big
> > > > > > > > >> > issue
> > > > > > > > >> > > > with
> > > > > > > > >> > > > > > > >> guarantees for operators with multiple
> ports.
> > > > (see
> > > > > > > > Thomas's
> > > > > > > > >> > > > > response)
> > > > > > > > >> > > > > > > >> 2. -> All downstream windows -> +1, but
> there
> > > are
> > > > > > > > >> situations;
> > > > > > > > >> > a
> > > > > > > > >> > > > > caveat
> > > > > > > > >> > > > > > > >> could be "only to operators that implement
> > > > control
> > > > > > > tuple
> > > > > > > > >> > > > > > > >> interface/listeners", which could
> effectively
> > > > > > > translate
> > > > > > > > to
> > > > > > > > >> > "all
> > > > > > > > >> > > > > > > >> interested
> > > > > > > > >> > > > > > > >> downstream operators"
> > > > > > > > >> > > > > > > >> 3. Only Input operator can create control
> > > tuples
> > > > ->
> > > > > > -1;
> > > > > > > > is
> > > > > > > > >> > > > > restrictive
> > > > > > > > >> > > > > > > >> even
> > > > > > > > >> > > > > > > >> though most likely 95% of the time it will
> be
> > > > input
> > > > > > > > >> operators
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >> Thks,
> > > > > > > > >> > > > > > > >> Amol
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas
> > Weise <
> > > > > > > > >> > > > > thomas@datatorrent.com
> > > > > > > > >> > > > > > > <javascript:;>>
> > > > > > > > >> > > > > > > >> wrote:
> > > > > > > > >> > > > > > > >>
> > > > > > > > >> > > > > > > >> The windowing we discuss here is in general
> > > event
> > > > > > time
> > > > > > > > >> based,
> > > > > > > > >> > > > > arrival
> > > > > > > > >> > > > > > > time
> > > > > > > > >> > > > > > > >>> is a special case of it.
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>> I don't think state changes can be made
> > > > > independent
> > > > > > of
> > > > > > > > the
> > > > > > > > >> > > > > streaming
> > > > > > > > >> > > > > > > >>> window
> > > > > > > > >> > > > > > > >>> boundary as it would prevent idempotent
> > > > processing
> > > > > > and
> > > > > > > > >> > > > transitively
> > > > > > > > >> > > > > > > >>> exactly
> > > > > > > > >> > > > > > > >>> once. For that to work, tuples need to be
> > > > > presented
> > > > > > to
> > > > > > > > the
> > > > > > > > >> > > > operator
> > > > > > > > >> > > > > > in
> > > > > > > > >> > > > > > > a
> > > > > > > > >> > > > > > > >>> guaranteed order *within* the streaming
> > > window,
> > > > > > which
> > > > > > > is
> > > > > > > > >> not
> > > > > > > > >> > > > > possible
> > > > > > > > >> > > > > > > >>> with
> > > > > > > > >> > > > > > > >>> multiple ports (and partitions).
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>> Thomas
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David
> Yan <
> > > > > > > > >> > > > david@datatorrent.com
> > > > > > > > >> > > > > > > <javascript:;>>
> > > > > > > > >> > > > > > > >>> wrote:
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>> I think for session tracking, if the
> session
> > > > > > > boundaries
> > > > > > > > are
> > > > > > > > >> > > > allowed
> > > > > > > > >> > > > > > to
> > > > > > > > >> > > > > > > be
> > > > > > > > >> > > > > > > >>>> not aligned with the streaming window
> > > > boundaries,
> > > > > > the
> > > > > > > > user
> > > > > > > > >> > > will
> > > > > > > > >> > > > > > have a
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>> much
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> bigger problem with idempotency. And in
> > most
> > > > > cases,
> > > > > > > > >> session
> > > > > > > > >> > > > > tracking
> > > > > > > > >> > > > > > > is
> > > > > > > > >> > > > > > > >>>> event time based, not ingression time or
> > > > > processing
> > > > > > > > time
> > > > > > > > >> > > based,
> > > > > > > > >> > > > so
> > > > > > > > >> > > > > > > this
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>> may
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> never be a problem. But if that ever
> > happens,
> > > > the
> > > > > > > user
> > > > > > > > can
> > > > > > > > >> > > > always
> > > > > > > > >> > > > > > > alter
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>> the
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> default 500ms width.
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>> David
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad
> > Rozov <
> > > > > > > > >> > > > > > v.rozov@datatorrent.com
> > > > > > > > >> > > > > > > <javascript:;>>
> > > > > > > > >> > > > > > > >>>> wrote:
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>> Ability to send custom control tuples
> > within
> > > > > window
> > > > > > > > may be
> > > > > > > > >> > > > useful,
> > > > > > > > >> > > > > > for
> > > > > > > > >> > > > > > > >>>>> example, for sessions tracking, where
> > > session
> > > > > > > > boundaries
> > > > > > > > >> > are
> > > > > > > > >> > > > not
> > > > > > > > >> > > > > > > >>>>>
> > > > > > > > >> > > > > > > >>>> aligned
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> with window boundaries and 500 ms latency
> > is
> > > > not
> > > > > > > > >> acceptable
> > > > > > > > >> > > for
> > > > > > > > >> > > > an
> > > > > > > > >> > > > > > > >>>>> application.
> > > > > > > > >> > > > > > > >>>>>
> > > > > > > > >> > > > > > > >>>>> Thank you,
> > > > > > > > >> > > > > > > >>>>>
> > > > > > > > >> > > > > > > >>>>> Vlad
> > > > > > > > >> > > > > > > >>>>>
> > > > > > > > >> > > > > > > >>>>>
> > > > > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > > > > > > >> > > > > > > >>>>>
> > > > > > > > >> > > > > > > >>>>> It should not matter from where the
> > control
> > > > > tuple
> > > > > > is
> > > > > > > > >> > > triggered.
> > > > > > > > >> > > > > It
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> will
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> be
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> good to have a generic mechanism to
> > > propagate
> > > > it
> > > > > > and
> > > > > > > > >> other
> > > > > > > > >> > > > things
> > > > > > > > >> > > > > > can
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> be
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> accomplished outside the engine. For
> > example,
> > > > the
> > > > > > new
> > > > > > > > >> > > > > comprehensive
> > > > > > > > >> > > > > > > >>>>>> support
> > > > > > > > >> > > > > > > >>>>>> for windowing will all be in Malhar,
> > > nothing
> > > > > that
> > > > > > > the
> > > > > > > > >> > engine
> > > > > > > > >> > > > > needs
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> know
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> about it except that we need the control
> > > tuple
> > > > > for
> > > > > > > > >> > watermark
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> propagation
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> and idempotent processing.
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> I also think the main difference to
> other
> > > > > tuples
> > > > > > is
> > > > > > > > the
> > > > > > > > >> > need
> > > > > > > > >> > > > to
> > > > > > > > >> > > > > > send
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> it
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> to
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> all partitions. Which is similar to
> > > checkpoint
> > > > > > > window
> > > > > > > > >> > tuples,
> > > > > > > > >> > > > but
> > > > > > > > >> > > > > > not
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> the
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> same. Here, we probably also need the
> > > ability
> > > > > for
> > > > > > > the
> > > > > > > > >> user
> > > > > > > > >> > to
> > > > > > > > >> > > > > > control
> > > > > > > > >> > > > > > > >>>>>> whether such tuple should traverse the
> > > entire
> > > > > DAG
> > > > > > > or
> > > > > > > > >> not.
> > > > > > > > >> > > For
> > > > > > > > >> > > > a
> > > > > > > > >> > > > > > > batch
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> use
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> case, for example, we may want to send
> the
> > > end
> > > > > of
> > > > > > > > file to
> > > > > > > > >> > the
> > > > > > > > >> > > > > next
> > > > > > > > >> > > > > > > >>>>>> operator, but not beyond, if the
> operator
> > > has
> > > > > > > > >> asynchronous
> > > > > > > > >> > > > > > > processing
> > > > > > > > >> > > > > > > >>>>>> logic
> > > > > > > > >> > > > > > > >>>>>> in it.
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> For any logic to be idempotent, the
> > control
> > > > > tuple
> > > > > > > > needs
> > > > > > > > >> to
> > > > > > > > >> > > be
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> processed
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> at
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> a window boundary. Receiving the control
> > > tuple
> > > > > in
> > > > > > > the
> > > > > > > > >> > window
> > > > > > > > >> > > > > > callback
> > > > > > > > >> > > > > > > >>>>>> would
> > > > > > > > >> > > > > > > >>>>>> avoid having to track extra state in
> the
> > > > > > operator.
> > > > > > > I
> > > > > > > > >> don't
> > > > > > > > >> > > > think
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> that's
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> a
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> major issue, but what is the use case
> for
> > > > > > > processing a
> > > > > > > > >> > > control
> > > > > > > > >> > > > > > tuple
> > > > > > > > >> > > > > > > >>>>>> within
> > > > > > > > >> > > > > > > >>>>>> the window?
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> Thomas
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod
> > > > > Immaneni
> > > > > > <
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>> pramod@datatorrent.com <javascript:;>>
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> wrote:
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>> For the use cases you mentioned, I
> think
> > 1)
> > > > and
> > > > > > 2)
> > > > > > > > are
> > > > > > > > >> > more
> > > > > > > > >> > > > > likely
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > >>>>>>
> > > > > > > > >> > > > > > > >>>>>>> be controlled directly by the
> > application,
> > > > 3)
> > > > > > and
> > > > > > > 4)
> > > > > > > > >> are
> > > > > > > > >> > > more
> > > > > > > > >> > > > > > > likely
> > > > > > > > >> > > > > > > >>>>>>> going to be triggered externally and
> > > > directly
> > > > > > > > handled
> > > > > > > > >> by
> > > > > > > > >> > > the
> > > > > > > > >> > > > > > engine
> > > > > > > > >> > > > > > > >>>>>>> and 3) is already being implemented
> that
> > > way
> > > > > > > (APEXCORE-163).
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> The control tuples emitted by an
> > operator
> > > > > would
> > > > > > be
> > > > > > > > sent
> > > > > > > > >> > to
> > > > > > > > >> > > > all
> > > > > > > > >> > > > > > > >>>>>>> downstream partitions isn't it and
> that
> > > > would
> > > > > be
> > > > > > > the
> > > > > > > > >> > chief
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>> distinction
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> compared to data (apart from the payload)
> > > which
> > > > > > would
> > > > > > > > get
> > > > > > > > >> > > > > > partitioned
> > > > > > > > >> > > > > > > >>>>>>> under normal circumstances. It would
> > also
> > > be
> > > > > > > > guaranteed
> > > > > > > > >> > > that
> > > > > > > > >> > > > > > > >>>>>>> downstream partitions will receive
> > control
> > > > > > tuples
> > > > > > > > only
> > > > > > > > >> > > after
> > > > > > > > >> > > > > the
> > > > > > > > >> > > > > > > data
> > > > > > > > >> > > > > > > >>>>>>> that was sent before it so we could
> send
> > > it
> > > > > > > > immediately
> > > > > > > > >> > > when
> > > > > > > > >> > > > it
> > > > > > > > >> > > > > > is
> > > > > > > > >> > > > > > > >>>>>>> emitted as opposed to window
> boundaries.
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> However during unification it is
> > important
> > > > to
> > > > > > know
> > > > > > > > if
> > > > > > > > >> > these
> > > > > > > > >> > > > > > control
> > > > > > > > >> > > > > > > >>>>>>> tuples have been received from all
> > > upstream
> > > > > > > > partitions
> > > > > > > > >> > > before
> > > > > > > > >> > > > > > > >>>>>>> proceeding with a control operation.
> One
> > > > could
> > > > > > > wait
> > > > > > > > >> till
> > > > > > > > >> > > end
> > > > > > > > >> > > > of
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > >>>>>>> window but that introduces a delay
> > however
> > > > > > small,
> > > > > > > I
> > > > > > > > >> would
> > > > > > > > >> > > > like
> > > > > > > > >> > > > > to
> > > > > > > > >> > > > > > > add
> > > > > > > > >> > > > > > > >>>>>>> to the proposal that the platform only
> > > hand
> > > > > over
> > > > > > > the
> > > > > > > > >> > > control
> > > > > > > > >> > > > > > tuple
> > > > > > > > >> > > > > > > to
> > > > > > > > >> > > > > > > >>>>>>> the unifier when it has been received
> > from
> > > > all
> > > > > > > > upstream
> > > > > > > > >> > > > > > partitions
> > > > > > > > >> > > > > > > >>>>>>> much like how end window is processed
> > but
> > > > not
> > > > > > wait
> > > > > > > > till
> > > > > > > > >> > the
> > > > > > > > >> > > > > > actual
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>> end
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> of the window.
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> Regd your concern about idempotency,
> we
> > > > > > typically
> > > > > > > > care
> > > > > > > > >> > > about
> > > > > > > > >> > > > > > > >>>>>>> idempotency at a window level and
> doing
> > > the
> > > > > > above
> > > > > > > > will
> > > > > > > > >> > > still
> > > > > > > > >> > > > > > allow
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>> the
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> operators to preserve that easily.
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> Thanks
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David
> Yan
> > <
> > > > > > > > >> > > > david@datatorrent.com
> > > > > > > > >> > > > > > > <javascript:;>>
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>> wrote:
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> Hi all,
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> I would like to propose a new feature
> > to
> > > > the
> > > > > > Apex
> > > > > > > > core
> > > > > > > > >> > > > engine
> > > > > > > > >> > > > > --
> > > > > > > > >> > > > > > > the
> > > > > > > > >> > > > > > > >>>>>>>> support of custom control tuples.
> > > > Currently,
> > > > > we
> > > > > > > > have
> > > > > > > > >> > > control
> > > > > > > > >> > > > > > > tuples
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> such
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> as
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT,
> > and
> > > so
> > > > > on,
> > > > > > > > but we
> > > > > > > > >> > > don't
> > > > > > > > >> > > > > > have
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> the
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> support for applications to insert their
> > own
> > > > > > control
> > > > > > > > >> tuples.
> > > > > > > > >> > > The
> > > > > > > > >> > > > > way
> > > > > > > > >> > > > > > > >>>>>>>> currently to get around this is to
> use
> > > data
> > > > > > > tuples
> > > > > > > > and
> > > > > > > > >> > > have
> > > > > > > > >> > > > a
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> separate
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> port
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> for such tuples that sends tuples to
> all
> > > > > > > partitions
> > > > > > > > of
> > > > > > > > >> > the
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> downstream
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> operators, which is not exactly developer
> > > > > friendly.
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> We have already seen a number of use
> > > cases
> > > > > that
> > > > > > > can
> > > > > > > > >> use
> > > > > > > > >> > > this
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> feature:
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> 1) Batch support: We need to tell all
> > > operators
> > > > > of
> > > > > > > the
> > > > > > > > >> > > physical
> > > > > > > > >> > > > > DAG
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> when
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> a
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> batch starts and ends, so the
> operators
> > > can
> > > > do
> > > > > > > > whatever
> > > > > > > > >> > > that
> > > > > > > > >> > > > is
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> needed
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> upon
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> the start or the end of a batch.
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> 2) Watermark: To support the concepts
> > of
> > > > > event
> > > > > > > time
> > > > > > > > >> > > > windowing,
> > > > > > > > >> > > > > > the
> > > > > > > > >> > > > > > > >>>>>>>> watermark control tuple is needed to
> > tell
> > > > > which
> > > > > > > > >> windows
> > > > > > > > >> > > > should
> > > > > > > > >> > > > > > be
> > > > > > > > >> > > > > > > >>>>>>>> considered late.
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> 3) Changing operator properties: We
> do
> > > have
> > > > > the
> > > > > > > > >> support
> > > > > > > > >> > of
> > > > > > > > >> > > > > > > changing
> > > > > > > > >> > > > > > > >>>>>>>> operator properties on the fly, but
> > with
> > > a
> > > > > > custom
> > > > > > > > >> > control
> > > > > > > > >> > > > > tuple,
> > > > > > > > >> > > > > > > the
> > > > > > > > >> > > > > > > >>>>>>>> command to change operator properties
> > can
> > > > be
> > > > > > > window
> > > > > > > > >> > > aligned
> > > > > > > > >> > > > > for
> > > > > > > > >> > > > > > > all
> > > > > > > > >> > > > > > > >>>>>>>> partitions and also across the DAG.
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing
> > > operator
> > > > > > > > >> properties,
> > > > > > > > >> > we
> > > > > > > > >> > > > do
> > > > > > > > >> > > > > > have
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> this
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> support now but only at the individual
> > > > physical
> > > > > > > > operator
> > > > > > > > >> > > level,
> > > > > > > > >> > > > > and
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> without
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> control of which window to record
> tuples
> > > > for.
> > > > > > > With a
> > > > > > > > >> > custom
> > > > > > > > >> > > > > > control
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> tuple,
> > > > > > > > >> > > > > > > >>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> because a control tuple must belong
> to a
> > > > > window,
> > > > > > > all
> > > > > > > > >> > > > operators
> > > > > > > > >> > > > > in
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> the
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> DAG
> > > > > > > > >> > > > > > > >>>>>>>> can start (and stop) recording for
> the
> > > same
> > > > > > > > windows.
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> I can think of two options to achieve
> > > this:
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> 1) new custom control tuple type that
> > > takes
> > > > > > > user's
> > > > > > > > >> > > > > serializable
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> object.
> > > > > > > > >> > > > > > > >>>>
> > > > > > > > >> > > > > > > >>>>> 2) piggy back the current BEGIN_WINDOW
> and
> > > > > > > END_WINDOW
> > > > > > > > >> > control
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>> tuples.
> > > > > > > > >> > > > > > > >>>
> > > > > > > > >> > > > > > > >>>> Please provide your feedback. Thank you.
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>> David
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >>>>>>>>
> > > > > > > > >> > > > > > > >
> > > > > > > > >> > > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>