Posted to dev@apex.apache.org by Chinmay Kolhatkar <ch...@apache.org> on 2017/01/16 09:23:15 UTC

Schema Discovery Support in Apex Applications

Hi All,

Currently, if a user-built DAG contains any POJO-based ("POJOfied")
operators, the TUPLE_CLASS attribute needs to be set on each and every port
that receives or sends a POJO.

For example, if the DAG is File -> Parser -> Transform -> Dedup ->
Formatter -> Kafka, then the user needs to set the TUPLE_CLASS attribute on
both the input and output ports of the Transform and Dedup operators, and
also on the Parser output and the Formatter input.

The proposal here is to reduce the work required from the user to configure
the DAG. Technically speaking, if an operator knows its input schema and
its processing properties, it can determine its output schema and convey it
to downstream operators. This way the complete pipeline can be configured
without the user setting TUPLE_CLASS or even creating POJOs and adding them
to the classpath.
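
As a rough illustration of this idea (a sketch only, not Apex API and not
the design in the linked document): assume a schema is simply an ordered map
of field names to Java types, and model each operator as a function from its
input schema to its output schema. The names SchemaPropagationSketch,
propagate, addField and passThrough are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model: a schema is an ordered map of field name -> Java type.
final class SchemaPropagationSketch {

    // Walks the stages from upstream to downstream and returns the schema
    // on each stage's output, derived only from the source schema.
    static List<Map<String, Class<?>>> propagate(
            Map<String, Class<?>> sourceSchema,
            List<UnaryOperator<Map<String, Class<?>>>> stages) {
        List<Map<String, Class<?>>> outputs = new ArrayList<>();
        Map<String, Class<?>> current = sourceSchema;
        for (UnaryOperator<Map<String, Class<?>>> stage : stages) {
            current = stage.apply(current);
            outputs.add(current);
        }
        return outputs;
    }

    // A "transform"-like stage whose processing property adds one field.
    static UnaryOperator<Map<String, Class<?>>> addField(String name, Class<?> type) {
        return in -> {
            Map<String, Class<?>> out = new LinkedHashMap<>(in);
            out.put(name, type);
            return out;
        };
    }

    // A "dedup"-like stage: tuples pass through, so the schema is unchanged.
    static UnaryOperator<Map<String, Class<?>>> passThrough() {
        return in -> new LinkedHashMap<>(in);
    }
}
```

In this model only the schema on the first output (for instance the parser
output) has to be supplied; the schema on every later port follows from it.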

Building on this idea, I want to propose an approach for such schema
discovery. The following document explains the idea and a high-level
design:
https://docs.google.com/document/d/1ibLQ1KYCLTeufG7dLoHyN_tRQXEM3LR-7o_S0z_porQ/edit?usp=sharing

I would like to get the community's opinion on the feasibility and
applications of this proposal.
Once we reach some consensus, we can discuss the design in detail.

Thanks,
Chinmay.

Re: Schema Discovery Support in Apex Applications

Posted by Mohit Jotwani <mo...@datatorrent.com>.
+1

Regards,
Mohit

On Tue, Jan 17, 2017 at 12:11 PM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> +1 for the feature.
>
> ~ Bhupesh
>

Re: Schema Discovery Support in Apex Applications

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
+1 for the feature.

~ Bhupesh

On Mon, Jan 16, 2017 at 5:09 PM, Chinmay Kolhatkar <ch...@apache.org>
wrote:

> Those are not really anonymous POJOs. The definition of the POJO will be
> known to the user, because it is on that basis that the upstream operator
> conveys the tuple type it will be emitting.
> Using that information, the user can configure the operators. Those
> properties will be a bit different, though.
>

Re: Schema Discovery Support in Apex Applications

Posted by Chinmay Kolhatkar <ch...@apache.org>.
Those are not really anonymous POJOs. The definition of the POJO will be
known to the user, because it is on that basis that the upstream operator
conveys the tuple type it will be emitting.
Using that information, the user can configure the operators. Those
properties will be a bit different, though.

On Mon, Jan 16, 2017 at 4:20 PM, AJAY GUPTA <aj...@gmail.com> wrote:

> +1 for the idea.
>
> I just had one question.
>
> As I understand, there will be some form of Anonymous POJO used as objects
> to pass information from one operator to another. Can you share how the
> user/operator developer would access the tuple object in case he wishes to
> do something with it?
>
>
> Ajay
>

Re: Schema Discovery Support in Apex Applications

Posted by AJAY GUPTA <aj...@gmail.com>.
+1 for the idea.

I just had one question.

As I understand, there will be some form of Anonymous POJO used as objects
to pass information from one operator to another. Can you share how the
user/operator developer would access the tuple object in case he wishes to
do something with it?


Ajay


Re: Schema Discovery Support in Apex Applications

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Will do that.

On Fri, Feb 3, 2017 at 3:15 PM, Vlad Rozov <v....@datatorrent.com> wrote:

> IMO, it would be good to summarize the schema use case and the proposed
> approach to support it on the control tuple e-mail thread. Not everyone
> interested in custom control tuples may be following the schema support
> thread.
>
> Thank you,
>
> Vlad
>
>

Re: Schema Discovery Support in Apex Applications

Posted by Vlad Rozov <v....@datatorrent.com>.
IMO, it would be good to summarize the schema use case and the proposed
approach to support it on the control tuple e-mail thread. Not everyone
interested in custom control tuples may be following the schema support
thread.

Thank you,

Vlad

On 2/3/17 08:47, Pramod Immaneni wrote:
> On Fri, Feb 3, 2017 at 7:59 AM, Thomas Weise <th...@apache.org> wrote:
>
>> Agreed. As noted the main concern was the ability to support idempotency.
>> It isn't really "re-ordering" because when you have multiple input ports,
>> there isn't any ordering guarantee within a streaming window.
>>
> The reordering I was referring to is the reordering that would happen
> either within the container that receives the tuples or the one that sends
> them, by holding on to the control tuple(s) until the window boundary,
> not the order in which data is received across the different paths.
> Also, isn't the idempotency concern that the operator developer might make
> the incorrect ordering assumption you mentioned and do the wrong thing?
> It is not that this approach will break idempotency or make it impossible
> to achieve.
>
> It looks like we have agreement on this approach so far. Do folks see
> the need to summarize this in the control tuple discussion thread as well?
> I can do that.
>
> Thanks
>
>
>> The end window boundary is good when the control tuple needs to be
>> processed after all associated data tuples (which is the case for
>> watermarks).
>>
>> For schema it is the opposite, the schema needs to be seen before all data
>> tuples. The scenario of multiple input ports needs to be considered here as
>> well.
>>
>> Thomas
>>
>>
>> On Thu, Feb 2, 2017 at 9:59 AM, Vlad Rozov <v....@datatorrent.com>
>> wrote:
>>
>>> I second the proposal to revisit custom control tuple delivery and
>>> re-ordering. Schema support brings a use case that was missing when we
>>> discussed custom control tuples.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 2/1/17 21:56, Pramod Immaneni wrote:
>>>
>>>> This can be done neatly and possibly completely outside the engine if we
>>>> are able to deliver schema information via the control tuple mechanism.
>>>> Current control tuple proposal reorders the control tuple to be delivered
>>>> at the end of the window to the operator. This would not be feasible for
>>>> schemas as the schema would need to be delivered before the data. If we can
>>>> reconsider this behavior and consider not reordering the control tuple it
>>>> would work in this use case. We can have further discussions on the
>>>> scenarios this raises like what to do when there are multiple paths for
>>>> data, how control tuples get delivered to unifiers and look into
>>>> suggestions like synchronizing on control tuple boundaries and other ways
>>>> to solve these. What do you guys think?
>>>>
>>>> On Wed, Feb 1, 2017 at 8:27 PM, Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> I think dynamic schema would be good to consider (schema known and possibly
>>>>> changing at runtime). Some applications cannot be written under the
>>>>> assumption that the schema is known upfront.
>>>>>
>>>>> Also, does this really need to leak into the engine? I think it would be
>>>>> good to consider alternatives and tradeoffs.
>>>>>
>>>>> Thomas
>>>>>
>>>>> On Mon, Jan 30, 2017 at 10:44 PM, Chinmay Kolhatkar
>>>>> <chinmay@datatorrent.com> wrote:
>>>>>
>>>>>> Consumer of output port operator schema is going next downstream
>>>>>> operator.
>>>>>>
>>>>>> On Tue, Jan 31, 2017 at 4:01 AM, Sergey Golovko
>>>>>> <sergey@datatorrent.com> wrote:
>>>>>>
>>>>>>> Sorry, I’m a new person in the APEX team. And I don't understand clearly
>>>>>>> who are consumers of the output port operator schema(s).
>>>>>>>
>>>>>>> 1. If the consumers are non-run-time callers like the application manager
>>>>>>> or UI designer, maybe it makes sense to use Java static method(s) to
>>>>>>> retrieve the output port operator schema(s). I guess the performance of a
>>>>>>> single call of a static method via reflection can be ignored.
>>>>>>>
>>>>>>> 2. If the consumer is next downstream operator, maybe it makes sense to
>>>>>>> send an output port operator schema from upstream operator to next
>>>>>>> downstream operator via the stream. The corresponded methods that would
>>>>>>> send and receive the schema should be declared in the
>>>>>>> interface/abstract-class of the upstream and downstream operators. The
>>>>>>> sending/receiving of an output schema should be processed right before the
>>>>>>> sending of the first data record via the stream.
>>>>>>>
>>>>>>> One of examples of a typical implementation for sending of metadata with a
>>>>>>> regular result set is the sending of JDBC metadata as a part of JDBC result
>>>>>>> set. And I hope the output schema (metadata of the streamed data) in the
>>>>>>> implementation should contain not only a signature of the streamed objects
>>>>>>> (like field names and data types), but also any other properties of the
>>>>>>> data that can be useful by the schema receiver to process the data (for
>>>>>>> instance, a delimiter for CSV record stream).
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Sergey
>>>>>>>
>>>>>>> On 2017-01-25 01:47 (-0800), Chinmay Kolhatkar
>>>>>>> <chinmay@datatorrent.com> wrote:
>>>>>>>
>>>>>>>> Thank you all for the feedback.
>>>>>>>>
>>>>>>>> I've created a Jira for this: APEXCORE-623 and I'll attach the same
>>>>>>>> document and link to this mailchain there.
>>>>>>>>
>>>>>>>> As a first part of this Jira, there are 2 steps I would like to propose:
>>>>>>>>
>>>>>>>> 1. Add following interface at com.datatorrent.common.util.SchemaAware.
>>>>>>>>
>>>>>>>> interface SchemaAware {
>>>>>>>>   Map<OutputPort, Schema> registerSchema(Map<InputPort, Schema> inputSchema);
>>>>>>>> }
>>>>>>>>
>>>>>>>> This interface can be implemented by Operators to communicate its output
>>>>>>>> schema(s) to engine.
>>>>>>>> Input to this schema will be schema at its input port.
>>>>>>>>
>>>>>>>> 2. After LogicalPlan is created call SchemaAware method from upstream to
>>>>>>>> downstream operator in the DAG to propagate the Schema.
>>>>>>>>
>>>>>>>> Once this is done, changes can be done in Malhar for the operators in
>>>>>>>> question.
>>>>>>>>
>>>>>>>> Please share your opinion on this approach.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Chinmay.
>>>>>>>>
>>>>>>>> On Wed, Jan 18, 2017 at 2:31 PM, Priyanka Gugale <priyag@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> +1 to have this feature.
>>>>>>>>>
>>>>>>>>> -Priyanka
>>>>>>>>>
>>>>>>>>> On Tue, Jan 17, 2017 at 9:18 PM, Pramod Immaneni
>>>>>>>>> <pramod@datatorrent.com> wrote:
>>>>>>>>>
>>>>>>>>>> +1
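
For readers who want something concrete, the SchemaAware interface quoted
above (from the Jan 25 message) might look roughly like the sketch below. It
is only an illustration under simplifying assumptions: ports are plain
string names and a schema is an ordered map of field names to Java types,
in place of the OutputPort, InputPort and Schema types named in the
proposal. SchemaAwareSketch and JoinSketch are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the proposed interface: ports are plain names and
// a schema is an ordered map of field name -> type (both are assumptions).
interface SchemaAwareSketch {
    // Given the schema on each input port, return the schema on each output port.
    Map<String, Map<String, Class<?>>> registerSchema(
            Map<String, Map<String, Class<?>>> inputSchemas);
}

// Hypothetical operator with input ports "left" and "right" and one output
// port "out" whose tuples carry the fields of both inputs.
final class JoinSketch implements SchemaAwareSketch {
    @Override
    public Map<String, Map<String, Class<?>>> registerSchema(
            Map<String, Map<String, Class<?>>> inputSchemas) {
        Map<String, Class<?>> joined = new LinkedHashMap<>(inputSchemas.get("left"));
        joined.putAll(inputSchemas.get("right"));
        Map<String, Map<String, Class<?>>> outputSchemas = new LinkedHashMap<>();
        outputSchemas.put("out", joined);
        return outputSchemas;
    }
}
```

Taking a map keyed by input port, as the proposal does, is what lets an
operator with more than one input derive its output schema in a single call.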


Re: Schema Discovery Support in Apex Applications

Posted by Pramod Immaneni <pr...@datatorrent.com>.
On Fri, Feb 3, 2017 at 7:59 AM, Thomas Weise <th...@apache.org> wrote:

> Agreed. As noted the main concern was the ability to support idempotency.
> It isn't really "re-ordering" because when you have multiple input ports,
> there isn't any ordering guarantee within a streaming window.
>

The reordering I was referring to is the reordering that would happen
either within the container that receives the tuples or the one that sends
them, by holding on to the control tuple(s) until the window boundary,
not the order in which data is received across the different paths.
Also, isn't the idempotency concern that the operator developer might make
the incorrect ordering assumption you mentioned and do the wrong thing?
It is not that this approach will break idempotency or make it impossible
to achieve.
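
To make the two behaviours concrete, here is a toy sketch (not engine code;
the class, the enum and the "ctl:" prefix convention are invented for
illustration) that delivers one window's tuples either with control tuples
held back until the end of the window or in the order they were received.

```java
import java.util.ArrayList;
import java.util.List;

final class ControlTupleDeliverySketch {

    enum Policy { HOLD_UNTIL_END_WINDOW, IN_ORDER }

    // Tuples whose text starts with "ctl:" stand in for control tuples;
    // everything else is treated as a data tuple.
    static List<String> deliver(List<String> window, Policy policy) {
        List<String> delivered = new ArrayList<>();
        List<String> held = new ArrayList<>();
        for (String tuple : window) {
            if (policy == Policy.HOLD_UNTIL_END_WINDOW && tuple.startsWith("ctl:")) {
                held.add(tuple);      // kept back until the window boundary
            } else {
                delivered.add(tuple); // passed on immediately
            }
        }
        delivered.addAll(held);       // held control tuples go out last
        return delivered;
    }
}
```

With the held-back policy, a schema control tuple sent at the start of a
window would reach the operator after the data it describes, whereas
in-order delivery keeps it ahead of that data.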

It looks like we have agreement on this approach so far. Do folks see
the need to summarize this in the control tuple discussion thread as well?
I can do that.

Thanks


> The end window boundary is good when the control tuple needs to be
> processed after all associated data tuples (which is the case for
> watermarks).
>
> For schema it is the opposite, the schema needs to be seen before all data
> tuples. The scenario of multiple input ports needs to be considered here as
> well.
>
> Thomas
>

Re: Schema Discovery Support in Apex Applications

Posted by Thomas Weise <th...@apache.org>.
Agreed. As noted the main concern was the ability to support idempotency.
It isn't really "re-ordering" because when you have multiple input ports,
there isn't any ordering guarantee within a streaming window.

The end window boundary is good when the control tuple needs to be
processed after all associated data tuples (which is the case for
watermarks).

For schema it is the opposite: the schema needs to be seen before all data
tuples. The scenario of multiple input ports needs to be considered here as
well.

Thomas


On Thu, Feb 2, 2017 at 9:59 AM, Vlad Rozov <v....@datatorrent.com> wrote:

> I second the proposal to revisit custom control tuple delivery and
> re-ordering. Schema support brings a use case that was missing when we
> discussed custom control tuples.
>
> Thank you,
>
> Vlad

Re: Schema Discovery Support in Apex Applications

Posted by Vlad Rozov <v....@datatorrent.com>.
I second the proposal to revisit custom control tuple delivery and 
re-ordering. Schema support brings a use case that was missing when we 
discussed custom control tuples.

Thank you,

Vlad

On 2/1/17 21:56, Pramod Immaneni wrote:
> This can be done neatly and possibly completely outside the engine if we
> are able to deliver schema information via the control tuple mechanism.
> Current control tuple proposal reorders the control tuple to be delivered
> at the end of the window to the operator. This would not be feasible for
> schemas as the schema would need to be delivered before the data. If we can
> reconsider this behavior and consider not reordering the control tuple it
> would work in this use case. We can have further discussions on the
> scenarios this raises like what to do when there are multiple paths for
> data, how control tuples get delivered to unifiers and look into
> suggestions like synchronizing on control tuple boundaries and other ways
> to solve these. What do you guys think?


Re: Schema Discovery Support in Apex Applications

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
I think the only concern in deciding the end-of-window processing
behavior for control tuples was that developers may inadvertently break
the idempotency of the operator.
Even though we are trying to enforce this, it seems that even the emit of a
control tuple (from the source) should not happen in the middle of a window
if strict guarantees are needed. Consider, for example, that we need to send
a control tuple at the end of a file, which allows downstream operators to
reset their state and prepare for a new file. As a developer of the input
operator, I would not want to emit the end-of-file tuple for some file and
start processing the next file, both in the same window. We would like to
make sure that the end of file is emitted first so that downstream
operators can consume it and take appropriate actions, and then the new
file is processed starting from the next window.
Given that the input operators can take care of this, there is little
chance that the downstream operators will need to do much to maintain
idempotency.
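
The end-of-file scenario above can be sketched roughly as follows. This is
only an illustration under stated assumptions: FileBoundarySketch, FileInput
and the "CONTROL END_OF_FILE" marker are hypothetical names, not Apex APIs,
and the window callbacks merely imitate the beginWindow/emitTuples/endWindow
life cycle of an input operator. The point is that once the end-of-file
marker has been emitted, the operator emits nothing more in that window and
picks up the next file in the following one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch; none of these names are Apex APIs. */
final class FileBoundarySketch {

  /** Emits lines of queued files; an end-of-file marker is always the last emit of its window. */
  static final class FileInput {
    private final Deque<List<String>> files = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private Deque<String> current;      // lines left in the file being read, or null
    private boolean fileClosedInWindow; // true once END_OF_FILE was emitted in this window

    void addFile(List<String> lines) {
      files.add(lines);
    }

    void beginWindow(long windowId) {
      fileClosedInWindow = false;
      emitted.add("BEGIN_WINDOW " + windowId);
    }

    /** Called repeatedly within a window; emits at most one item per call. */
    void emitTuples() {
      if (fileClosedInWindow) {
        return; // hold the next file back until the next window
      }
      if (current == null) {
        if (files.isEmpty()) {
          return;
        }
        current = new ArrayDeque<>(files.poll());
      }
      if (current.isEmpty()) {
        emitted.add("CONTROL END_OF_FILE");
        current = null;
        fileClosedInWindow = true;
      } else {
        emitted.add("DATA " + current.poll());
      }
    }

    void endWindow() {
      emitted.add("END_WINDOW");
    }

    List<String> emitted() {
      return emitted;
    }
  }

  /** Drives two files through two windows and returns everything that was emitted. */
  static List<String> run() {
    FileInput in = new FileInput();
    in.addFile(Arrays.asList("a1", "a2"));
    in.addFile(Arrays.asList("b1"));
    for (long w = 1; w <= 2; w++) {
      in.beginWindow(w);
      for (int i = 0; i < 5; i++) {
        in.emitTuples();
      }
      in.endWindow();
    }
    return in.emitted();
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

In this sketch the first window still has spare emitTuples() calls after the
end-of-file marker, yet the record "b1" only appears in the second window.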

My suggestion is to allow both options (in-order and end-of-window
delivery) and control the choice via an attribute.
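
A minimal sketch of what such a switch could look like, assuming a
hypothetical DeliveryMode setting (no such attribute exists in Apex as far as
this thread shows; ControlDeliverySketch and deliver are illustrative names
too). Given the tuples that arrive within one window, it returns the order in
which the operator would see them under each mode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; DeliveryMode is not an existing Apex attribute. */
final class ControlDeliverySketch {

  enum DeliveryMode { IN_ORDER, END_OF_WINDOW }

  /** Returns the order in which an operator would see one window's tuples. */
  static List<String> deliver(List<String> arrivals, DeliveryMode mode) {
    List<String> delivered = new ArrayList<>();
    List<String> heldControl = new ArrayList<>();
    for (String tuple : arrivals) {
      if (mode == DeliveryMode.END_OF_WINDOW && tuple.startsWith("CONTROL")) {
        heldControl.add(tuple); // held back until the window boundary
      } else {
        delivered.add(tuple);   // passed through in arrival order
      }
    }
    delivered.addAll(heldControl); // empty when mode is IN_ORDER
    return delivered;
  }

  public static void main(String[] args) {
    List<String> arrivals = Arrays.asList("CONTROL schema-v2", "DATA r1", "DATA r2");
    System.out.println(deliver(arrivals, DeliveryMode.IN_ORDER));
    System.out.println(deliver(arrivals, DeliveryMode.END_OF_WINDOW));
  }
}
```

With END_OF_WINDOW the schema tuple lands after the records it describes,
which suits watermark-style tuples but not schemas; IN_ORDER keeps it ahead
of the data.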

~ Bhupesh



On Thu, Feb 2, 2017 at 11:26 AM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> This can be done neatly and possibly completely outside the engine if we
> are able to deliver schema information via the control tuple mechanism.
> Current control tuple proposal reorders the control tuple to be delivered
> at the end of the window to the operator. This would not be feasible for
> schemas as the schema would need to be delivered before the data. If we can
> reconsider this behavior and consider not reordering the control tuple it
> would work in this use case. We can have further discussions on the
> scenarios this raises like what to do when there are multiple paths for
> data, how control tuples get delivered to unifiers and look into
> suggestions like synchronizing on control tuple boundaries and other ways
> to solve these. What do you guys think?

Re: Schema Discovery Support in Apex Applications

Posted by Pramod Immaneni <pr...@datatorrent.com>.
This can be done neatly and possibly completely outside the engine if we
are able to deliver schema information via the control tuple mechanism.
The current control tuple proposal reorders the control tuple so that it is
delivered to the operator at the end of the window. This would not be
feasible for schemas, as the schema would need to be delivered before the
data. If we can reconsider this behavior and not reorder the control tuple,
it would work for this use case. We can have further discussions on the
scenarios this raises, such as what to do when there are multiple paths for
data and how control tuples get delivered to unifiers, and look into
suggestions like synchronizing on control tuple boundaries and other ways
to solve these. What do you guys think?
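
To illustrate why the schema has to reach the operator before the data, here
is a rough sketch of a downstream consumer. All names (SchemaFirstSketch,
RecordParser, onSchema, onData) are hypothetical and stand in for whatever
callbacks the control tuple mechanism would eventually provide; the schema is
reduced to a list of field names for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; these callbacks are not part of any Apex API. */
final class SchemaFirstSketch {

  /** Interprets CSV data tuples using the most recent schema control tuple. */
  static final class RecordParser {
    private List<String> fieldNames; // null until a schema tuple arrives
    private final List<Map<String, String>> parsed = new ArrayList<>();

    /** Called when a schema control tuple arrives; may happen again at runtime. */
    void onSchema(List<String> names) {
      fieldNames = new ArrayList<>(names);
    }

    /** Called for each data tuple; fails if no schema has been seen yet. */
    void onData(String csvLine) {
      if (fieldNames == null) {
        throw new IllegalStateException("data tuple arrived before any schema");
      }
      String[] values = csvLine.split(",", -1);
      if (values.length != fieldNames.size()) {
        throw new IllegalArgumentException("record does not match current schema: " + csvLine);
      }
      Map<String, String> record = new LinkedHashMap<>();
      for (int i = 0; i < values.length; i++) {
        record.put(fieldNames.get(i), values[i]);
      }
      parsed.add(record);
    }

    List<Map<String, String>> parsed() {
      return parsed;
    }
  }

  public static void main(String[] args) {
    RecordParser parser = new RecordParser();
    parser.onSchema(Arrays.asList("id", "name"));
    parser.onData("1,apex");
    parser.onSchema(Arrays.asList("id", "name", "city")); // schema changes at runtime
    parser.onData("2,malhar,pune");
    System.out.println(parser.parsed());
  }
}
```

If the schema tuple were held until the end of the window, the first onData
call in this sketch would already fail, which is the ordering problem
described above.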

Re: Schema Discovery Support in Apex Applications

Posted by Thomas Weise <th...@apache.org>.
I think dynamic schema would be good to consider (schema known and possibly
changing at runtime). Some applications cannot be written under the
assumption that the schema is known upfront.

Also, does this really need to leak into the engine? I think it would be
good to consider alternatives and tradeoffs.

Thomas


Re: Schema Discovery Support in Apex Applications

Posted by Chinmay Kolhatkar <ch...@datatorrent.com>.
The consumer of an operator's output port schema is going to be the next downstream operator.


Re: Schema Discovery Support in Apex Applications

Posted by Sergey Golovko <se...@datatorrent.com>.
Sorry, I'm new to the Apex team, and I don't clearly understand who the consumers of an operator's output port schema(s) are.

1. If the consumers are non-run-time callers like the application manager or UI designer, maybe it makes sense to use Java static method(s) to retrieve the output port schema(s). I guess the performance cost of a single reflective call to a static method can be ignored.

2. If the consumer is the next downstream operator, maybe it makes sense to send the output port schema from the upstream operator to the next downstream operator via the stream. The corresponding methods that send and receive the schema should be declared in the interface/abstract class of the upstream and downstream operators. The output schema should be sent and received right before the first data record is sent via the stream.

One typical example of sending metadata along with a regular result set is JDBC, which sends metadata as part of the JDBC result set. And I hope the output schema (metadata of the streamed data) in the implementation will contain not only a signature of the streamed objects (like field names and data types), but also any other properties of the data that the schema receiver can use to process the data (for instance, a delimiter for a CSV record stream).
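
As a rough illustration of point 1 and of a schema that carries extra properties, here is a small sketch. StreamSchema, CsvParserOperator, SchemaLookup and the getOutputSchema() naming convention are all assumptions made up for this example; none of them exist in Apex or Malhar.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical schema: field signature plus free-form properties such as a CSV delimiter. */
final class StreamSchema {
  final Map<String, Class<?>> fields = new LinkedHashMap<>();
  final Map<String, String> properties = new LinkedHashMap<>();
}

/** Hypothetical operator that exposes its output schema through a static method. */
final class CsvParserOperator {
  public static StreamSchema getOutputSchema() {
    StreamSchema s = new StreamSchema();
    s.fields.put("id", Long.class);
    s.fields.put("name", String.class);
    s.properties.put("delimiter", ",");
    return s;
  }
}

final class SchemaLookup {
  /** What a non-run-time caller (for example a UI designer) could do with only the operator class. */
  static StreamSchema outputSchemaOf(Class<?> operatorClass) {
    try {
      Method m = operatorClass.getMethod("getOutputSchema");
      return (StreamSchema) m.invoke(null); // null receiver because the method is static
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("operator does not expose a static getOutputSchema()", e);
    }
  }

  public static void main(String[] args) {
    StreamSchema s = outputSchemaOf(CsvParserOperator.class);
    System.out.println(s.fields.keySet() + " delimiter=" + s.properties.get("delimiter"));
  }
}
```

A static method like this can only describe a schema that is fixed per operator class; a schema that depends on operator configuration or on the upstream schema would need an instance-level mechanism instead.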

Thanks,
Sergey

Re: Schema Discovery Support in Apex Applications

Posted by Chinmay Kolhatkar <ch...@datatorrent.com>.
Thank you all for the feedback.

I've created a Jira for this: APEXCORE-623 and I'll attach the same
document and link to this mailchain there.

As the first part of this Jira, there are two steps I would like to propose:
1. Add the following interface as com.datatorrent.common.util.SchemaAware:

interface SchemaAware {
  Map<OutputPort, Schema> registerSchema(Map<InputPort, Schema> inputSchema);
}

Operators can implement this interface to communicate their output
schema(s) to the engine.
The input to this method is the schema at each of the operator's input ports.

2. After the LogicalPlan is created, call the SchemaAware method on each
operator, from upstream to downstream in the DAG, to propagate the schema.

Once this is done, changes can be done in Malhar for the operators in
question.
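
To illustrate the two steps, here is a minimal, self-contained sketch. It is
not the engine implementation: ports are simplified to port names (String)
instead of InputPort/OutputPort, and PipelineSchema, ParserOp, TransformOp and
SchemaPropagation are hypothetical classes made up for the example. Only a
linear pipeline is handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a schema: ordered field name -> Java type. */
final class PipelineSchema {
  final Map<String, Class<?>> fields = new LinkedHashMap<>();

  PipelineSchema with(String name, Class<?> type) {
    fields.put(name, type);
    return this;
  }
}

/** The proposed interface, with ports simplified to port names for this sketch. */
interface SchemaAware {
  Map<String, PipelineSchema> registerSchema(Map<String, PipelineSchema> inputSchema);
}

/** Hypothetical parser: has no input schema and derives its output schema from configuration. */
final class ParserOp implements SchemaAware {
  public Map<String, PipelineSchema> registerSchema(Map<String, PipelineSchema> inputSchema) {
    Map<String, PipelineSchema> out = new LinkedHashMap<>();
    out.put("out", new PipelineSchema().with("id", Long.class).with("name", String.class));
    return out;
  }
}

/** Hypothetical transform: copies the input schema and adds one computed field. */
final class TransformOp implements SchemaAware {
  public Map<String, PipelineSchema> registerSchema(Map<String, PipelineSchema> inputSchema) {
    PipelineSchema result = new PipelineSchema();
    result.fields.putAll(inputSchema.get("in").fields);
    result.with("nameLength", Integer.class);
    Map<String, PipelineSchema> out = new LinkedHashMap<>();
    out.put("out", result);
    return out;
  }
}

final class SchemaPropagation {
  /** Walks a linear pipeline upstream to downstream, wiring each "out" schema to the next "in". */
  static PipelineSchema propagate(List<SchemaAware> pipeline) {
    Map<String, PipelineSchema> input = new LinkedHashMap<>();
    PipelineSchema last = null;
    for (SchemaAware op : pipeline) {
      last = op.registerSchema(input).get("out");
      input = new LinkedHashMap<>();
      input.put("in", last);
    }
    return last;
  }

  public static void main(String[] args) {
    List<SchemaAware> pipeline = new ArrayList<>();
    pipeline.add(new ParserOp());
    pipeline.add(new TransformOp());
    System.out.println(propagate(pipeline).fields.keySet()); // prints [id, name, nameLength]
  }
}
```

A real DAG would need a topological traversal and per-port wiring rather than
the single "in"/"out" pair assumed here.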

Please share your opinion on this approach.

Thanks,
Chinmay.




Re: Schema Discovery Support in Apex Applications

Posted by Priyanka Gugale <pr...@apache.org>.
+1 to have this feature.

-Priyanka

Re: Schema Discovery Support in Apex Applications

Posted by Pramod Immaneni <pr...@datatorrent.com>.
+1
