Posted to dev@apex.apache.org by Siyuan Hua <si...@datatorrent.com> on 2016/04/05 08:30:01 UTC

Java stream api

Hi community,

I have pushed my first commit of the stream API to my public repository
here:
https://github.com/siyuanh/incubator-apex-malhar/tree/stream

You can think of this as the prototype for the Java Stream API proposal here:
https://docs.google.com/document/d/163LmQjX860b61NDe3ZzR0hRTPtE-4GF0iHaVhmHQssY/edit#heading=h.aytn6rz7u1e4

A simple walkthrough of the code:

ApexStream is the core interface for building a DAG in stream style. The
default implementation is ApexStreamImpl.

Function is the super-interface for all simple transformations. It has
several sub-interfaces, such as MapFunction and ReduceFunction.

FunctionOperator is a wrapper for functions: it passes whatever arrives on
the input port to the function as its parameter and delivers the return
value to the output port.
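
To make the wrapper idea concrete, here is a minimal, self-contained sketch.
It is not the prototype's code: the types below are simplified stand-ins that
only borrow the names Function and MapFunction from the walkthrough, and the
"ports" are modeled as a plain method call and a java.util.function.Consumer,
which is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; NOT the actual Apex/Malhar types.
interface Function {}

// A simple one-in, one-out transformation.
interface MapFunction<I, O> extends Function {
  O f(I input);
}

// Wraps a MapFunction: whatever arrives on the "input port" is handed to the
// function, and the return value is delivered to the "output port".
class FunctionOperatorSketch<I, O> {
  private final MapFunction<I, O> function;
  private final Consumer<O> outputPort; // assumption: port modeled as a Consumer

  FunctionOperatorSketch(MapFunction<I, O> function, Consumer<O> outputPort) {
    this.function = function;
    this.outputPort = outputPort;
  }

  // Plays the role of the input port's per-tuple callback.
  void process(I tuple) {
    outputPort.accept(function.f(tuple));
  }
}

class FunctionOperatorDemo {
  // Feeds each word through a length-computing function and collects the output.
  static List<Integer> run(List<String> words) {
    List<Integer> out = new ArrayList<>();
    FunctionOperatorSketch<String, Integer> op =
        new FunctionOperatorSketch<>(String::length, out::add);
    for (String w : words) {
      op.process(w);
    }
    return out;
  }
}
```

The point of the wrapper is that the user writes only the function, while the
operator plumbing (ports, delivery) stays in one reusable place.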

You can find the word count demo code here:

https://github.com/siyuanh/incubator-apex-malhar/blob/stream/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithStreamAPI.java

We want to release this API as soon as possible, so we would like the whole
community to help define a clear scope for what we want to achieve in the
first cut. Any suggestions and ideas are very welcome.

Please do contribute to this :)

Thanks!
Siyuan

Re: Java stream api

Posted by Thomas Weise <th...@datatorrent.com>.
IMO completeness and stability cannot be goals for the first cut. It is
more about setting the direction and enabling others to contribute. We need
to iterate and learn as we go.

Thomas

On Tue, Apr 12, 2016 at 10:13 AM, Siyuan Hua <si...@datatorrent.com> wrote:

> Thanks, Tushar and Thomas.
>
> Right now, we also need to think about the scope of the first cut and of
> the next one.
> In my opinion, stability and completeness are the two most important
> things in API design.
> In addition to applying your suggestions to my existing code, I will also
> try to draw a clear picture of what the first cut and the next one will
> look like.

Re: Java stream api

Posted by Siyuan Hua <si...@datatorrent.com>.
Thanks, Tushar and Thomas.

Right now, we also need to think about the scope of the first cut and of
the next one.
In my opinion, stability and completeness are the two most important
things in API design.
In addition to applying your suggestions to my existing code, I will also
try to draw a clear picture of what the first cut and the next one will
look like.



On Tue, Apr 12, 2016 at 8:57 AM, Thomas Weise <th...@datatorrent.com>
wrote:

> This looks really good. It will give Apex the "beginner API" to build apps.
>
> As already discussed, this API will be an opportunity to apply
> optimizations internally, so we should go with the lazy DAG building
> approach. It is probably a good idea to do this in the first cut so that
> other contributors will follow the right pattern when adding more
> operations.
>
> It will also be helpful to provide an example of how the API can be
> extended or how operations can easily be overridden with a customized
> operator.
>
> It would be great if we can pull the initial rev into 3.4.0. This will
> open it up for contributions from others. It will be @Evolving, so changes
> are expected.
>
> Thomas

Re: Java stream api

Posted by Thomas Weise <th...@datatorrent.com>.
This looks really good. It will give Apex the "beginner API" to build apps.

As already discussed, this API will be an opportunity to apply
optimizations internally, so we should go with the lazy DAG building
approach. It is probably a good idea to do this in the first cut so that
other contributors will follow the right pattern when adding more
operations.

It will also be helpful to provide an example of how the API can be
extended or how operations can easily be overridden with a customized
operator.

It would be great if we can pull the initial rev into 3.4.0. This will
open it up for contributions from others. It will be @Evolving, so changes
are expected.

Thomas


On Tue, Apr 5, 2016 at 12:41 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> On Tue, Apr 5, 2016 at 12:14 PM, Siyuan Hua <si...@datatorrent.com>
> wrote:
>
> > I have collected some open topics/questions for discussion from folks
> > who have already reviewed the code.
> >
> > 1. The naming of the Stream and the StreamSource.
> >
> > 2. Building the DAG incrementally vs. lazy population. The incremental
> > way is easier to implement (it is what I have done so far) and creates
> > one edge in the DAG for each transformation method. Lazy population
> > means keeping the method chain in memory until the DAG needs to be
> > submitted, either locally or to the cluster. Because you then have the
> > overall picture, some optimizations (e.g. changing the order of
> > transformations) become possible.
>
> +1 for lazy population. This way we could swap in a better implementation
> for a combination of transformations.
>
> > 3. How to easily extend the Stream interface and its implementation.
>
> Can we add a factory that generates a stream by adding an input operator?
> The factory would let us change the implementation of operators for
> different purposes. For example, one factory can be for stream processing
> and another for batch processing.
>
> > 4. How to deal with operators that have multiple input/output ports.
>
> One way to handle that would be to add a new abstraction, StreamSet, which
> also implements Stream and provides an additional method get("name"). When
> you add an operator it creates a StreamSet; by default it uses the first
> port of the operator for the stream, but the user can get a required
> stream by calling get. For example, if a parser has two ports and one of
> them is an error port, we can get the error stream and start a different
> stream from it.
>
> stream.addOperator("1", new ParseOperator() { }).get("error").map("map1",
> new MapFunction());
>
> I can't think of any solution right now to support multiple input ports.

Re: Java stream api

Posted by Tushar Gosavi <tu...@datatorrent.com>.
On Tue, Apr 5, 2016 at 12:14 PM, Siyuan Hua <si...@datatorrent.com> wrote:

> I have collected some open topics/questions for discussion from folks
> who have already reviewed the code.
>
> 1. The naming of the Stream and the StreamSource.
>
> 2. Building the DAG incrementally vs. lazy population. The incremental
> way is easier to implement (it is what I have done so far) and creates
> one edge in the DAG for each transformation method. Lazy population
> means keeping the method chain in memory until the DAG needs to be
> submitted, either locally or to the cluster. Because you then have the
> overall picture, some optimizations (e.g. changing the order of
> transformations) become possible.

+1 for lazy population. This way we could swap in a better implementation
for a combination of transformations.



> 3. How to easily extend the Stream interface and its implementation.

Can we add a factory that generates a stream by adding an input operator?
The factory would let us change the implementation of operators for
different purposes. For example, one factory can be for stream processing
and another for batch processing.
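
A rough sketch of the factory idea, under stated assumptions: none of these
types exist in the prototype, and StreamSketch, StreamFactory,
StreamingFactory, BatchFactory and fromInput are hypothetical names chosen
only to show how the choice of implementation could move behind a factory.

```java
// Hypothetical sketch of the factory suggestion; names are illustrative only.
interface StreamSketch {
  String describe();
}

// The factory creates a stream starting from an input operator.
interface StreamFactory {
  StreamSketch fromInput(String inputOperatorName);
}

// One factory could wire operators meant for stream processing...
class StreamingFactory implements StreamFactory {
  @Override
  public StreamSketch fromInput(String inputOperatorName) {
    return () -> "streaming stream from " + inputOperatorName;
  }
}

// ...and another could wire operators meant for batch processing.
class BatchFactory implements StreamFactory {
  @Override
  public StreamSketch fromInput(String inputOperatorName) {
    return () -> "batch stream from " + inputOperatorName;
  }
}
```

Application code would depend only on StreamFactory, so switching between the
two modes would be a one-line change at the point where the factory is chosen.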


> 4. How to deal with operators that have multiple input/output ports.

One way to handle that would be to add a new abstraction, StreamSet, which
also implements Stream and provides an additional method get("name"). When
you add an operator it creates a StreamSet; by default it uses the first
port of the operator for the stream, but the user can get a required
stream by calling get. For example, if a parser has two ports and one of
them is an error port, we can get the error stream and start a different
stream from it.

stream.addOperator("1", new ParseOperator() { }).get("error").map("map1",
new MapFunction());

I can't think of any solution right now to support multiple input ports.
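
The StreamSet proposal for multiple output ports could look roughly like the
sketch below. This is an illustration under assumptions, not a design: the
classes PortStream and StreamSetSketch are hypothetical, ports are identified
by plain strings, and map only returns a description of the edge it would
create instead of building a real DAG.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stream bound to a single output port.
class PortStream {
  final String port;

  PortStream(String port) {
    this.port = port;
  }

  // Returns a description of the edge that would be added for this map step.
  String map(String name) {
    return port + " -> " + name;
  }
}

// Hypothetical StreamSet: it is itself a stream (bound to the first port by
// default) and also exposes the operator's other output ports by name.
class StreamSetSketch extends PortStream {
  private final Map<String, PortStream> byPort = new LinkedHashMap<>();

  // Assumes at least one output port name is given.
  StreamSetSketch(String... outputPorts) {
    super(outputPorts[0]); // default stream uses the first output port
    for (String p : outputPorts) {
      byPort.put(p, new PortStream(p));
    }
  }

  // Selects the stream for a named output port, e.g. get("error").
  PortStream get(String portName) {
    PortStream s = byPort.get(portName);
    if (s == null) {
      throw new IllegalArgumentException("No output port named " + portName);
    }
    return s;
  }
}
```

With this shape, a chain that ignores get keeps working against the first
port, and only callers that care about a secondary port such as "error" need
to name it.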


> Again, I appreciate any ideas and suggestions on the questions above.
>
> And feel free to ask any further questions you have.
>
> Regards,
> Siyuan

Re: Java stream api

Posted by Siyuan Hua <si...@datatorrent.com>.
I have collected some open topics/questions for discussion from folks
who have already reviewed the code.

1. The naming of the Stream and the StreamSource.

2. Building the DAG incrementally vs. lazy population. The incremental
way is easier to implement (it is what I have done so far) and creates
one edge in the DAG for each transformation method. Lazy population
means keeping the method chain in memory until the DAG needs to be
submitted, either locally or to the cluster. Because you then have the
overall picture, some optimizations (e.g. changing the order of
transformations) become possible.

3. How to easily extend the Stream interface and its implementation.

4. How to deal with operators that have multiple input/output ports.
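
For item 2, the difference between the two approaches can be sketched as
follows. This is a toy model under stated assumptions: LazyStreamSketch and
its methods are hypothetical, operators are represented by strings, and the
only "optimization" shown is fusing adjacent map steps into one operator,
which is a stand-in chosen for brevity and not something the prototype does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of lazy population; names are illustrative only.
class LazyStreamSketch {
  // The method chain is only recorded here; nothing is added to a DAG yet.
  private final List<String> pendingOps = new ArrayList<>();

  LazyStreamSketch map(String name) {
    pendingOps.add("map:" + name);
    return this;
  }

  LazyStreamSketch filter(String name) {
    pendingOps.add("filter:" + name);
    return this;
  }

  // Called once, when the application is about to be submitted. Because the
  // whole chain is visible at this point, it can be rewritten before any
  // operator is created. Toy rewrite: adjacent maps are fused into one.
  List<String> buildOperators() {
    List<String> ops = new ArrayList<>();
    for (String op : pendingOps) {
      int last = ops.size() - 1;
      if (op.startsWith("map:") && last >= 0 && ops.get(last).startsWith("map:")) {
        ops.set(last, ops.get(last) + "+" + op.substring("map:".length()));
      } else {
        ops.add(op);
      }
    }
    return ops;
  }
}
```

An incremental builder would instead create an operator and an edge inside
each map or filter call, so by the time the chain is complete there is
nothing left to rearrange.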

Again, I appreciate any ideas and suggestions on the questions above.

And feel free to ask any further questions you have.

Regards,
Siyuan
