You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by Pramod Immaneni <pr...@datatorrent.com> on 2017/02/15 01:34:57 UTC

Re: [DISCUSS] Custom Control Tuples Design

There have been some recent developments and discussions on the schema side
(link below) that warrant a reconsideration of how control tuples get
delivered.

http://apache.markmail.org/search/?q=apex+list%3Aorg.apache.apex.dev+schema+discovery+support#query:apex%20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:oaji26y3xfozap5v+state:results

What I would like to suggest is that we allow two delivery options for
control tuples which can be configured on a per control tuple basis. First
is to deliver control tuple to the operator when the first instance of the
tuple arrives from any path. Second option is to deliver the control tuple
when the last instance of the tuple arrives from all the paths or at the
end window if it is going to be difficult to determine the last arrival.
The developer can choose the delivery option for the control tuple
preferably when the tuple is created. The first option will be useful for
scenarios like schema propagation or begin file in case of batch cases. The
second option will be useful for tuples like end file or end batch in batch
use cases.

Thanks

On Tue, Jan 10, 2017 at 12:27 PM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> Hi All,
>
> Based on some discussion here is what is planned for the propagation
> feature for control tuples.
>
> The signature of the *processControl()* method in
> *ControlAwareDefaultInputPort* which is implemented by the operator
> developer will be as follows:
>
> *public abstract boolean processControl(UserDefinedControlTuple payload);*
>
> The boolean returned by the processControl() method indicates (to the
> engine) whether or not the operator is able to handle the control tuple and
> wants to take care of the propagation of the control tuple.
>
>    - If the method returns true - indicating it is able to handle the
>    control tuple, the operator has to explicitly emit the control tuples to
>    the output ports it wishes to propagate to.
>
>
>    - If the method returns false - indicating it is not able to handle the
>    control tuple, the control tuple will be propagated by the engine to all
>    output ports.
>
> The operator may even emit new control tuples in either of the cases.
> Note that for ports that are not control aware, the control tuple is
> propagated by default.
>
> We don't need any output port annotations or operator level attributes.
>
> ~ Bhupesh
>
>
> On Mon, Jan 9, 2017 at 5:16 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
>
> > On Sun, Jan 8, 2017 at 11:49 PM, Vlad Rozov <v....@datatorrent.com>
> > wrote:
> > > +1 to manage propagation at an operator level. An operator is either
> > control
> > > tuple aware and needs to manage how control tuples are routed from
> input
> > > ports to output ports or it is not. In the later case it does not
> matter
> > how
> > > many input and output ports the operator has and it is the Apex
> platform
> > > responsibility to route control tuples. I don't see a use case where an
> > > operator that is not aware of a control tuple needs to manage one or
> more
> > > input ports (or similar output ports) differently than others.
> > >
> >
> > The problem with giving explicit control to operator for routing of
> > custom tuples is how does the operator
> > developer knows about control tuple requirement for downstream
> > operators in an application. For example in following DAG
> > A -> B -> C
> > A - is my custom source operator which emits a new control tuple type C1
> > and C.
> > B - is operator from malhar which handle control tuple C.
> > C - is custom output operator which handles C1.
> >
> > If B is managing control tuples, then it needs to remember to foward
> > unhandled tuples on all output port, else it will block
> > the tuples for downstream operator which might need them, also if new
> > output port is added then B needs to send that tuples
> > on the new output port also. But In this case I can't simply extend B
> > as port objects are transient and mostly anonymous,
> > I can not extend these to send control tuples on new output port. In
> > my opinion we should let the control tuple flow through
> > entire DAG from their source and let each operator in the path to
> > handle/ignore them as required without blocking them.
> >
> >
> > > In general, an operator is aware only of a specific control tuple(s)
> (for
> > > example end of batch or end of file) and for a control tuples that it
> was
> > > not enabled for, the behavior should be exactly the same as if the
> > operator
> > > is not control tuple aware, meaning that those control tuples should be
> > > propagated from input ports to output ports by the platform. There
> > should be
> > > an ability to let the platform know what control tuples an operator is
> > aware
> > > of and can handle. This can be done both by API call and an annotation.
> > >
> >
> > I think this will add overhead while developing applications. Operator
> > developer needs to add code to handle new control tuple also
> > need to update the part of code to register the type with engine. And
> > platfoms needs to perform type check and develiver the tuples
> > accordingly. Instead operator developer could check the type of
> > incoming tuple and handle it as required.
> >
> > - Tushar.
> >
> >
> > > Thank you,
> > >
> > > Vlad
> > >
> > >
> > > On 1/5/17 13:04, Bhupesh Chawda wrote:
> > >>
> > >> Agreed Thomas.
> > >> I was referring to the persona of the operator developer. The user of
> > the
> > >> operator would not be doing anything related to the propagation of
> > control
> > >> tuples. Actually, the behavior of the operator wrt. propagation of
> > control
> > >> tuples would be part of the operator documentation.
> > >>
> > >> Also, we are providing options for the developer to route the flow of
> > >> control tuples in code during the development of the operator. The
> > >> annotations would actually help achieve it in a easier way.
> > >>
> > >> ~ Bhupesh
> > >>
> > >> On Jan 5, 2017 21:40, "Thomas Weise" <th...@apache.org> wrote:
> > >>
> > >> I think it is important to be clear on the roles with regard to this
> > >> functionality. The user of the operator should not have to do anything
> > to
> > >> get it to work. So while I suggested to consider attributes earlier,
> > there
> > >> should not be any need for the user to set those. The operator needs
> to
> > >> work as is.
> > >>
> > >> The persona concerned with propagation of control tuples is the
> operator
> > >> developer. I think the clear way for the operator developer to
> override
> > >> the
> > >> propagation behavior is in code and if that is possible there is no
> need
> > >> for other things such as attributes or other port level settings.
> > >>
> > >> Thomas
> > >>
> > >>
> > >> On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda <
> > bhupesh@datatorrent.com>
> > >> wrote:
> > >>
> > >>> I think we all agree on the use case for selective propagation. The
> > >>> question is about where to have the control - at the operator level
> or
> > at
> > >>> the port level.
> > >>>
> > >>> For this ability, we have the following options:
> > >>>
> > >>>     1. Operator disables the propagation on selected output ports.
> > Other
> > >>>     output ports propagate by default.
> > >>>     2. Operator disables propagation for the entire operator (by
> means
> > of
> > >>
> > >> an
> > >>>
> > >>>     attribute). Operator developer explicitly emits the received
> > control
> > >>> tuples
> > >>>     on selected output ports.
> > >>>
> > >>> If the decision is to completely block the propagation, then Option 2
> > is
> > >>> easier to use as just an attribute needs to be set, as opposed to
> > Option
> > >>> 1
> > >>> where user needs to set the annotation on each output port.
> > >>>
> > >>> However, if selective propagation is needed, Option 1 would just need
> > the
> > >>> user to disable propagation on certain ports; rest are propagated by
> > >>> default, while Option 2 requires the user to explicitly emit the
> > control
> > >>> tuples.
> > >>> ~ Bhupesh
> > >>>
> > >>>
> > >>> On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise <th...@apache.org> wrote:
> > >>>
> > >>>> Yes, I think that for any of these cases the operator developer will
> > >>
> > >> turn
> > >>>>
> > >>>> of implicit propagation for the operator and then write the code to
> > >>
> > >> route
> > >>>>
> > >>>> or create control tuples as needed.
> > >>>>
> > >>>> Thomas
> > >>>>
> > >>>> On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre <am...@datatorrent.com>
> > >>>
> > >>> wrote:
> > >>>>>
> > >>>>> I agree that by default the propagation must be implicit, i.e. if
> the
> > >>>>> operator does nothing, the control tuple propagates. I do think
> users
> > >>>>> should have control on deciding to "not propagate" or "create new"
> > and
> > >>>
> > >>> in
> > >>>>>
> > >>>>> these cases they would need to do something explicit (override)?
> > >>>>>
> > >>>>> The following cases come to mind
> > >>>>> 1. Sole consumer of a particular control signal (for example end of
> > >>>
> > >>> file)
> > >>>>>
> > >>>>> 2. Creator of a particular control signal (start of file, or a
> signal
> > >>>
> > >>> to
> > >>>>>
> > >>>>> pause on something etc.)
> > >>>>> 3. One port on a data pipeline and other port for meta-data
> pipeline
> > >>>>>
> > >>>>> In the above cases emit will be decided on an output port. #1 is
> only
> > >>>>
> > >>>> place
> > >>>>>
> > >>>>> where all output ports will disable the tuple, #2 and #3 most
> likely
> > >>>
> > >>> will
> > >>>>>
> > >>>>> be selective.
> > >>>>>
> > >>>>> Thks
> > >>>>> Amol
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise <th...@apache.org>
> > wrote:
> > >>>>>
> > >>>>>> I think there is (1) implicit propagation just like other control
> > >>>>
> > >>>> tuples
> > >>>>>>
> > >>>>>> where the operator code isn't involved and (2) where the operator
> > >>>>>
> > >>>>> developer
> > >>>>>>
> > >>>>>> wants to decide how control tuples are created or routed and will
> > >>>>
> > >>>> receive
> > >>>>>>
> > >>>>>> and emit them on the output ports as desired.
> > >>>>>>
> > >>>>>> I don't see a use case for hybrid approaches? Maybe propagation
> does
> > >>>>
> > >>>> not
> > >>>>>>
> > >>>>>> need to be tied to ports at all, maybe just by annotation at the
> > >>>>
> > >>>> operator
> > >>>>>>
> > >>>>>> level?
> > >>>>>>
> > >>>>>> Thomas
> > >>>>>>
> > >>>>>>
> > >>>>>> On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <
> > >>>>
> > >>>> bhupesh@datatorrent.com
> > >>>>>>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Wouldn't having this with output ports give a finer control on
> the
> > >>>>>>> propagation of control tuples?
> > >>>>>>> We might have an operator with two output ports each of which
> > >>>
> > >>> creates
> > >>>>>
> > >>>>> two
> > >>>>>>>
> > >>>>>>> different pipelines downstream. We would be able to say that one
> > >>>>>
> > >>>>> pipeline
> > >>>>>>>
> > >>>>>>> gets the control tuples and the other doesn't.
> > >>>>>>>
> > >>>>>>> ~ Bhupesh
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Jan 4, 2017 11:55 PM, "Thomas Weise" <th...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>> I'm referring to the operator that needs to make the decision to
> > >>>>>>
> > >>>>>> propagate
> > >>>>>>>
> > >>>>>>> or not. The tuples come from an input port, so it seems
> > >>
> > >> appropriate
> > >>>>
> > >>>> to
> > >>>>>>
> > >>>>>> say
> > >>>>>>>
> > >>>>>>> "don't propagate control tuples from this port". No matter how
> > >>
> > >> many
> > >>>>>>
> > >>>>>> output
> > >>>>>>>
> > >>>>>>> ports there are.
> > >>>>>>>
> > >>>>>>> Output ports are there for an operator to emit new tuples, in the
> > >>>>
> > >>>> case
> > >>>>>>
> > >>>>>> you
> > >>>>>>>
> > >>>>>>> are discussing you don't emit new control tuples.
> > >>>>>>>
> > >>>>>>> Thomas
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <
> > >>>>>
> > >>>>> bhupesh@datatorrent.com>
> > >>>>>>>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Thomas,
> > >>>>>>>>
> > >>>>>>>> Are you suggesting an attribute on the input port for
> > >>
> > >> controlling
> > >>>>
> > >>>> the
> > >>>>>>>>
> > >>>>>>>> propagation of control tuples to downstream operators?
> > >>>>>>>> I think it should be better to do it on the output port since
> > >>
> > >> the
> > >>>>>>>
> > >>>>>>> decision
> > >>>>>>>>
> > >>>>>>>> to block the propagation will be made at the upstream operator
> > >>>>
> > >>>> rather
> > >>>>>>>
> > >>>>>>> than
> > >>>>>>>>
> > >>>>>>>> at the downstream.
> > >>>>>>>> Also, we need another way of controlling the propagation at run
> > >>>>
> > >>>> time
> > >>>>>>
> > >>>>>> and
> > >>>>>>>>
> > >>>>>>>> hence I was thinking about the method call on the output port,
> > >>
> > >> in
> > >>>>>>>
> > >>>>>>> addition
> > >>>>>>>>
> > >>>>>>>> to the annotation on the output port (which is the static way).
> > >>>>>>>>
> > >>>>>>>> Please correct me if I have misunderstood your question.
> > >>>>>>>>
> > >>>>>>>> ~ Bhupesh
> > >>>>>>>>
> > >>>>>>>> On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <th...@apache.org>
> > >>>>
> > >>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Wouldn't it be more intuitive to control this with an
> > >>
> > >> attribute
> > >>>>
> > >>>> on
> > >>>>>>
> > >>>>>> the
> > >>>>>>>>>
> > >>>>>>>>> input port?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <
> > >>>>>>>
> > >>>>>>> bhupesh@datatorrent.com
> > >>>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Pramod,
> > >>>>>>>>>>
> > >>>>>>>>>> I was thinking of a method setPropagateControlTuples(boolean
> > >>>>>>>>
> > >>>>>>>> propagate)
> > >>>>>>>>>
> > >>>>>>>>> on
> > >>>>>>>>>>
> > >>>>>>>>>> the output port of the operator.
> > >>>>>>>>>> The operator could disable this in the code at any point of
> > >>>>
> > >>>> time.
> > >>>>>>>>>>
> > >>>>>>>>>> Note however that this is to block the propagation of
> > >>
> > >> control
> > >>>>>>
> > >>>>>> tuples
> > >>>>>>>>
> > >>>>>>>> from
> > >>>>>>>>>>
> > >>>>>>>>>> upstream. Any control tuples emitted explicitly by the
> > >>>
> > >>> operator
> > >>>>>>
> > >>>>>> would
> > >>>>>>>>>
> > >>>>>>>>> still
> > >>>>>>>>>>
> > >>>>>>>>>> be emitted and sent to the downstream operators.
> > >>>>>>>>>>
> > >>>>>>>>>> Please see
> > >>>>>>>>>> https://github.com/apache/apex-core/pull/440/files#diff-
> > >>>>>>>>>> 8aa0ca1a3e645fa60e9b376c118c00a3R68
> > >>>>>>>>>> in the PR.
> > >>>>>>>>>>
> > >>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <
> > >>>>>>>>
> > >>>>>>>> pramod@datatorrent.com>
> > >>>>>>>>>>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> 2 sounds good. Have you thought about what the method
> > >>
> > >> would
> > >>>>>
> > >>>>> look
> > >>>>>>>>
> > >>>>>>>> like.
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <
> > >>>>>>>>>
> > >>>>>>>>> bhupesh@datatorrent.com
> > >>>>>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Yes, that makes sense.
> > >>>>>>>>>>>> We have following options:
> > >>>>>>>>>>>> 1. Make the annotation false by default and force the
> > >>>
> > >>> user
> > >>>>
> > >>>> to
> > >>>>>>>>
> > >>>>>>>> forward
> > >>>>>>>>>>
> > >>>>>>>>>> the
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> control tuples explicitly.
> > >>>>>>>>>>>> 2. Annotation is true by default and static way of
> > >>>
> > >>> blocking
> > >>>>>>
> > >>>>>> stays
> > >>>>>>>>
> > >>>>>>>> as
> > >>>>>>>>>
> > >>>>>>>>> it
> > >>>>>>>>>>>
> > >>>>>>>>>>> is.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> We provide another way for blocking programmatically,
> > >>>>
> > >>>> perhaps
> > >>>>>>
> > >>>>>> by
> > >>>>>>>>>
> > >>>>>>>>> means
> > >>>>>>>>>>
> > >>>>>>>>>> of
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> another method call on the port.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Dec 30, 2016 00:09, "Pramod Immaneni" <
> > >>>>>>
> > >>>>>> pramod@datatorrent.com
> > >>>>>>>>>>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Bhupesh,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Annotation seems like a static way to stop
> > >>
> > >> propagation.
> > >>>>>
> > >>>>> Give
> > >>>>>>>>
> > >>>>>>>> these
> > >>>>>>>>>>
> > >>>>>>>>>> are
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> programmatically generated I would think the operators
> > >>>>>
> > >>>>> should
> > >>>>>>>
> > >>>>>>> be
> > >>>>>>>>>
> > >>>>>>>>> able
> > >>>>>>>>>>>
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stop (consume without propagating) programmatically as
> > >>>>>
> > >>>>> well.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <
> > >>>>>>>>>>>
> > >>>>>>>>>>> bhupesh@datatorrent.com
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks Vlad, I am trying out the approach you
> > >>>
> > >>> mentioned
> > >>>>>>>>
> > >>>>>>>> regarding
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> having
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> another interface which allows sinks to put a
> > >>
> > >> control
> > >>>>>>
> > >>>>>> tuple.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Regarding the delivery of control tuples, here is
> > >>>
> > >>> what
> > >>>>
> > >>>> I
> > >>>>>
> > >>>>> am
> > >>>>>>>>>>
> > >>>>>>>>>> planning
> > >>>>>>>>>>>
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> do:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> All the control tuples which are emitted in a
> > >>>>
> > >>>> particular
> > >>>>>>>
> > >>>>>>> window
> > >>>>>>>>>
> > >>>>>>>>> are
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> delivered after all the data tuples have been
> > >>>
> > >>> delivered
> > >>>>>
> > >>>>> to
> > >>>>>>>
> > >>>>>>> the
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> respective
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> ports, but before the endWindow() call. The operator
> > >>>>
> > >>>> can
> > >>>>>>
> > >>>>>> then
> > >>>>>>>>>>
> > >>>>>>>>>> process
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> control tuples in that window and can do any
> > >>>>
> > >>>> finalization
> > >>>>>>
> > >>>>>> in
> > >>>>>>>>
> > >>>>>>>> the
> > >>>>>>>>>>
> > >>>>>>>>>> end
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> window
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> call. There will be no delivery of control tuples
> > >>>
> > >>> after
> > >>>>>>>>>
> > >>>>>>>>> endWindow()
> > >>>>>>>>>>>
> > >>>>>>>>>>> and
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> before the next beginWindow() call.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> For handling the propagation of control tuples
> > >>>
> > >>> further
> > >>>>
> > >>>> in
> > >>>>>>
> > >>>>>> the
> > >>>>>>>>>
> > >>>>>>>>> dag,
> > >>>>>>>>>>
> > >>>>>>>>>> we
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> are
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> planning to have an annotation on the Output Port of
> > >>>>
> > >>>> the
> > >>>>>>>>
> > >>>>>>>> operator
> > >>>>>>>>>>>
> > >>>>>>>>>>> which
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> would be true by default.
> > >>>>>>>>>>>>>> @OutputPortFieldAnnotation(propogateControlTuples =
> > >>>>>>
> > >>>>>> false).
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <
> > >>>>>>>>>>
> > >>>>>>>>>> v.rozov@datatorrent.com
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Custom control tuples are control tuples emitted
> > >>
> > >> by
> > >>>>
> > >>>> an
> > >>>>>>>>
> > >>>>>>>> operator
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> itself
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> not by the platform. Prior to the introduction of
> > >>>
> > >>> the
> > >>>>>>>
> > >>>>>>> custom
> > >>>>>>>>>>>
> > >>>>>>>>>>> control
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> tuples, only Apex engine itself puts control
> > >>
> > >> tuples
> > >>>>>
> > >>>>> into
> > >>>>>>>>>
> > >>>>>>>>> various
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> sinks,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> so
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> the engine created necessary Tuple objects with
> > >>
> > >> the
> > >>>>>>>>>
> > >>>>>>>>> corresponding
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> type
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> prior to calling Sink.put().
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Not all sinks need to be changed. Only control
> > >>>
> > >>> tuple
> > >>>>>>
> > >>>>>> aware
> > >>>>>>>>>
> > >>>>>>>>> sinks
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> should
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> provide such functionality. In the case there is a
> > >>>>
> > >>>> lot
> > >>>>>
> > >>>>> of
> > >>>>>>>>
> > >>>>>>>> code
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> duplication,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> please create an abstract class, that other
> > >>
> > >> control
> > >>>>>
> > >>>>> aware
> > >>>>>>>>
> > >>>>>>>> sinks
> > >>>>>>>>>>>
> > >>>>>>>>>>> will
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> extend
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> from.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thank you,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Vlad
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 12/23/16 06:24, Bhupesh Chawda wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Vlad,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the pointer on delegating the wrapping
> > >>>
> > >>> of
> > >>>>>
> > >>>>> the
> > >>>>>>>>
> > >>>>>>>> user
> > >>>>>>>>>>>
> > >>>>>>>>>>> tuple
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> control port. I was trying this out today.
> > >>>>>>>>>>>>>>>> The problem I see us if we introduce a
> > >>>>>
> > >>>>> putControlTuple()
> > >>>>>>>>>
> > >>>>>>>>> method
> > >>>>>>>>>>
> > >>>>>>>>>> in
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Sink,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> then a lot of the existing sinks would change.
> > >>>
> > >>> Also
> > >>>>>
> > >>>>> the
> > >>>>>>>>>
> > >>>>>>>>> changes
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> seemed
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> redundant as, the existing control tuples already
> > >>>>
> > >>>> use
> > >>>>>>
> > >>>>>> the
> > >>>>>>>>>
> > >>>>>>>>> put()
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> method
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> sinks. So why do something special for custom
> > >>>>
> > >>>> control
> > >>>>>>>>
> > >>>>>>>> tuples?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The only aspect in which the custom control
> > >>
> > >> tuples
> > >>>>
> > >>>> are
> > >>>>>>>>>
> > >>>>>>>>> different
> > >>>>>>>>>>>
> > >>>>>>>>>>> is
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> these will be generated by the user and will
> > >>>>
> > >>>> actually
> > >>>>>
> > >>>>> be
> > >>>>>>>>>>
> > >>>>>>>>>> delivered
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> to
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> ports in a different order. Perhaps we should be
> > >>>>
> > >>>> able
> > >>>>>
> > >>>>> to
> > >>>>>>>
> > >>>>>>> use
> > >>>>>>>>>
> > >>>>>>>>> the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> flow. The only problems as outlined before seem
> > >>
> > >> to
> > >>>>
> > >>>> be
> > >>>>>>>>>>>
> > >>>>>>>>>>> identification
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> user tuple as a control tuple.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> v.rozov@datatorrent.com
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Why is it necessary to wrap in the OutputPort?
> > >>>
> > >>> Can't
> > >>>>>
> > >>>>> it
> > >>>>>>
> > >>>>>> be
> > >>>>>>>>>>>
> > >>>>>>>>>>> delegated
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Sink by introducing new putControlTuple method?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thank you,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Vlad
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Vlad,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> The problem in using the Tuple class as the
> > >>>>
> > >>>> wrapper
> > >>>>>
> > >>>>> is
> > >>>>>>>>
> > >>>>>>>> that
> > >>>>>>>>>>
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Ports
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> belong to the API and we want to wrap the
> > >>>
> > >>> payload
> > >>>>>>
> > >>>>>> object
> > >>>>>>>>
> > >>>>>>>> of
> > >>>>>>>>>>
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> control
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> tuple into the Tuple class which is not part of
> > >>>>
> > >>>> the
> > >>>>>>
> > >>>>>> API.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> The output port will just get the payload of
> > >>
> > >> the
> > >>>>>
> > >>>>> user
> > >>>>>>>>>
> > >>>>>>>>> control
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> tuple.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> For
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> example, if the user emits a Long, as a control
> > >>>>>
> > >>>>> tuple,
> > >>>>>>>
> > >>>>>>> the
> > >>>>>>>>>>>
> > >>>>>>>>>>> payload
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> object
> > >>>>>>>>>>>>>>>>>> will just be a Long object.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> It is necessary to bundle this Long into some
> > >>>>>>>
> > >>>>>>> recognizable
> > >>>>>>>>>>>
> > >>>>>>>>>>> object
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> so
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>> the BufferServerPublisher knows that this is a
> > >>>>>
> > >>>>> Control
> > >>>>>>>>
> > >>>>>>>> tuple
> > >>>>>>>>>>
> > >>>>>>>>>> and
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> not a
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> regular tuple and serialize it accordingly. It
> > >>>
> > >>> is
> > >>>>>>>>
> > >>>>>>>> therefore
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> necessary
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>>> the tuple be part of some known hierarchy so
> > >>>
> > >>> that
> > >>>>>
> > >>>>> can
> > >>>>>>
> > >>>>>> be
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> distinguished
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>>> other payload tuples. Let us call this class
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> ControlTupleInterface.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Note
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> that this needs to be done before the tuple is
> > >>>>>>
> > >>>>>> inserted
> > >>>>>>>>
> > >>>>>>>> into
> > >>>>>>>>>>
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> sink
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>>>> is done in the port objects. Once the tuple is
> > >>>>>>
> > >>>>>> inserted
> > >>>>>>>>
> > >>>>>>>> into
> > >>>>>>>>>>
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> sink,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>>> would seem just like any other payload tuple
> > >>
> > >> and
> > >>>>>>
> > >>>>>> cannot
> > >>>>>>>
> > >>>>>>> be
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> distinguished.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> For this reason, I had something like the
> > >>>>
> > >>>> following
> > >>>>>
> > >>>>> in
> > >>>>>>>>
> > >>>>>>>> API:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> package com.datatorrent.api;
> > >>>>>>>>>>>>>>>>>> public class ControlTupleInterface
> > >>>>>>>>>>>>>>>>>> {
> > >>>>>>>>>>>>>>>>>>      Object payload; // User control tuple
> > >>>>
> > >>>> payload. A
> > >>>>>>>>
> > >>>>>>>> Long()
> > >>>>>>>>>>
> > >>>>>>>>>> for
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> example.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>      UUID id;  // Unique Id to de-duplicate in
> > >>>>>>
> > >>>>>> downstream
> > >>>>>>>>>>>
> > >>>>>>>>>>> operators
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Regarding your suggestion on using the Tuple
> > >>>
> > >>> class
> > >>>>>
> > >>>>> as
> > >>>>>>>
> > >>>>>>> the
> > >>>>>>>>>>>
> > >>>>>>>>>>> wrapper
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> for
> >
>

Re: [DISCUSS] Custom Control Tuples Design

Posted by Amol Kekre <am...@datatorrent.com>.
This is needed, the batch start-end have similar semantics as start-end
window from operational/functional perspective.

Thks
Amol


*Join us at Apex Big Data World-San Jose
<http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
[image: http://www.apexbigdata.com/san-jose-register.html]
<http://www.apexbigdata.com/san-jose-register.html>

On Tue, Feb 14, 2017 at 9:55 PM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> +1 for having an immediate delivery mechanism as well.
>
> I would suggest that the other delivery mechanism stays at end of window,
> to be consistent, as I think it may be difficult to determine the last
> arrival of the tuple.
>
> ~ Bhupesh
>
> On Wed, Feb 15, 2017 at 7:04 AM, Pramod Immaneni <pr...@datatorrent.com>
> wrote:
>
> > There have been some recent developments and discussions on the schema
> side
> > (link below) that warrant a reconsideration of how control tuples get
> > delivered.
> >
> > http://apache.markmail.org/search/?q=apex+list%3Aorg.
> > apache.apex.dev+schema+discovery+support#query:apex%
> > 20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:
> > oaji26y3xfozap5v+state:results
> >
> > What I would like to suggest is that we allow two delivery options for
> > control tuples which can be configured on a per control tuple basis.
> First
> > is to deliver control tuple to the operator when the first instance of
> the
> > tuple arrives from any path. Second option is to deliver the control
> tuple
> > when the last instance of the tuple arrives from all the paths or at the
> > end window if it is going to be difficult to determine the last arrival.
> > The developer can choose the delivery option for the control tuple
> > preferably when the tuple is created. The first option will be useful for
> > scenarios like schema propagation or begin file in case of batch cases.
> The
> > second option will be useful for tuples like end file or end batch in
> batch
> > use cases.
> >
> > Thanks
> >
> > On Tue, Jan 10, 2017 at 12:27 PM, Bhupesh Chawda <
> bhupesh@datatorrent.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > Based on some discussion here is what is planned for the propagation
> > > feature for control tuples.
> > >
> > > The signature of the *processControl()* method in
> > > *ControlAwareDefaultInputPort* which is implemented by the operator
> > > developer will be as follows:
> > >
> > > *public abstract boolean processControl(UserDefinedControlTuple
> > payload);*
> > >
> > > The boolean returned by the processControl() method indicates (to the
> > > engine) whether or not the operator is able to handle the control tuple
> > and
> > > wants to take care of the propagation of the control tuple.
> > >
> > >    - If the method returns true - indicating it is able to handle the
> > >    control tuple, the operator has to explicitly emit the control
> tuples
> > to
> > >    the output ports it wishes to propagate to.
> > >
> > >
> > >    - If the method returns false - indicating it is not able to handle
> > the
> > >    control tuple, the control tuple will be propagated by the engine to
> > all
> > >    output ports.
> > >
> > > The operator may even emit new control tuples in either of the cases.
> > > Note that for ports that are not control aware, the control tuple is
> > > propagated by default.
> > >
> > > We don't need any output port annotations or operator level attributes.
> > >
> > > ~ Bhupesh
> > >
> > >
> > > On Mon, Jan 9, 2017 at 5:16 PM, Tushar Gosavi <tu...@datatorrent.com>
> > > wrote:
> > >
> > > > On Sun, Jan 8, 2017 at 11:49 PM, Vlad Rozov <v.rozov@datatorrent.com
> >
> > > > wrote:
> > > > > +1 to manage propagation at an operator level. An operator is
> either
> > > > control
> > > > > tuple aware and needs to manage how control tuples are routed from
> > > input
> > > > > ports to output ports or it is not. In the later case it does not
> > > matter
> > > > how
> > > > > many input and output ports the operator has and it is the Apex
> > > platform
> > > > > responsibility to route control tuples. I don't see a use case
> where
> > an
> > > > > operator that is not aware of a control tuple needs to manage one
> or
> > > more
> > > > > input ports (or similar output ports) differently than others.
> > > > >
> > > >
> > > > The problem with giving explicit control to operator for routing of
> > > > custom tuples is how does the operator
> > > > developer knows about control tuple requirement for downstream
> > > > operators in an application. For example in following DAG
> > > > A -> B -> C
> > > > A - is my custom source operator which emits a new control tuple type
> > C1
> > > > and C.
> > > > B - is operator from malhar which handle control tuple C.
> > > > C - is custom output operator which handles C1.
> > > >
> > > > If B is managing control tuples, then it needs to remember to foward
> > > > unhandled tuples on all output port, else it will block
> > > > the tuples for downstream operator which might need them, also if new
> > > > output port is added then B needs to send that tuples
> > > > on the new output port also. But In this case I can't simply extend B
> > > > as port objects are transient and mostly anonymous,
> > > > I can not extend these to send control tuples on new output port. In
> > > > my opinion we should let the control tuple flow through
> > > > entire DAG from their source and let each operator in the path to
> > > > handle/ignore them as required without blocking them.
> > > >
> > > >
> > > > > In general, an operator is aware only of a specific control
> tuple(s)
> > > (for
> > > > > example end of batch or end of file) and for a control tuples that
> it
> > > was
> > > > > not enabled for, the behavior should be exactly the same as if the
> > > > operator
> > > > > is not control tuple aware, meaning that those control tuples
> should
> > be
> > > > > propagated from input ports to output ports by the platform. There
> > > > should be
> > > > > an ability to let the platform know what control tuples an operator
> > is
> > > > aware
> > > > > of and can handle. This can be done both by API call and an
> > annotation.
> > > > >
> > > >
> > > > I think this will add overhead while developing applications.
> Operator
> > > > developer needs to add code to handle new control tuple also
> > > > need to update the part of code to register the type with engine. And
> > > > platfoms needs to perform type check and develiver the tuples
> > > > accordingly. Instead operator developer could check the type of
> > > > incoming tuple and handle it as required.
> > > >
> > > > - Tushar.
> > > >
> > > >
> > > > > Thank you,
> > > > >
> > > > > Vlad
> > > > >
> > > > >
> > > > > On 1/5/17 13:04, Bhupesh Chawda wrote:
> > > > >>
> > > > >> Agreed Thomas.
> > > > >> I was referring to the persona of the operator developer. The user
> > of
> > > > the
> > > > >> operator would not be doing anything related to the propagation of
> > > > control
> > > > >> tuples. Actually, the behavior of the operator wrt. propagation of
> > > > control
> > > > >> tuples would be part of the operator documentation.
> > > > >>
> > > > >> Also, we are providing options for the developer to route the flow
> > of
> > > > >> control tuples in code during the development of the operator. The
> > > > >> annotations would actually help achieve it in a easier way.
> > > > >>
> > > > >> ~ Bhupesh
> > > > >>
> > > > >> On Jan 5, 2017 21:40, "Thomas Weise" <th...@apache.org> wrote:
> > > > >>
> > > > >> I think it is important to be clear on the roles with regard to
> this
> > > > >> functionality. The user of the operator should not have to do
> > anything
> > > > to
> > > > >> get it to work. So while I suggested to consider attributes
> earlier,
> > > > there
> > > > >> should not be any need for the user to set those. The operator
> needs
> > > to
> > > > >> work as is.
> > > > >>
> > > > >> The persona concerned with propagation of control tuples is the
> > > operator
> > > > >> developer. I think the clear way for the operator developer to
> > > override
> > > > >> the
> > > > >> propagation behavior is in code and if that is possible there is
> no
> > > need
> > > > >> for other things such as attributes or other port level settings.
> > > > >>
> > > > >> Thomas
> > > > >>
> > > > >>
> > > > >> On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda <
> > > > bhupesh@datatorrent.com>
> > > > >> wrote:
> > > > >>
> > > > >>> I think we all agree on the use case for selective propagation.
> The
> > > > >>> question is about where to have the control - at the operator
> level
> > > or
> > > > at
> > > > >>> the port level.
> > > > >>>
> > > > >>> For this ability, we have the following options:
> > > > >>>
> > > > >>>     1. Operator disables the propagation on selected output
> ports.
> > > > Other
> > > > >>>     output ports propagate by default.
> > > > >>>     2. Operator disables propagation for the entire operator (by
> > > means
> > > > of
> > > > >>
> > > > >> an
> > > > >>>
> > > > >>>     attribute). Operator developer explicitly emits the received
> > > > control
> > > > >>> tuples
> > > > >>>     on selected output ports.
> > > > >>>
> > > > >>> If the decision is to completely block the propagation, then
> > Option 2
> > > > is
> > > > >>> easier to use as just an attribute needs to be set, as opposed to
> > > > Option
> > > > >>> 1
> > > > >>> where user needs to set the annotation on each output port.
> > > > >>>
> > > > >>> However, if selective propagation is needed, Option 1 would just
> > need
> > > > the
> > > > >>> user to disable propagation on certain ports; rest are propagated
> > by
> > > > >>> default, while Option 2 requires the user to explicitly emit the
> > > > control
> > > > >>> tuples.
> > > > >>> ~ Bhupesh
> > > > >>>
> > > > >>>
> > > > >>> On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise <th...@apache.org>
> > wrote:
> > > > >>>
> > > > >>>> Yes, I think that for any of these cases the operator developer
> > will
> > > > >>
> > > > >> turn
> > > > >>>>
> > > > >>>> of implicit propagation for the operator and then write the code
> > to
> > > > >>
> > > > >> route
> > > > >>>>
> > > > >>>> or create control tuples as needed.
> > > > >>>>
> > > > >>>> Thomas
> > > > >>>>
> > > > >>>> On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre <
> amol@datatorrent.com
> > >
> > > > >>>
> > > > >>> wrote:
> > > > >>>>>
> > > > >>>>> I agree that by default the propagation must be implicit, i.e.
> if
> > > the
> > > > >>>>> operator does nothing, the control tuple propagates. I do think
> > > users
> > > > >>>>> should have control on deciding to "not propagate" or "create
> > new"
> > > > and
> > > > >>>
> > > > >>> in
> > > > >>>>>
> > > > >>>>> these cases they would need to do something explicit
> (override)?
> > > > >>>>>
> > > > >>>>> The following cases come to mind
> > > > >>>>> 1. Sole consumer of a particular control signal (for example
> end
> > of
> > > > >>>
> > > > >>> file)
> > > > >>>>>
> > > > >>>>> 2. Creator of a particular control signal (start of file, or a
> > > signal
> > > > >>>
> > > > >>> to
> > > > >>>>>
> > > > >>>>> pause on something etc.)
> > > > >>>>> 3. One port on a data pipeline and other port for meta-data
> > > pipeline
> > > > >>>>>
> > > > >>>>> In the above cases emit will be decided on an output port. #1
> is
> > > only
> > > > >>>>
> > > > >>>> place
> > > > >>>>>
> > > > >>>>> where all output ports will disable the tuple, #2 and #3 most
> > > likely
> > > > >>>
> > > > >>> will
> > > > >>>>>
> > > > >>>>> be selective.
> > > > >>>>>
> > > > >>>>> Thks
> > > > >>>>> Amol
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise <th...@apache.org>
> > > > wrote:
> > > > >>>>>
> > > > >>>>>> I think there is (1) implicit propagation just like other
> > control
> > > > >>>>
> > > > >>>> tuples
> > > > >>>>>>
> > > > >>>>>> where the operator code isn't involved and (2) where the
> > operator
> > > > >>>>>
> > > > >>>>> developer
> > > > >>>>>>
> > > > >>>>>> wants to decide how control tuples are created or routed and
> > will
> > > > >>>>
> > > > >>>> receive
> > > > >>>>>>
> > > > >>>>>> and emit them on the output ports as desired.
> > > > >>>>>>
> > > > >>>>>> I don't see a use case for hybrid approaches? Maybe
> propagation
> > > does
> > > > >>>>
> > > > >>>> not
> > > > >>>>>>
> > > > >>>>>> need to be tied to ports at all, maybe just by annotation at
> the
> > > > >>>>
> > > > >>>> operator
> > > > >>>>>>
> > > > >>>>>> level?
> > > > >>>>>>
> > > > >>>>>> Thomas
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <
> > > > >>>>
> > > > >>>> bhupesh@datatorrent.com
> > > > >>>>>>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Wouldn't having this with output ports give a finer control
> on
> > > the
> > > > >>>>>>> propagation of control tuples?
> > > > >>>>>>> We might have an operator with two output ports each of which
> > > > >>>
> > > > >>> creates
> > > > >>>>>
> > > > >>>>> two
> > > > >>>>>>>
> > > > >>>>>>> different pipelines downstream. We would be able to say that
> > one
> > > > >>>>>
> > > > >>>>> pipeline
> > > > >>>>>>>
> > > > >>>>>>> gets the control tuples and the other doesn't.
> > > > >>>>>>>
> > > > >>>>>>> ~ Bhupesh
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Jan 4, 2017 11:55 PM, "Thomas Weise" <th...@apache.org>
> > wrote:
> > > > >>>>>>>
> > > > >>>>>>> I'm referring to the operator that needs to make the decision
> > to
> > > > >>>>>>
> > > > >>>>>> propagate
> > > > >>>>>>>
> > > > >>>>>>> or not. The tuples come from an input port, so it seems
> > > > >>
> > > > >> appropriate
> > > > >>>>
> > > > >>>> to
> > > > >>>>>>
> > > > >>>>>> say
> > > > >>>>>>>
> > > > >>>>>>> "don't propagate control tuples from this port". No matter
> how
> > > > >>
> > > > >> many
> > > > >>>>>>
> > > > >>>>>> output
> > > > >>>>>>>
> > > > >>>>>>> ports there are.
> > > > >>>>>>>
> > > > >>>>>>> Output ports are there for an operator to emit new tuples, in
> > the
> > > > >>>>
> > > > >>>> case
> > > > >>>>>>
> > > > >>>>>> you
> > > > >>>>>>>
> > > > >>>>>>> are discussing you don't emit new control tuples.
> > > > >>>>>>>
> > > > >>>>>>> Thomas
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <
> > > > >>>>>
> > > > >>>>> bhupesh@datatorrent.com>
> > > > >>>>>>>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Thomas,
> > > > >>>>>>>>
> > > > >>>>>>>> Are you suggesting an attribute on the input port for
> > > > >>
> > > > >> controlling
> > > > >>>>
> > > > >>>> the
> > > > >>>>>>>>
> > > > >>>>>>>> propagation of control tuples to downstream operators?
> > > > >>>>>>>> I think it should be better to do it on the output port
> since
> > > > >>
> > > > >> the
> > > > >>>>>>>
> > > > >>>>>>> decision
> > > > >>>>>>>>
> > > > >>>>>>>> to block the propagation will be made at the upstream
> operator
> > > > >>>>
> > > > >>>> rather
> > > > >>>>>>>
> > > > >>>>>>> than
> > > > >>>>>>>>
> > > > >>>>>>>> at the downstream.
> > > > >>>>>>>> Also, we need another way of controlling the propagation at
> > run
> > > > >>>>
> > > > >>>> time
> > > > >>>>>>
> > > > >>>>>> and
> > > > >>>>>>>>
> > > > >>>>>>>> hence I was thinking about the method call on the output
> port,
> > > > >>
> > > > >> in
> > > > >>>>>>>
> > > > >>>>>>> addition
> > > > >>>>>>>>
> > > > >>>>>>>> to the annotation on the output port (which is the static
> > way).
> > > > >>>>>>>>
> > > > >>>>>>>> Please correct me if I have misunderstood your question.
> > > > >>>>>>>>
> > > > >>>>>>>> ~ Bhupesh
> > > > >>>>>>>>
> > > > >>>>>>>> On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <
> thw@apache.org>
> > > > >>>>
> > > > >>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> Wouldn't it be more intuitive to control this with an
> > > > >>
> > > > >> attribute
> > > > >>>>
> > > > >>>> on
> > > > >>>>>>
> > > > >>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> input port?
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <
> > > > >>>>>>>
> > > > >>>>>>> bhupesh@datatorrent.com
> > > > >>>>>>>>>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Pramod,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I was thinking of a method setPropagateControlTuples(
> > boolean
> > > > >>>>>>>>
> > > > >>>>>>>> propagate)
> > > > >>>>>>>>>
> > > > >>>>>>>>> on
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the output port of the operator.
> > > > >>>>>>>>>> The operator could disable this in the code at any point
> of
> > > > >>>>
> > > > >>>> time.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Note however that this is to block the propagation of
> > > > >>
> > > > >> control
> > > > >>>>>>
> > > > >>>>>> tuples
> > > > >>>>>>>>
> > > > >>>>>>>> from
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> upstream. Any control tuples emitted explicitly by the
> > > > >>>
> > > > >>> operator
> > > > >>>>>>
> > > > >>>>>> would
> > > > >>>>>>>>>
> > > > >>>>>>>>> still
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> be emitted and sent to the downstream operators.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Please see
> > > > >>>>>>>>>> https://github.com/apache/apex-core/pull/440/files#diff-
> > > > >>>>>>>>>> 8aa0ca1a3e645fa60e9b376c118c00a3R68
> > > > >>>>>>>>>> in the PR.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <
> > > > >>>>>>>>
> > > > >>>>>>>> pramod@datatorrent.com>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> 2 sounds good. Have you thought about what the method
> > > > >>
> > > > >> would
> > > > >>>>>
> > > > >>>>> look
> > > > >>>>>>>>
> > > > >>>>>>>> like.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <
> > > > >>>>>>>>>
> > > > >>>>>>>>> bhupesh@datatorrent.com
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Yes, that makes sense.
> > > > >>>>>>>>>>>> We have following options:
> > > > >>>>>>>>>>>> 1. Make the annotation false by default and force the
> > > > >>>
> > > > >>> user
> > > > >>>>
> > > > >>>> to
> > > > >>>>>>>>
> > > > >>>>>>>> forward
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> control tuples explicitly.
> > > > >>>>>>>>>>>> 2. Annotation is true by default and static way of
> > > > >>>
> > > > >>> blocking
> > > > >>>>>>
> > > > >>>>>> stays
> > > > >>>>>>>>
> > > > >>>>>>>> as
> > > > >>>>>>>>>
> > > > >>>>>>>>> it
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> is.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> We provide another way for blocking programmatically,
> > > > >>>>
> > > > >>>> perhaps
> > > > >>>>>>
> > > > >>>>>> by
> > > > >>>>>>>>>
> > > > >>>>>>>>> means
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> of
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> another method call on the port.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Dec 30, 2016 00:09, "Pramod Immaneni" <
> > > > >>>>>>
> > > > >>>>>> pramod@datatorrent.com
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Bhupesh,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Annotation seems like a static way to stop
> > > > >>
> > > > >> propagation.
> > > > >>>>>
> > > > >>>>> Give
> > > > >>>>>>>>
> > > > >>>>>>>> these
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> are
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> programmatically generated I would think the operators
> > > > >>>>>
> > > > >>>>> should
> > > > >>>>>>>
> > > > >>>>>>> be
> > > > >>>>>>>>>
> > > > >>>>>>>>> able
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> to
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> stop (consume without propagating) programmatically as
> > > > >>>>>
> > > > >>>>> well.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> bhupesh@datatorrent.com
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks Vlad, I am trying out the approach you
> > > > >>>
> > > > >>> mentioned
> > > > >>>>>>>>
> > > > >>>>>>>> regarding
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> having
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> another interface which allows sinks to put a
> > > > >>
> > > > >> control
> > > > >>>>>>
> > > > >>>>>> tuple.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Regarding the delivery of control tuples, here is
> > > > >>>
> > > > >>> what
> > > > >>>>
> > > > >>>> I
> > > > >>>>>
> > > > >>>>> am
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> planning
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> to
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> do:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> All the control tuples which are emitted in a
> > > > >>>>
> > > > >>>> particular
> > > > >>>>>>>
> > > > >>>>>>> window
> > > > >>>>>>>>>
> > > > >>>>>>>>> are
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> delivered after all the data tuples have been
> > > > >>>
> > > > >>> delivered
> > > > >>>>>
> > > > >>>>> to
> > > > >>>>>>>
> > > > >>>>>>> the
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> respective
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> ports, but before the endWindow() call. The operator
> > > > >>>>
> > > > >>>> can
> > > > >>>>>>
> > > > >>>>>> then
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> process
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> control tuples in that window and can do any
> > > > >>>>
> > > > >>>> finalization
> > > > >>>>>>
> > > > >>>>>> in
> > > > >>>>>>>>
> > > > >>>>>>>> the
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> end
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> window
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> call. There will be no delivery of control tuples
> > > > >>>
> > > > >>> after
> > > > >>>>>>>>>
> > > > >>>>>>>>> endWindow()
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> before the next beginWindow() call.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> For handling the propagation of control tuples
> > > > >>>
> > > > >>> further
> > > > >>>>
> > > > >>>> in
> > > > >>>>>>
> > > > >>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> dag,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> we
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> are
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> planning to have an annotation on the Output Port of
> > > > >>>>
> > > > >>>> the
> > > > >>>>>>>>
> > > > >>>>>>>> operator
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> would be true by default.
> > > > >>>>>>>>>>>>>> @OutputPortFieldAnnotation(propogateControlTuples =
> > > > >>>>>>
> > > > >>>>>> false).
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> v.rozov@datatorrent.com
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Custom control tuples are control tuples emitted
> > > > >>
> > > > >> by
> > > > >>>>
> > > > >>>> an
> > > > >>>>>>>>
> > > > >>>>>>>> operator
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> itself
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> not by the platform. Prior to the introduction of
> > > > >>>
> > > > >>> the
> > > > >>>>>>>
> > > > >>>>>>> custom
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> control
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> tuples, only Apex engine itself puts control
> > > > >>
> > > > >> tuples
> > > > >>>>>
> > > > >>>>> into
> > > > >>>>>>>>>
> > > > >>>>>>>>> various
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> sinks,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> so
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> the engine created necessary Tuple objects with
> > > > >>
> > > > >> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> corresponding
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> type
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> prior to calling Sink.put().
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Not all sinks need to be changed. Only control
> > > > >>>
> > > > >>> tuple
> > > > >>>>>>
> > > > >>>>>> aware
> > > > >>>>>>>>>
> > > > >>>>>>>>> sinks
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> should
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> provide such functionality. In the case there is a
> > > > >>>>
> > > > >>>> lot
> > > > >>>>>
> > > > >>>>> of
> > > > >>>>>>>>
> > > > >>>>>>>> code
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> duplication,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> please create an abstract class, that other
> > > > >>
> > > > >> control
> > > > >>>>>
> > > > >>>>> aware
> > > > >>>>>>>>
> > > > >>>>>>>> sinks
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> extend
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> from.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thank you,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Vlad
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi Vlad,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thanks for the pointer on delegating the wrapping
> > > > >>>
> > > > >>> of
> > > > >>>>>
> > > > >>>>> the
> > > > >>>>>>>>
> > > > >>>>>>>> user
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> tuple
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> control port. I was trying this out today.
> > > > >>>>>>>>>>>>>>>> The problem I see us if we introduce a
> > > > >>>>>
> > > > >>>>> putControlTuple()
> > > > >>>>>>>>>
> > > > >>>>>>>>> method
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> in
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Sink,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> then a lot of the existing sinks would change.
> > > > >>>
> > > > >>> Also
> > > > >>>>>
> > > > >>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> changes
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> seemed
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> redundant as, the existing control tuples already
> > > > >>>>
> > > > >>>> use
> > > > >>>>>>
> > > > >>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> put()
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> method
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> sinks. So why do something special for custom
> > > > >>>>
> > > > >>>> control
> > > > >>>>>>>>
> > > > >>>>>>>> tuples?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> The only aspect in which the custom control
> > > > >>
> > > > >> tuples
> > > > >>>>
> > > > >>>> are
> > > > >>>>>>>>>
> > > > >>>>>>>>> different
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> is
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> these will be generated by the user and will
> > > > >>>>
> > > > >>>> actually
> > > > >>>>>
> > > > >>>>> be
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> delivered
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> ports in a different order. Perhaps we should be
> > > > >>>>
> > > > >>>> able
> > > > >>>>>
> > > > >>>>> to
> > > > >>>>>>>
> > > > >>>>>>> use
> > > > >>>>>>>>>
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> existing
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> flow. The only problems as outlined before seem
> > > > >>
> > > > >> to
> > > > >>>>
> > > > >>>> be
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> identification
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>> user tuple as a control tuple.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> v.rozov@datatorrent.com
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Why is it necessary to wrap in the OutputPort?
> > > > >>>
> > > > >>> Can't
> > > > >>>>>
> > > > >>>>> it
> > > > >>>>>>
> > > > >>>>>> be
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> delegated
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> a
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Sink by introducing new putControlTuple method?
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Thank you,
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Vlad
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi Vlad,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> The problem in using the Tuple class as the
> > > > >>>>
> > > > >>>> wrapper
> > > > >>>>>
> > > > >>>>> is
> > > > >>>>>>>>
> > > > >>>>>>>> that
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Ports
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> belong to the API and we want to wrap the
> > > > >>>
> > > > >>> payload
> > > > >>>>>>
> > > > >>>>>> object
> > > > >>>>>>>>
> > > > >>>>>>>> of
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> control
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> tuple into the Tuple class which is not part of
> > > > >>>>
> > > > >>>> the
> > > > >>>>>>
> > > > >>>>>> API.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> The output port will just get the payload of
> > > > >>
> > > > >> the
> > > > >>>>>
> > > > >>>>> user
> > > > >>>>>>>>>
> > > > >>>>>>>>> control
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> tuple.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> For
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> example, if the user emits a Long, as a control
> > > > >>>>>
> > > > >>>>> tuple,
> > > > >>>>>>>
> > > > >>>>>>> the
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> payload
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> object
> > > > >>>>>>>>>>>>>>>>>> will just be a Long object.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> It is necessary to bundle this Long into some
> > > > >>>>>>>
> > > > >>>>>>> recognizable
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> object
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> so
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>>> the BufferServerPublisher knows that this is a
> > > > >>>>>
> > > > >>>>> Control
> > > > >>>>>>>>
> > > > >>>>>>>> tuple
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> not a
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> regular tuple and serialize it accordingly. It
> > > > >>>
> > > > >>> is
> > > > >>>>>>>>
> > > > >>>>>>>> therefore
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> necessary
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>>> the tuple be part of some known hierarchy so
> > > > >>>
> > > > >>> that
> > > > >>>>>
> > > > >>>>> can
> > > > >>>>>>
> > > > >>>>>> be
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> distinguished
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> from
> > > > >>>>>>>>>>>>>>>>>> other payload tuples. Let us call this class
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> ControlTupleInterface.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Note
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> that this needs to be done before the tuple is
> > > > >>>>>>
> > > > >>>>>> inserted
> > > > >>>>>>>>
> > > > >>>>>>>> into
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> sink
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>>>>> is done in the port objects. Once the tuple is
> > > > >>>>>>
> > > > >>>>>> inserted
> > > > >>>>>>>>
> > > > >>>>>>>> into
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> sink,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>>>>> would seem just like any other payload tuple
> > > > >>
> > > > >> and
> > > > >>>>>>
> > > > >>>>>> cannot
> > > > >>>>>>>
> > > > >>>>>>> be
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> distinguished.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> For this reason, I had something like the
> > > > >>>>
> > > > >>>> following
> > > > >>>>>
> > > > >>>>> in
> > > > >>>>>>>>
> > > > >>>>>>>> API:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> package com.datatorrent.api;
> > > > >>>>>>>>>>>>>>>>>> public class ControlTupleInterface
> > > > >>>>>>>>>>>>>>>>>> {
> > > > >>>>>>>>>>>>>>>>>>      Object payload; // User control tuple
> > > > >>>>
> > > > >>>> payload. A
> > > > >>>>>>>>
> > > > >>>>>>>> Long()
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> for
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> example.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>      UUID id;  // Unique Id to de-duplicate in
> > > > >>>>>>
> > > > >>>>>> downstream
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> operators
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> }
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Regarding your suggestion on using the Tuple
> > > > >>>
> > > > >>> class
> > > > >>>>>
> > > > >>>>> as
> > > > >>>>>>>
> > > > >>>>>>> the
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> wrapper
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> for
> > > >
> > >
> >
>

Re: [DISCUSS] Custom Control Tuples Design

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
+1 for having an immediate delivery mechanism as well.

I would suggest that the other delivery mechanism stays at end of window,
to be consistent, as I think it may be difficult to determine the last
arrival of the tuple.

~ Bhupesh

On Wed, Feb 15, 2017 at 7:04 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> There have been some recent developments and discussions on the schema side
> (link below) that warrant a reconsideration of how control tuples get
> delivered.
>
> http://apache.markmail.org/search/?q=apex+list%3Aorg.
> apache.apex.dev+schema+discovery+support#query:apex%
> 20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:
> oaji26y3xfozap5v+state:results
>
> What I would like to suggest is that we allow two delivery options for
> control tuples which can be configured on a per control tuple basis. First
> is to deliver control tuple to the operator when the first instance of the
> tuple arrives from any path. Second option is to deliver the control tuple
> when the last instance of the tuple arrives from all the paths or at the
> end window if it is going to be difficult to determine the last arrival.
> The developer can choose the delivery option for the control tuple
> preferably when the tuple is created. The first option will be useful for
> scenarios like schema propagation or begin file in case of batch cases. The
> second option will be useful for tuples like end file or end batch in batch
> use cases.
>
> Thanks
>
> On Tue, Jan 10, 2017 at 12:27 PM, Bhupesh Chawda <bh...@datatorrent.com>
> wrote:
>
> > Hi All,
> >
> > Based on some discussion here is what is planned for the propagation
> > feature for control tuples.
> >
> > The signature of the *processControl()* method in
> > *ControlAwareDefaultInputPort* which is implemented by the operator
> > developer will be as follows:
> >
> > *public abstract boolean processControl(UserDefinedControlTuple
> payload);*
> >
> > The boolean returned by the processControl() method indicates (to the
> > engine) whether or not the operator is able to handle the control tuple
> and
> > wants to take care of the propagation of the control tuple.
> >
> >    - If the method returns true - indicating it is able to handle the
> >    control tuple, the operator has to explicitly emit the control tuples
> to
> >    the output ports it wishes to propagate to.
> >
> >
> >    - If the method returns false - indicating it is not able to handle
> the
> >    control tuple, the control tuple will be propagated by the engine to
> all
> >    output ports.
> >
> > The operator may even emit new control tuples in either of the cases.
> > Note that for ports that are not control aware, the control tuple is
> > propagated by default.
> >
> > We don't need any output port annotations or operator level attributes.
> >
> > ~ Bhupesh
> >
> >
> > On Mon, Jan 9, 2017 at 5:16 PM, Tushar Gosavi <tu...@datatorrent.com>
> > wrote:
> >
> > > On Sun, Jan 8, 2017 at 11:49 PM, Vlad Rozov <v....@datatorrent.com>
> > > wrote:
> > > > +1 to manage propagation at an operator level. An operator is either
> > > control
> > > > tuple aware and needs to manage how control tuples are routed from
> > input
> > > > ports to output ports or it is not. In the later case it does not
> > matter
> > > how
> > > > many input and output ports the operator has and it is the Apex
> > platform
> > > > responsibility to route control tuples. I don't see a use case where
> an
> > > > operator that is not aware of a control tuple needs to manage one or
> > more
> > > > input ports (or similar output ports) differently than others.
> > > >
> > >
> > > The problem with giving explicit control to operator for routing of
> > > custom tuples is how does the operator
> > > developer knows about control tuple requirement for downstream
> > > operators in an application. For example in following DAG
> > > A -> B -> C
> > > A - is my custom source operator which emits a new control tuple type
> C1
> > > and C.
> > > B - is operator from malhar which handle control tuple C.
> > > C - is custom output operator which handles C1.
> > >
> > > If B is managing control tuples, then it needs to remember to foward
> > > unhandled tuples on all output port, else it will block
> > > the tuples for downstream operator which might need them, also if new
> > > output port is added then B needs to send that tuples
> > > on the new output port also. But In this case I can't simply extend B
> > > as port objects are transient and mostly anonymous,
> > > I can not extend these to send control tuples on new output port. In
> > > my opinion we should let the control tuple flow through
> > > entire DAG from their source and let each operator in the path to
> > > handle/ignore them as required without blocking them.
> > >
> > >
> > > > In general, an operator is aware only of a specific control tuple(s)
> > (for
> > > > example end of batch or end of file) and for a control tuples that it
> > was
> > > > not enabled for, the behavior should be exactly the same as if the
> > > operator
> > > > is not control tuple aware, meaning that those control tuples should
> be
> > > > propagated from input ports to output ports by the platform. There
> > > should be
> > > > an ability to let the platform know what control tuples an operator
> is
> > > aware
> > > > of and can handle. This can be done both by API call and an
> annotation.
> > > >
> > >
> > > I think this will add overhead while developing applications. Operator
> > > developer needs to add code to handle new control tuple also
> > > need to update the part of code to register the type with engine. And
> > > platfoms needs to perform type check and develiver the tuples
> > > accordingly. Instead operator developer could check the type of
> > > incoming tuple and handle it as required.
> > >
> > > - Tushar.
> > >
> > >
> > > > Thank you,
> > > >
> > > > Vlad
> > > >
> > > >
> > > > On 1/5/17 13:04, Bhupesh Chawda wrote:
> > > >>
> > > >> Agreed Thomas.
> > > >> I was referring to the persona of the operator developer. The user
> of
> > > the
> > > >> operator would not be doing anything related to the propagation of
> > > control
> > > >> tuples. Actually, the behavior of the operator wrt. propagation of
> > > control
> > > >> tuples would be part of the operator documentation.
> > > >>
> > > >> Also, we are providing options for the developer to route the flow
> of
> > > >> control tuples in code during the development of the operator. The
> > > >> annotations would actually help achieve it in a easier way.
> > > >>
> > > >> ~ Bhupesh
> > > >>
> > > >> On Jan 5, 2017 21:40, "Thomas Weise" <th...@apache.org> wrote:
> > > >>
> > > >> I think it is important to be clear on the roles with regard to this
> > > >> functionality. The user of the operator should not have to do
> anything
> > > to
> > > >> get it to work. So while I suggested to consider attributes earlier,
> > > there
> > > >> should not be any need for the user to set those. The operator needs
> > to
> > > >> work as is.
> > > >>
> > > >> The persona concerned with propagation of control tuples is the
> > operator
> > > >> developer. I think the clear way for the operator developer to
> > override
> > > >> the
> > > >> propagation behavior is in code and if that is possible there is no
> > need
> > > >> for other things such as attributes or other port level settings.
> > > >>
> > > >> Thomas
> > > >>
> > > >>
> > > >> On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda <
> > > bhupesh@datatorrent.com>
> > > >> wrote:
> > > >>
> > > >>> I think we all agree on the use case for selective propagation. The
> > > >>> question is about where to have the control - at the operator level
> > or
> > > at
> > > >>> the port level.
> > > >>>
> > > >>> For this ability, we have the following options:
> > > >>>
> > > >>>     1. Operator disables the propagation on selected output ports.
> > > Other
> > > >>>     output ports propagate by default.
> > > >>>     2. Operator disables propagation for the entire operator (by
> > means
> > > of
> > > >>
> > > >> an
> > > >>>
> > > >>>     attribute). Operator developer explicitly emits the received
> > > control
> > > >>> tuples
> > > >>>     on selected output ports.
> > > >>>
> > > >>> If the decision is to completely block the propagation, then
> Option 2
> > > is
> > > >>> easier to use as just an attribute needs to be set, as opposed to
> > > Option
> > > >>> 1
> > > >>> where user needs to set the annotation on each output port.
> > > >>>
> > > >>> However, if selective propagation is needed, Option 1 would just
> need
> > > the
> > > >>> user to disable propagation on certain ports; rest are propagated
> by
> > > >>> default, while Option 2 requires the user to explicitly emit the
> > > control
> > > >>> tuples.
> > > >>> ~ Bhupesh
> > > >>>
> > > >>>
> > > >>> On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise <th...@apache.org>
> wrote:
> > > >>>
> > > >>>> Yes, I think that for any of these cases the operator developer
> will
> > > >>
> > > >> turn
> > > >>>>
> > > >>>> of implicit propagation for the operator and then write the code
> to
> > > >>
> > > >> route
> > > >>>>
> > > >>>> or create control tuples as needed.
> > > >>>>
> > > >>>> Thomas
> > > >>>>
> > > >>>> On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre <amol@datatorrent.com
> >
> > > >>>
> > > >>> wrote:
> > > >>>>>
> > > >>>>> I agree that by default the propagation must be implicit, i.e. if
> > the
> > > >>>>> operator does nothing, the control tuple propagates. I do think
> > users
> > > >>>>> should have control on deciding to "not propagate" or "create
> new"
> > > and
> > > >>>
> > > >>> in
> > > >>>>>
> > > >>>>> these cases they would need to do something explicit (override)?
> > > >>>>>
> > > >>>>> The following cases come to mind
> > > >>>>> 1. Sole consumer of a particular control signal (for example end
> of
> > > >>>
> > > >>> file)
> > > >>>>>
> > > >>>>> 2. Creator of a particular control signal (start of file, or a
> > signal
> > > >>>
> > > >>> to
> > > >>>>>
> > > >>>>> pause on something etc.)
> > > >>>>> 3. One port on a data pipeline and other port for meta-data
> > pipeline
> > > >>>>>
> > > >>>>> In the above cases emit will be decided on an output port. #1 is
> > only
> > > >>>>
> > > >>>> place
> > > >>>>>
> > > >>>>> where all output ports will disable the tuple, #2 and #3 most
> > likely
> > > >>>
> > > >>> will
> > > >>>>>
> > > >>>>> be selective.
> > > >>>>>
> > > >>>>> Thks
> > > >>>>> Amol
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise <th...@apache.org>
> > > wrote:
> > > >>>>>
> > > >>>>>> I think there is (1) implicit propagation just like other
> control
> > > >>>>
> > > >>>> tuples
> > > >>>>>>
> > > >>>>>> where the operator code isn't involved and (2) where the
> operator
> > > >>>>>
> > > >>>>> developer
> > > >>>>>>
> > > >>>>>> wants to decide how control tuples are created or routed and
> will
> > > >>>>
> > > >>>> receive
> > > >>>>>>
> > > >>>>>> and emit them on the output ports as desired.
> > > >>>>>>
> > > >>>>>> I don't see a use case for hybrid approaches? Maybe propagation
> > does
> > > >>>>
> > > >>>> not
> > > >>>>>>
> > > >>>>>> need to be tied to ports at all, maybe just by annotation at the
> > > >>>>
> > > >>>> operator
> > > >>>>>>
> > > >>>>>> level?
> > > >>>>>>
> > > >>>>>> Thomas
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <
> > > >>>>
> > > >>>> bhupesh@datatorrent.com
> > > >>>>>>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Wouldn't having this with output ports give a finer control on
> > the
> > > >>>>>>> propagation of control tuples?
> > > >>>>>>> We might have an operator with two output ports each of which
> > > >>>
> > > >>> creates
> > > >>>>>
> > > >>>>> two
> > > >>>>>>>
> > > >>>>>>> different pipelines downstream. We would be able to say that
> one
> > > >>>>>
> > > >>>>> pipeline
> > > >>>>>>>
> > > >>>>>>> gets the control tuples and the other doesn't.
> > > >>>>>>>
> > > >>>>>>> ~ Bhupesh
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Jan 4, 2017 11:55 PM, "Thomas Weise" <th...@apache.org>
> wrote:
> > > >>>>>>>
> > > >>>>>>> I'm referring to the operator that needs to make the decision
> to
> > > >>>>>>
> > > >>>>>> propagate
> > > >>>>>>>
> > > >>>>>>> or not. The tuples come from an input port, so it seems
> > > >>
> > > >> appropriate
> > > >>>>
> > > >>>> to
> > > >>>>>>
> > > >>>>>> say
> > > >>>>>>>
> > > >>>>>>> "don't propagate control tuples from this port". No matter how
> > > >>
> > > >> many
> > > >>>>>>
> > > >>>>>> output
> > > >>>>>>>
> > > >>>>>>> ports there are.
> > > >>>>>>>
> > > >>>>>>> Output ports are there for an operator to emit new tuples, in
> the
> > > >>>>
> > > >>>> case
> > > >>>>>>
> > > >>>>>> you
> > > >>>>>>>
> > > >>>>>>> are discussing you don't emit new control tuples.
> > > >>>>>>>
> > > >>>>>>> Thomas
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <
> > > >>>>>
> > > >>>>> bhupesh@datatorrent.com>
> > > >>>>>>>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Thomas,
> > > >>>>>>>>
> > > >>>>>>>> Are you suggesting an attribute on the input port for
> > > >>
> > > >> controlling
> > > >>>>
> > > >>>> the
> > > >>>>>>>>
> > > >>>>>>>> propagation of control tuples to downstream operators?
> > > >>>>>>>> I think it should be better to do it on the output port since
> > > >>
> > > >> the
> > > >>>>>>>
> > > >>>>>>> decision
> > > >>>>>>>>
> > > >>>>>>>> to block the propagation will be made at the upstream operator
> > > >>>>
> > > >>>> rather
> > > >>>>>>>
> > > >>>>>>> than
> > > >>>>>>>>
> > > >>>>>>>> at the downstream.
> > > >>>>>>>> Also, we need another way of controlling the propagation at
> run
> > > >>>>
> > > >>>> time
> > > >>>>>>
> > > >>>>>> and
> > > >>>>>>>>
> > > >>>>>>>> hence I was thinking about the method call on the output port,
> > > >>
> > > >> in
> > > >>>>>>>
> > > >>>>>>> addition
> > > >>>>>>>>
> > > >>>>>>>> to the annotation on the output port (which is the static
> way).
> > > >>>>>>>>
> > > >>>>>>>> Please correct me if I have misunderstood your question.
> > > >>>>>>>>
> > > >>>>>>>> ~ Bhupesh
> > > >>>>>>>>
> > > >>>>>>>> On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <th...@apache.org>
> > > >>>>
> > > >>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Wouldn't it be more intuitive to control this with an
> > > >>
> > > >> attribute
> > > >>>>
> > > >>>> on
> > > >>>>>>
> > > >>>>>> the
> > > >>>>>>>>>
> > > >>>>>>>>> input port?
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <
> > > >>>>>>>
> > > >>>>>>> bhupesh@datatorrent.com
> > > >>>>>>>>>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi Pramod,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I was thinking of a method setPropagateControlTuples(
> boolean
> > > >>>>>>>>
> > > >>>>>>>> propagate)
> > > >>>>>>>>>
> > > >>>>>>>>> on
> > > >>>>>>>>>>
> > > >>>>>>>>>> the output port of the operator.
> > > >>>>>>>>>> The operator could disable this in the code at any point of
> > > >>>>
> > > >>>> time.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Note however that this is to block the propagation of
> > > >>
> > > >> control
> > > >>>>>>
> > > >>>>>> tuples
> > > >>>>>>>>
> > > >>>>>>>> from
> > > >>>>>>>>>>
> > > >>>>>>>>>> upstream. Any control tuples emitted explicitly by the
> > > >>>
> > > >>> operator
> > > >>>>>>
> > > >>>>>> would
> > > >>>>>>>>>
> > > >>>>>>>>> still
> > > >>>>>>>>>>
> > > >>>>>>>>>> be emitted and sent to the downstream operators.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Please see
> > > >>>>>>>>>> https://github.com/apache/apex-core/pull/440/files#diff-
> > > >>>>>>>>>> 8aa0ca1a3e645fa60e9b376c118c00a3R68
> > > >>>>>>>>>> in the PR.
> > > >>>>>>>>>>
> > > >>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <
> > > >>>>>>>>
> > > >>>>>>>> pramod@datatorrent.com>
> > > >>>>>>>>>>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> 2 sounds good. Have you thought about what the method
> > > >>
> > > >> would
> > > >>>>>
> > > >>>>> look
> > > >>>>>>>>
> > > >>>>>>>> like.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <
> > > >>>>>>>>>
> > > >>>>>>>>> bhupesh@datatorrent.com
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Yes, that makes sense.
> > > >>>>>>>>>>>> We have following options:
> > > >>>>>>>>>>>> 1. Make the annotation false by default and force the
> > > >>>
> > > >>> user
> > > >>>>
> > > >>>> to
> > > >>>>>>>>
> > > >>>>>>>> forward
> > > >>>>>>>>>>
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> control tuples explicitly.
> > > >>>>>>>>>>>> 2. Annotation is true by default and static way of
> > > >>>
> > > >>> blocking
> > > >>>>>>
> > > >>>>>> stays
> > > >>>>>>>>
> > > >>>>>>>> as
> > > >>>>>>>>>
> > > >>>>>>>>> it
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> is.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> We provide another way for blocking programmatically,
> > > >>>>
> > > >>>> perhaps
> > > >>>>>>
> > > >>>>>> by
> > > >>>>>>>>>
> > > >>>>>>>>> means
> > > >>>>>>>>>>
> > > >>>>>>>>>> of
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> another method call on the port.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Dec 30, 2016 00:09, "Pramod Immaneni" <
> > > >>>>>>
> > > >>>>>> pramod@datatorrent.com
> > > >>>>>>>>>>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Bhupesh,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Annotation seems like a static way to stop
> > > >>
> > > >> propagation.
> > > >>>>>
> > > >>>>> Give
> > > >>>>>>>>
> > > >>>>>>>> these
> > > >>>>>>>>>>
> > > >>>>>>>>>> are
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> programmatically generated I would think the operators
> > > >>>>>
> > > >>>>> should
> > > >>>>>>>
> > > >>>>>>> be
> > > >>>>>>>>>
> > > >>>>>>>>> able
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> stop (consume without propagating) programmatically as
> > > >>>>>
> > > >>>>> well.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> bhupesh@datatorrent.com
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks Vlad, I am trying out the approach you
> > > >>>
> > > >>> mentioned
> > > >>>>>>>>
> > > >>>>>>>> regarding
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> having
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> another interface which allows sinks to put a
> > > >>
> > > >> control
> > > >>>>>>
> > > >>>>>> tuple.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regarding the delivery of control tuples, here is
> > > >>>
> > > >>> what
> > > >>>>
> > > >>>> I
> > > >>>>>
> > > >>>>> am
> > > >>>>>>>>>>
> > > >>>>>>>>>> planning
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> do:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> All the control tuples which are emitted in a
> > > >>>>
> > > >>>> particular
> > > >>>>>>>
> > > >>>>>>> window
> > > >>>>>>>>>
> > > >>>>>>>>> are
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> delivered after all the data tuples have been
> > > >>>
> > > >>> delivered
> > > >>>>>
> > > >>>>> to
> > > >>>>>>>
> > > >>>>>>> the
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> respective
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> ports, but before the endWindow() call. The operator
> > > >>>>
> > > >>>> can
> > > >>>>>>
> > > >>>>>> then
> > > >>>>>>>>>>
> > > >>>>>>>>>> process
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> control tuples in that window and can do any
> > > >>>>
> > > >>>> finalization
> > > >>>>>>
> > > >>>>>> in
> > > >>>>>>>>
> > > >>>>>>>> the
> > > >>>>>>>>>>
> > > >>>>>>>>>> end
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> window
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> call. There will be no delivery of control tuples
> > > >>>
> > > >>> after
> > > >>>>>>>>>
> > > >>>>>>>>> endWindow()
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> and
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> before the next beginWindow() call.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> For handling the propagation of control tuples
> > > >>>
> > > >>> further
> > > >>>>
> > > >>>> in
> > > >>>>>>
> > > >>>>>> the
> > > >>>>>>>>>
> > > >>>>>>>>> dag,
> > > >>>>>>>>>>
> > > >>>>>>>>>> we
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> are
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> planning to have an annotation on the Output Port of
> > > >>>>
> > > >>>> the
> > > >>>>>>>>
> > > >>>>>>>> operator
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> which
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> would be true by default.
> > > >>>>>>>>>>>>>> @OutputPortFieldAnnotation(propogateControlTuples =
> > > >>>>>>
> > > >>>>>> false).
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <
> > > >>>>>>>>>>
> > > >>>>>>>>>> v.rozov@datatorrent.com
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Custom control tuples are control tuples emitted
> > > >>
> > > >> by
> > > >>>>
> > > >>>> an
> > > >>>>>>>>
> > > >>>>>>>> operator
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> itself
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> not by the platform. Prior to the introduction of
> > > >>>
> > > >>> the
> > > >>>>>>>
> > > >>>>>>> custom
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> control
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> tuples, only Apex engine itself puts control
> > > >>
> > > >> tuples
> > > >>>>>
> > > >>>>> into
> > > >>>>>>>>>
> > > >>>>>>>>> various
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> sinks,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> so
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> the engine created necessary Tuple objects with
> > > >>
> > > >> the
> > > >>>>>>>>>
> > > >>>>>>>>> corresponding
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> type
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> prior to calling Sink.put().
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Not all sinks need to be changed. Only control
> > > >>>
> > > >>> tuple
> > > >>>>>>
> > > >>>>>> aware
> > > >>>>>>>>>
> > > >>>>>>>>> sinks
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> should
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> provide such functionality. In the case there is a
> > > >>>>
> > > >>>> lot
> > > >>>>>
> > > >>>>> of
> > > >>>>>>>>
> > > >>>>>>>> code
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> duplication,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> please create an abstract class, that other
> > > >>
> > > >> control
> > > >>>>>
> > > >>>>> aware
> > > >>>>>>>>
> > > >>>>>>>> sinks
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> will
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> extend
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> from.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Vlad
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi Vlad,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Thanks for the pointer on delegating the wrapping
> > > >>>
> > > >>> of
> > > >>>>>
> > > >>>>> the
> > > >>>>>>>>
> > > >>>>>>>> user
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> tuple
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> control port. I was trying this out today.
> > > >>>>>>>>>>>>>>>> The problem I see us if we introduce a
> > > >>>>>
> > > >>>>> putControlTuple()
> > > >>>>>>>>>
> > > >>>>>>>>> method
> > > >>>>>>>>>>
> > > >>>>>>>>>> in
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Sink,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> then a lot of the existing sinks would change.
> > > >>>
> > > >>> Also
> > > >>>>>
> > > >>>>> the
> > > >>>>>>>>>
> > > >>>>>>>>> changes
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> seemed
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> redundant as, the existing control tuples already
> > > >>>>
> > > >>>> use
> > > >>>>>>
> > > >>>>>> the
> > > >>>>>>>>>
> > > >>>>>>>>> put()
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> method
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> sinks. So why do something special for custom
> > > >>>>
> > > >>>> control
> > > >>>>>>>>
> > > >>>>>>>> tuples?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> The only aspect in which the custom control
> > > >>
> > > >> tuples
> > > >>>>
> > > >>>> are
> > > >>>>>>>>>
> > > >>>>>>>>> different
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> is
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> these will be generated by the user and will
> > > >>>>
> > > >>>> actually
> > > >>>>>
> > > >>>>> be
> > > >>>>>>>>>>
> > > >>>>>>>>>> delivered
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> ports in a different order. Perhaps we should be
> > > >>>>
> > > >>>> able
> > > >>>>>
> > > >>>>> to
> > > >>>>>>>
> > > >>>>>>> use
> > > >>>>>>>>>
> > > >>>>>>>>> the
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> existing
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> flow. The only problems as outlined before seem
> > > >>
> > > >> to
> > > >>>>
> > > >>>> be
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> identification
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>> user tuple as a control tuple.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> v.rozov@datatorrent.com
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Why is it necessary to wrap in the OutputPort?
> > > >>>
> > > >>> Can't
> > > >>>>>
> > > >>>>> it
> > > >>>>>>
> > > >>>>>> be
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> delegated
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Sink by introducing new putControlTuple method?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Vlad
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi Vlad,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> The problem in using the Tuple class as the
> > > >>>>
> > > >>>> wrapper
> > > >>>>>
> > > >>>>> is
> > > >>>>>>>>
> > > >>>>>>>> that
> > > >>>>>>>>>>
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Ports
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> belong to the API and we want to wrap the
> > > >>>
> > > >>> payload
> > > >>>>>>
> > > >>>>>> object
> > > >>>>>>>>
> > > >>>>>>>> of
> > > >>>>>>>>>>
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> control
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> tuple into the Tuple class which is not part of
> > > >>>>
> > > >>>> the
> > > >>>>>>
> > > >>>>>> API.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> The output port will just get the payload of
> > > >>
> > > >> the
> > > >>>>>
> > > >>>>> user
> > > >>>>>>>>>
> > > >>>>>>>>> control
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> tuple.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> For
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> example, if the user emits a Long, as a control
> > > >>>>>
> > > >>>>> tuple,
> > > >>>>>>>
> > > >>>>>>> the
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> payload
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> object
> > > >>>>>>>>>>>>>>>>>> will just be a Long object.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> It is necessary to bundle this Long into some
> > > >>>>>>>
> > > >>>>>>> recognizable
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> object
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> so
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>> the BufferServerPublisher knows that this is a
> > > >>>>>
> > > >>>>> Control
> > > >>>>>>>>
> > > >>>>>>>> tuple
> > > >>>>>>>>>>
> > > >>>>>>>>>> and
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> not a
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> regular tuple and serialize it accordingly. It
> > > >>>
> > > >>> is
> > > >>>>>>>>
> > > >>>>>>>> therefore
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> necessary
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>> the tuple be part of some known hierarchy so
> > > >>>
> > > >>> that
> > > >>>>>
> > > >>>>> can
> > > >>>>>>
> > > >>>>>> be
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> distinguished
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> from
> > > >>>>>>>>>>>>>>>>>> other payload tuples. Let us call this class
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> ControlTupleInterface.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Note
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> that this needs to be done before the tuple is
> > > >>>>>>
> > > >>>>>> inserted
> > > >>>>>>>>
> > > >>>>>>>> into
> > > >>>>>>>>>>
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> sink
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> which
> > > >>>>>>>>>>>>>>>>>> is done in the port objects. Once the tuple is
> > > >>>>>>
> > > >>>>>> inserted
> > > >>>>>>>>
> > > >>>>>>>> into
> > > >>>>>>>>>>
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> sink,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>>>> would seem just like any other payload tuple
> > > >>
> > > >> and
> > > >>>>>>
> > > >>>>>> cannot
> > > >>>>>>>
> > > >>>>>>> be
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> distinguished.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> For this reason, I had something like the
> > > >>>>
> > > >>>> following
> > > >>>>>
> > > >>>>> in
> > > >>>>>>>>
> > > >>>>>>>> API:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> package com.datatorrent.api;
> > > >>>>>>>>>>>>>>>>>> public class ControlTupleInterface
> > > >>>>>>>>>>>>>>>>>> {
> > > >>>>>>>>>>>>>>>>>>      Object payload; // User control tuple
> > > >>>>
> > > >>>> payload. A
> > > >>>>>>>>
> > > >>>>>>>> Long()
> > > >>>>>>>>>>
> > > >>>>>>>>>> for
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> example.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>      UUID id;  // Unique Id to de-duplicate in
> > > >>>>>>
> > > >>>>>> downstream
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> operators
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Regarding your suggestion on using the Tuple
> > > >>>
> > > >>> class
> > > >>>>>
> > > >>>>> as
> > > >>>>>>>
> > > >>>>>>> the
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> wrapper
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> for
> > >
> >
>