Posted to dev@apex.apache.org by Siyuan Hua <si...@datatorrent.com> on 2016/04/30 02:43:55 UTC

Java Stream API Pull Request

Hi Community,

Happy Friday!
I just sent out an initial pull request for the Java High Level Stream API.
This is the very first attempt to bring the functional paradigm into the
Apex programming model. We will keep working on this for months. If you are
interested, please take the time to have a look and comment. Any
suggestions are welcome. Thanks!


For those who are not familiar with the idea, here is a write-up.

First iteration of Java Stream API.

The Java Stream API follows the popular functional programming paradigm to
construct an Apex application.
The goals for this API are:

   - Easy to construct a DAG
   - Easy to migrate other streaming applications to Apex
   - Fully compatible with the existing DAG API
   - Provide useful built-in transformations with abstracted pluggable
   components in one place
To achieve these goals and split the work, we categorize the different
kinds of transformations into 2 types:

   - 1 input, 1+ output (map, filter, flatmap)
   - Multiple inputs, 1 output (aggregations, joins, unions)

This first iteration covers only the first category, that is, 1 in, 1+
out. A transformation like this is just like a distributed function
call, so we abstract out some function types instead of operators.
Internally, there are some pre-built function operators which wrap the
functions and connect them together.
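
To make the "function wrapped by an operator" idea concrete, here is a
minimal sketch in plain Java. It is not the code from the pull request:
TupleOperator, connect and wordLengths are hypothetical names used only for
illustration, and everything runs in a single JVM rather than on a cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch: generic operators that wrap user functions. */
final class FunctionOperatorSketch {

    /** One tuple in, zero or more tuples out, pushed to a downstream consumer. */
    interface TupleOperator<I, O> {
        void process(I tuple, Consumer<O> downstream);
    }

    /** Wraps a map function: exactly one output per input. */
    static <I, O> TupleOperator<I, O> map(Function<I, O> f) {
        return (tuple, out) -> out.accept(f.apply(tuple));
    }

    /** Wraps a filter function: zero or one output per input. */
    static <T> TupleOperator<T, T> filter(Predicate<T> keep) {
        return (tuple, out) -> {
            if (keep.test(tuple)) {
                out.accept(tuple);
            }
        };
    }

    /** Wraps a flatmap function: any number of outputs per input. */
    static <I, O> TupleOperator<I, O> flatMap(Function<I, Iterable<O>> f) {
        return (tuple, out) -> {
            for (O o : f.apply(tuple)) {
                out.accept(o);
            }
        };
    }

    /** Connects two operators so the output of the first feeds the second. */
    static <A, B, C> TupleOperator<A, C> connect(TupleOperator<A, B> first,
                                                 TupleOperator<B, C> second) {
        return (tuple, out) -> first.process(tuple, mid -> second.process(mid, out));
    }

    /** Splits a line on spaces, drops empty words, and emits each word's length. */
    static List<Integer> wordLengths(String line) {
        TupleOperator<String, String> split =
            flatMap(l -> Arrays.asList(l.split(" ")));
        TupleOperator<String, String> nonEmpty = filter(w -> !w.isEmpty());
        TupleOperator<String, Integer> length = map(String::length);

        TupleOperator<String, String> words = connect(split, nonEmpty);
        TupleOperator<String, Integer> pipeline = connect(words, length);

        List<Integer> out = new ArrayList<>();
        pipeline.process(line, out::add);
        return out;
    }
}
```

The user only supplies the three functions; the wrapping and wiring are
generic, which is the point of abstracting function types.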

The core interface is ApexStream. ApexStream is designed in a
method-chaining fashion, in which every transformation method returns a new
ApexStream object with a new output type.
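
The typing side of this can be sketched with a toy class. SketchStream below
is a hypothetical stand-in, not the ApexStream interface: it works eagerly on
an in-memory list, and its only purpose is to show how each call returns a
new stream object whose element type follows the transformation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for a chainable stream; not the ApexStream interface. */
final class SketchStream<T> {
    private final List<T> tuples;

    private SketchStream(List<T> tuples) {
        this.tuples = tuples;
    }

    @SafeVarargs
    static <T> SketchStream<T> of(T... tuples) {
        return new SketchStream<>(Arrays.asList(tuples));
    }

    /** Keeps the element type: SketchStream<T> in, new SketchStream<T> out. */
    SketchStream<T> filter(Predicate<T> keep) {
        List<T> out = new ArrayList<>();
        for (T t : tuples) {
            if (keep.test(t)) {
                out.add(t);
            }
        }
        return new SketchStream<>(out);
    }

    /** Changes the element type: SketchStream<T> in, new SketchStream<O> out. */
    <O> SketchStream<O> map(Function<T, O> f) {
        List<O> out = new ArrayList<>();
        for (T t : tuples) {
            out.add(f.apply(t));
        }
        return new SketchStream<>(out);
    }

    /** Returns a copy of the tuples currently held by this stream. */
    List<T> toList() {
        return new ArrayList<>(tuples);
    }
}
```

With this, SketchStream.of("map", "filter", "flatmap").filter(s -> s.length()
> 3).map(String::length) has type SketchStream<Integer>, and the compiler
rejects a chain whose function types do not line up.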

Here are some examples. If you want to do a filter and then a map, you can do

stream.filter(new FilterFunction())
.map(new MapFunction())

You can also mix this with the existing operator API. For example, if you
want to add an operator after the map, you can do this

stream.filter(..)
.map(..)
.addOperator(opt, opt.input, opt.output)

(PS: opt.input here is connected to the output of the previous stream, and
opt.output will be connected to the next one.)
If you want to set the locality or attributes for an operator/ports/DAG, you
can use the *with* clause. For example, if you want filter and map to be
container local and you want to set the checkpoint window count for the new
operator you just added, you can do something like this

stream.filter(..)
.map(..).with(Locality.CONTAINER_LOCAL)
.addOperator(..).with(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5)
.with(someProp, someVal)

(PS: the engine will figure out which operator/ports/DAG this attribute
applies to.)
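
One simple way to picture the with clause is a builder that attaches each
attribute to the step added just before it. The sketch below assumes exactly
that rule, which is a simplification: in the actual API the engine decides
whether an attribute targets an operator, a port or the DAG. AttributePlan
and its string keys are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical builder: with(..) attaches an attribute to the step added last. */
final class AttributePlan {
    private final List<String> names = new ArrayList<>();
    private final List<Map<String, Object>> attributes = new ArrayList<>();

    /** Appends a named step with no attributes yet. */
    AttributePlan add(String name) {
        names.add(name);
        attributes.add(new LinkedHashMap<String, Object>());
        return this;
    }

    /** Assumption of this sketch: the attribute applies to the most recent step. */
    AttributePlan with(String key, Object value) {
        if (attributes.isEmpty()) {
            throw new IllegalStateException("with(..) needs a preceding step");
        }
        attributes.get(attributes.size() - 1).put(key, value);
        return this;
    }

    /** Looks up the attributes recorded for the first step with this name. */
    Map<String, Object> attributesOf(String name) {
        int i = names.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("unknown step: " + name);
        }
        return attributes.get(i);
    }
}
```

Because every method returns the builder itself, attribute calls can be
interleaved with transformation calls in one chain, as in the example above.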

Like the DAG API, you can run the stream in distributed mode or local
mode. For example,
stream...populateDag(dag) (distributed mode)
stream...runLocally() (local mode)

The stream is built lazily, which means that until you call populateDag or
a run* method (such as runLocally), all the transformations and their order
are kept in memory in a graph data structure (DagMeta). This will allow us
to address technical challenges such as logical plan optimization.
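
The lazy-build idea can be sketched as follows. This is not DagMeta itself:
LazyPlan, describe and run are hypothetical names, and a linear list of steps
stands in for the graph. The point is only that building the chain records
the steps without calling any user function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of a lazily built pipeline; names are illustrative only. */
final class LazyPlan<I, O> {
    /** Recorded step names, in order (a linear stand-in for a graph). */
    private final List<String> steps;
    /** Composition of all recorded functions; not invoked until run(..). */
    private final Function<I, O> composed;

    private LazyPlan(List<String> steps, Function<I, O> composed) {
        this.steps = steps;
        this.composed = composed;
    }

    /** Starts an empty plan whose input and output types are the same. */
    static <T> LazyPlan<T, T> source() {
        return new LazyPlan<T, T>(new ArrayList<String>(), Function.<T>identity());
    }

    /** Records a step and returns a new plan; f is not called here. */
    <R> LazyPlan<I, R> map(String name, Function<O, R> f) {
        List<String> next = new ArrayList<>(steps);
        next.add(name);
        return new LazyPlan<I, R>(next, composed.andThen(f));
    }

    /** The recorded plan can be inspected (or rewritten) before anything runs. */
    List<String> describe() {
        return new ArrayList<>(steps);
    }

    /** Only now are the recorded functions applied to the input tuples. */
    List<O> run(List<I> input) {
        List<O> out = new ArrayList<>();
        for (I tuple : input) {
            out.add(composed.apply(tuple));
        }
        return out;
    }
}
```

Since the whole plan exists as data before execution, an optimizer has a
chance to inspect or rearrange it first, which is what the lazy build mode is
meant to enable.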

Roadmap for next phase

This is mainly for the 2nd category mentioned above. To achieve that, we
will support the following features in the next iteration:

   - Watermark - Ingestion time watermark / watermark from tuple
   - Early Triggers - How frequently to emit real-time partial results
   - Late Triggers - When to emit updated results for tuples that arrive
   after the watermark
   - Spool state - What to do when in-memory aggregation runs out of
   memory, and how
   - 3 different aggregation recovery modes: ignore, accumulation,
   accumulation + delta
   - Window support: sliding window, moving window, session window based on
   3 different tuple times
   - 3 tuple time support: event time, system time, ingestion time

Re: Java Stream API Pull Request

Posted by Siyuan Hua <si...@datatorrent.com>.
Sorry, here is the link for the pull request
https://github.com/apache/incubator-apex-malhar/pull/261


