Posted to dev@samza.apache.org by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com> on 2015/05/21 01:13:25 UTC

Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/
-----------------------------------------------------------

Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda Pathirage, Navina Ramesh, and Naveen Somasundaram.


Bugs: SAMZA-552
    https://issues.apache.org/jira/browse/SAMZA-552


Repository: samza


Description
-------

SAMZA-552: added operator builder API
- The current operator builder supports only single-output DAG topologies so far


Diffs
-----

  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION 

Diff: https://reviews.apache.org/r/34500/diff/


Testing
-------

./gradlew clean build passed


Thanks,

Yi Pan (Data Infrastructure)


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by Milinda Pathirage <mi...@apache.org>.

> On Aug. 5, 2015, 5:34 p.m., Milinda Pathirage wrote:
> > I went through old discussions and also went through Calcite's RelBuilder (https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java) to look at our TopologyBuilder from a SQL query plan perspective. Below are my thoughts.
> > 
> > * I agree with Guozhang that we should first focus on simple use cases, and I think we should not try to support building complex DAGs that contain multiple complex queries via this builder API.
> > * IMHO, TopologyBuilder is closer to query execution than to the query itself. If we want people to compose SQL queries through a Java API, it's better to have an API similar to jOOQ (http://www.jooq.org) for streaming SQL.
> > * AFAIK, the **split** mentioned in one of Yi's comments doesn't occur in SQL query plans, because SQL operators always have one output (@Yi please correct me if I am wrong).
> > * IMHO, supporting something similar to views through the builder API may be useful. To facilitate this, we could allow the result of the builder (maybe not through the *build* method but via a method like *buildView*) to be referenced as an input to other queries.
> > 
> > So I'm proposing a builder similar to the following, based on Calcite's RelBuilder API:
> > 
> > ```java
> > TopologyBuilder builder = TopologyBuilder.create(..);
> > 
> > OperatorRouter router = builder.scan("stream1")
> >                           .window(10, 2)
> >                           .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
> >                           .scan("stream2")
> >                           .window(10, 2)
> >                           .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
> >                           .join(JoinType.INNER, builder.condition(...))
> >                           .scan("stream2")
> >                           .project(..)
> >                           .window(10, 2)
> >                           .join(joinType, condition)
> >                           .partition(partitionKey, number)
> >                           .modify(Operation.INSERT, ..);
> > ```
> > 
> > * In the API above, *beginStream* is renamed to *scan* to bring the API closer to the physical plan.
> > * *scan* in the middle marks the start of a new input or input sub-query.
> > * *join* takes the last two sub-trees (sub-queries) as inputs.
> > * *modify* is used to insert/update tuples in streams or tables.
> > * The builder will provide utility methods to create conditions, function calls, aggregates, and `GROUP BY` clauses.
> > * The above assumes that there are no multi-output operators.
> > * Reusable sub-queries are not present in the above example; I'll think about it and introduce a mechanism to reuse sub-queries (possibly by introducing the view concept).
> > 
> > Please feel free to comment on this.

Instead of group keys, aggregate calls, or conditions, we can take OperatorSpec instances directly, given that OperatorSpecs already encapsulate everything an operator needs.
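A minimal sketch of the spec-based variant, assuming each builder method takes one ready-made spec object. `OpSpec`, `WindowOpSpec`, `AggregateOpSpec`, and `SpecChainBuilder` are hypothetical stand-ins defined here, not the OperatorSpec classes in this patch, and the plan is recorded as plain strings only to keep the example small.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical spec type: bundles everything one operator needs. */
interface OpSpec {
  String describe();
}

/** Hypothetical window spec: window size and slide, in seconds. */
final class WindowOpSpec implements OpSpec {
  private final int sizeSec;
  private final int slideSec;

  WindowOpSpec(int sizeSec, int slideSec) {
    this.sizeSec = sizeSec;
    this.slideSec = slideSec;
  }

  @Override
  public String describe() {
    return "window(" + sizeSec + "," + slideSec + ")";
  }
}

/** Hypothetical aggregate spec: group key and aggregate call captured together. */
final class AggregateOpSpec implements OpSpec {
  private final String groupKey;
  private final String aggregateCall;

  AggregateOpSpec(String groupKey, String aggregateCall) {
    this.groupKey = groupKey;
    this.aggregateCall = aggregateCall;
  }

  @Override
  public String describe() {
    return "aggregate(" + groupKey + "," + aggregateCall + ")";
  }
}

/** Hypothetical linear builder: each step takes a spec instead of loose parts. */
final class SpecChainBuilder {
  private final List<OpSpec> steps = new ArrayList<>();
  private final String source;

  private SpecChainBuilder(String source) {
    this.source = source;
  }

  static SpecChainBuilder scan(String source) {
    return new SpecChainBuilder(source);
  }

  SpecChainBuilder window(WindowOpSpec spec) {
    steps.add(spec);
    return this;
  }

  SpecChainBuilder aggregate(AggregateOpSpec spec) {
    steps.add(spec);
    return this;
  }

  /** Returns the recorded plan, source first, as readable strings. */
  List<String> plan() {
    List<String> out = new ArrayList<>();
    out.add("scan(" + source + ")");
    for (OpSpec step : steps) {
      out.add(step.describe());
    }
    return Collections.unmodifiableList(out);
  }
}
```

Keeping one typed method per operator (e.g. `window(WindowOpSpec)`) lets the compiler check which spec goes where, which a single generic `operator(OpSpec)` method would not.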


- Milinda


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review94271
-----------------------------------------------------------


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by Milinda Pathirage <mi...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review94271
-----------------------------------------------------------


I went through old discussions and also went through Calcite's RelBuilder (https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java) to look at our TopologyBuilder from a SQL query plan perspective. Below are my thoughts.

* I agree with Guozhang that we should first focus on simple use cases, and I think we should not try to support building complex DAGs that contain multiple complex queries via this builder API.
* IMHO, TopologyBuilder is closer to query execution than to the query itself. If we want people to compose SQL queries through a Java API, it's better to have an API similar to jOOQ (http://www.jooq.org) for streaming SQL.
* AFAIK, the **split** mentioned in one of Yi's comments doesn't occur in SQL query plans, because SQL operators always have one output (@Yi please correct me if I am wrong).
* IMHO, supporting something similar to views through the builder API may be useful. To facilitate this, we could allow the result of the builder (maybe not through the *build* method but via a method like *buildView*) to be referenced as an input to other queries.
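The *buildView* idea could look roughly like this minimal sketch, assuming a hypothetical `ViewCatalog` that stores finished sub-plans by name; `QueryNode`, `ViewCatalog`, `buildView`, and `scan` here are illustrative only. Every query that scans a view receives the same sub-plan object, so the result is a DAG with one shared branch rather than two copies of it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal plan node: an operator label plus its input sub-plans. */
final class QueryNode {
  final String op;
  final List<QueryNode> inputs;

  QueryNode(String op, QueryNode... inputs) {
    this.op = op;
    this.inputs = Collections.unmodifiableList(Arrays.asList(inputs));
  }
}

/** Hypothetical catalog behind a buildView()-style method; not part of the patch. */
final class ViewCatalog {
  private final Map<String, QueryNode> views = new HashMap<>();

  /** Publishes a finished sub-plan under a name instead of turning it into a router. */
  QueryNode buildView(String name, QueryNode plan) {
    if (views.putIfAbsent(name, plan) != null) {
      throw new IllegalArgumentException("view already defined: " + name);
    }
    return plan;
  }

  /** A registered view resolves to the shared sub-plan; any other name is a raw stream. */
  QueryNode scan(String name) {
    QueryNode view = views.get(name);
    return view != null ? view : new QueryNode("scan:" + name);
  }
}
```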

So I'm proposing a builder similar to the following, based on Calcite's RelBuilder API:

```java
TopologyBuilder builder = TopologyBuilder.create(..);

OperatorRouter router = builder.scan("stream1")
                          .window(10, 2)
                          .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
                          .scan("stream2")
                          .window(10, 2)
                          .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
                          .join(JoinType.INNER, builder.condition(...))
                          .scan("stream2")
                          .project(..)
                          .window(10, 2)
                          .join(joinType, condition)
                          .partition(partitionKey, number)
                          .modify(Operation.INSERT, ..);
```

* In the API above, *beginStream* is renamed to *scan* to bring the API closer to the physical plan.
* *scan* in the middle marks the start of a new input or input sub-query.
* *join* takes the last two sub-trees (sub-queries) as inputs.
* *modify* is used to insert/update tuples in streams or tables.
* The builder will provide utility methods to create conditions, function calls, aggregates, and `GROUP BY` clauses.
* The above assumes that there are no multi-output operators.
* Reusable sub-queries are not present in the above example; I'll think about it and introduce a mechanism to reuse sub-queries (possibly by introducing the view concept).
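The stack discipline implied by *scan* and *join* can be sketched as follows, assuming a hypothetical `StackTopologyBuilder` (neither Calcite's RelBuilder nor the patch's `TopologyBuilder`) whose only job is to assemble a plan tree: *scan* pushes a new sub-tree, a single-input operator replaces the top of the stack, and *join* pops the last two sub-trees. `PlanNode` and the string labels are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Minimal plan node: an operator label plus its input sub-trees. */
final class PlanNode {
  final String op;
  final PlanNode[] inputs;

  PlanNode(String op, PlanNode... inputs) {
    this.op = op;
    this.inputs = inputs;
  }

  @Override
  public String toString() {
    if (inputs.length == 0) {
      return op;
    }
    StringBuilder sb = new StringBuilder(op).append('(');
    for (int i = 0; i < inputs.length; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(inputs[i]);
    }
    return sb.append(')').toString();
  }
}

/** Hypothetical stack-based builder illustrating the proposed scan/join semantics. */
final class StackTopologyBuilder {
  private final Deque<PlanNode> stack = new ArrayDeque<>();

  /** Starts a new input (sub-query) by pushing a leaf node. */
  StackTopologyBuilder scan(String stream) {
    stack.push(new PlanNode("scan:" + stream));
    return this;
  }

  /** A single-input operator wraps whatever sub-tree is on top of the stack. */
  StackTopologyBuilder window(int size, int slide) {
    stack.push(new PlanNode("window:" + size + "/" + slide, pop()));
    return this;
  }

  /** Joins the last two sub-trees; the older one becomes the left input. */
  StackTopologyBuilder join(String condition) {
    PlanNode right = pop();
    PlanNode left = pop();
    stack.push(new PlanNode("join:" + condition, left, right));
    return this;
  }

  /** Finishes the plan; exactly one tree must remain (single-output topology). */
  PlanNode build() {
    if (stack.size() != 1) {
      throw new IllegalStateException("expected 1 sub-tree, found " + stack.size());
    }
    return stack.pop();
  }

  private PlanNode pop() {
    if (stack.isEmpty()) {
      throw new IllegalStateException("no input on the stack");
    }
    return stack.pop();
  }
}
```

Because `build()` insists on exactly one remaining sub-tree, a dangling *scan* is reported as an error, which matches the no-multi-output assumption.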

Please feel free to comment on this.

- Milinda Pathirage


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On June 3, 2015, 11:44 p.m., Guozhang Wang wrote:
> > Since the operator topology will sit inside a single node, most of the time we should be facing simple topology structures like window-window-join, not something as complicated as a complete workflow DAG. So I think we do not need to worry too much about those complex use cases; in fact, when they do arise, they should be compiled into multiple nodes in the resulting physical plan.
> > 
> > As for the API, I feel it would be better to have pre-defined add-operator functions in the builder, such as aggregate(window, agg-func), join(window, window, join-condition), and groupBy(window, key), rather than a general operator() function that takes an operator object users have to create on their own. In addition, the "bind" / "attach" function names for source / sink operators are not very intuitive to me (actually, is there an example of how to use attach() for a sink?), and we could possibly just drop them if we go with the route where we do not "canonicate" the operators but just use pre-defined functions for linear or tree topologies only.
> > 
> > In short, I suggest that we focus on as-simple-as-possible APIs that may have some expressiveness constraints but are enough for most use cases, and not worry too much about API generality.
> > 
> > Some other minor things:
> > 
> > 1. PartitionSpec / StreamSqlTask are empty files: is this intentional?

Thanks, Guozhang. Those are good points and are aligned with Navina/Milinda's comments. I will update the RB accordingly.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review86504
-----------------------------------------------------------


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review86504
-----------------------------------------------------------


Since the operator topology will sit inside a single node, most of the time we should be facing simple topology structures like window-window-join, not something as complicated as a complete workflow DAG. So I think we do not need to worry too much about those complex use cases; in fact, when they do arise, they should be compiled into multiple nodes in the resulting physical plan.

As for the API, I feel it would be better to have pre-defined add-operator functions in the builder, such as aggregate(window, agg-func), join(window, window, join-condition), and groupBy(window, key), rather than a general operator() function that takes an operator object users have to create on their own. In addition, the "bind" / "attach" function names for source / sink operators are not very intuitive to me (actually, is there an example of how to use attach() for a sink?), and we could possibly just drop them if we go with the route where we do not "canonicate" the operators but just use pre-defined functions for linear or tree topologies only.

In short, I suggest that we focus on as-simple-as-possible APIs that may have some expressiveness constraints but are enough for most use cases, and not worry too much about API generality.

Some other minor things:
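A minimal sketch of such pre-defined functions, assuming hypothetical `Window` and `Relation` handle types with plans rendered as strings for brevity; `SimpleTopology` and all of its methods are illustrative, not the patch's API. Because `aggregate`, `join`, and `groupBy` accept only a `Window`, the compiler itself rejects shapes outside the simple linear/tree topologies (e.g. window-window-join) described here.

```java
/** Hypothetical pre-defined operator functions for linear/tree topologies; not the patch's API. */
final class SimpleTopology {
  private SimpleTopology() {}

  /** Handle to a windowed input; the only thing aggregate, join and groupBy accept. */
  static final class Window {
    final String plan;

    private Window(String plan) {
      this.plan = plan;
    }
  }

  /** Handle to an operator output that cannot be fed back in, which keeps topologies shallow. */
  static final class Relation {
    final String plan;

    private Relation(String plan) {
      this.plan = plan;
    }
  }

  static Window window(String stream, int sizeSec, int slideSec) {
    return new Window("window(" + stream + "," + sizeSec + "," + slideSec + ")");
  }

  static Relation aggregate(Window w, String aggFunc) {
    return new Relation("aggregate(" + w.plan + "," + aggFunc + ")");
  }

  static Relation join(Window left, Window right, String joinCondition) {
    return new Relation("join(" + left.plan + "," + right.plan + "," + joinCondition + ")");
  }

  static Relation groupBy(Window w, String key) {
    return new Relation("groupBy(" + w.plan + "," + key + ")");
  }
}
```

There is no separate source `bind` step in this sketch: the source stream is named directly in `window(...)`, and what happens to a finished `Relation` is left out.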

1. PartitionSpec / StreamSqlTask are empty files: is this intentional?

- Guozhang Wang


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On May 29, 2015, 2:29 a.m., Milinda Pathirage wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java, line 60
> > <https://reviews.apache.org/r/34500/diff/1/?file=965720#file965720line60>
> >
> >     It would be good to discuss the ordering guarantees of the returned iterator. Is it going to preserve the ordering of the operator DAG? What will happen in cases where there is a join operator with two inputs?

Good question. Right now, the operators are kept in a hash map, so the iterator preserves no order. Do you have a use case that needs to traverse the operators in the order of the operator DAG? I did not consider it, since I did not see such a use case when implementing the physical operators.
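If iteration order does turn out to matter, one low-cost option is a `LinkedHashMap`, which iterates in insertion order; when the builder registers each operator after its inputs, that order is already a valid DAG order. A minimal sketch with a hypothetical `OrderedOperatorRegistry` (strings stand in for operator objects), not the router in this patch:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry keyed by operator id; values stand in for operator objects. */
final class OrderedOperatorRegistry {
  // LinkedHashMap keeps hash-based lookups like HashMap but iterates in insertion order.
  private final Map<String, String> operators = new LinkedHashMap<>();

  /** Registers an operator; re-registering an existing id keeps its original position. */
  void add(String operatorId, String description) {
    operators.put(operatorId, description);
  }

  /** Operator ids in registration order (a plain HashMap makes no such promise). */
  List<String> idsInOrder() {
    return new ArrayList<>(operators.keySet());
  }
}
```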


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85672
-----------------------------------------------------------


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by Milinda Pathirage <mi...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85672
-----------------------------------------------------------



samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
<https://reviews.apache.org/r/34500/#comment137350>

    It would be better to discuss the ordering guarantees of the returned iterator. Does it preserve the ordering of the operator DAG? What happens when there is a join operator with two inputs?
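One way to make the ordering guarantee explicit would be to have the router's iterator return operators in a topological order of the DAG, so that a two-input join is only visited after both of its upstream operators. Below is a minimal sketch using Kahn's algorithm. It assumes operators are identified by name and that the DAG is given as a map from each operator to its downstream consumers. `TopoOrder` is hypothetical and not part of this patch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: orders operators so that each one comes after all of its inputs. */
final class TopoOrder {
    /**
     * @param downstream maps each operator name to the operators that consume its output
     * @return the operators in a topological order (Kahn's algorithm)
     * @throws IllegalStateException if the operator graph contains a cycle
     */
    static List<String> sort(Map<String, List<String>> downstream) {
        // Count the incoming edges of every operator that appears in the graph.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String next : e.getValue()) {
                inDegree.merge(next, 1, Integer::sum);
            }
        }
        // Operators with no upstream operator inside the topology can run first.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String op = ready.poll();
            order.add(op);
            for (String next : downstream.getOrDefault(op, Collections.<String>emptyList())) {
                // A join becomes ready only once its last remaining input has been emitted.
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("operator graph contains a cycle");
        }
        return order;
    }
}
```

A topological order is valid but not unique. In a window1/window2 -> join -> partition graph, the two windows could legitimately come in either order. The iterator's contract would therefore need to say which orderings callers may rely on.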


- Milinda Pathirage


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java, line 26
> > <https://reviews.apache.org/r/34500/diff/1/?file=965722#file965722line26>
> >
> >     OperatorSource and OperatorSink have the same method signatures. Is that even allowed in Java? It's kind of confusing, even though the implementation can be semantically different.
> 
> Milinda Pathirage wrote:
>     This should be fine as long as there aren't any classes implementing both interfaces (http://stackoverflow.com/questions/2801878/implementing-two-interfaces-in-a-class-with-same-method-which-interface-method). Maybe we can change the method names.

Agreed. I will try to modify the class or method names here.
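For reference, Java does allow a class to implement two interfaces that declare the same method signature, provided the return types are compatible. A single method body then implements both, which is exactly why the two roles cannot be told apart in such a class. The minimal illustration below uses hypothetical `StreamSource`/`StreamSink` interfaces as stand-ins for `OperatorSource`/`OperatorSink`; the real method signatures are not reproduced here.

```java
/** Hypothetical stand-in for OperatorSource. */
interface StreamSource {
    String getName();
}

/** Hypothetical stand-in for OperatorSink, declaring an identical method. */
interface StreamSink {
    String getName();
}

/**
 * Compiles: one getName() satisfies both interfaces, so this class cannot give
 * its "source" and "sink" roles different behavior.
 */
final class IntermediateStream implements StreamSource, StreamSink {
    private final String name;

    IntermediateStream(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}
```

Giving the methods role-specific names, such as the hypothetical `getSourceName()` and `getSinkName()`, would let one class implement both roles with distinct meanings.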


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java, line 44
> > <https://reviews.apache.org/r/34500/diff/1/?file=965727#file965727line44>
> >
> >     Looks like you left behind some merge conflict statements in comments :)

Thanks for catching that. Will fix.


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java, line 57
> > <https://reviews.apache.org/r/34500/diff/1/?file=965730#file965730line57>
> >
> >     Can you add info on what an unbound input or unbound output is? I think it will be useful to add/move the comment from Line 205 and Line 218 here.

Sure. Good point.
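Suppose "unbound" means what the names suggest. An unbound input would then be a stream an operator reads that no operator in the topology produces. An unbound output would be a stream an operator writes that no operator in the topology consumes. Under that assumption, both sets come from a simple set difference. In the sketch below, `OpSpec` and `Unbound` are hypothetical simplifications, not the classes in this patch.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified operator description: the streams it reads and writes. */
final class OpSpec {
    final String id;
    final List<String> inputs;
    final List<String> outputs;

    OpSpec(String id, List<String> inputs, List<String> outputs) {
        this.id = id;
        this.inputs = inputs;
        this.outputs = outputs;
    }
}

final class Unbound {
    /** Inputs that no operator in the topology produces, i.e. streams that must be fed from outside. */
    static Set<String> inputs(List<OpSpec> ops) {
        Set<String> produced = new LinkedHashSet<>();
        for (OpSpec op : ops) {
            produced.addAll(op.outputs);
        }
        Set<String> unbound = new LinkedHashSet<>();
        for (OpSpec op : ops) {
            for (String in : op.inputs) {
                if (!produced.contains(in)) {
                    unbound.add(in);
                }
            }
        }
        return unbound;
    }

    /** Outputs that no operator in the topology consumes, i.e. streams that leave the topology. */
    static Set<String> outputs(List<OpSpec> ops) {
        Set<String> consumed = new LinkedHashSet<>();
        for (OpSpec op : ops) {
            consumed.addAll(op.inputs);
        }
        Set<String> unbound = new LinkedHashSet<>();
        for (OpSpec op : ops) {
            for (String out : op.outputs) {
                if (!consumed.contains(out)) {
                    unbound.add(out);
                }
            }
        }
        return unbound;
    }
}
```

Under this reading, the unbound inputs are the streams a task has to subscribe to, and the unbound outputs are the streams it has to send out.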


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java, line 135
> > <https://reviews.apache.org/r/34500/diff/1/?file=965730#file965730line135>
> >
> >     Can we change this to source() rather than stream()?

My original intention was to model an intermediate stream in the topology. I will rename it to match the return class name.


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java, line 145
> > <https://reviews.apache.org/r/34500/diff/1/?file=965730#file965730line145>
> >
> >     Can you explain when sink should be used?

This came from the thought that we may have an operator like split() that takes one input stream and has two output streams. Then something like split().attach(sink1, sink2) would be handy. Given the comments about streamlining the operators and hiding the intermediate streams completely, I will need to think it over more.
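Here is a minimal sketch of the multi-output idea. It assumes a hypothetical `Split` operator that routes each message to one of two sinks, both bound in a single `attach()` call. The class, the predicate-based routing and the `Consumer` sinks are illustrative assumptions, not the API in this patch.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical two-output operator: matching messages go to the first sink, the rest to the second. */
final class Split<T> {
    private final Predicate<T> predicate;
    private Consumer<T> matched;
    private Consumer<T> unmatched;

    Split(Predicate<T> predicate) {
        this.predicate = Objects.requireNonNull(predicate);
    }

    /** Binds both outputs at once, in the spirit of split().attach(sink1, sink2). */
    Split<T> attach(Consumer<T> sink1, Consumer<T> sink2) {
        this.matched = Objects.requireNonNull(sink1, "sink1");
        this.unmatched = Objects.requireNonNull(sink2, "sink2");
        return this;
    }

    /** Routes one message to exactly one of the two attached sinks. */
    void process(T message) {
        if (matched == null) {
            throw new IllegalStateException("attach() must be called before process()");
        }
        if (predicate.test(message)) {
            matched.accept(message);
        } else {
            unmatched.accept(message);
        }
    }
}
```

Because attach() requires both sinks in one call, a split can never be left with one branch dangling.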


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java, line 120
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line120>
> >
> >     I thought TopologyBuilder was to abstract away the spec and provide a simplified API for a user implementing a simple SQL query. 
> >     Imo, this still seems pretty involved for a user concerned with just defining a simple join query. 
> >     
> >     I assumed we could have a builder pattern as below:
> >     
> >     ```
> >     TopologyBuilder builder = TopologyBuilder
> >                                 .create()
> >                                 .join(window("stream1", 10), window("stream2", 10), Arrays.asList(joinKey1, joinKey2 /* , ... */))
> >                                 .partition(partitionKey)
> >                                 .build();
> >     ```
> >     
> >     The idea here is that the build statement order determines the topology. The builder just validates and chains them together. 
> >     I can see that this can be a problem with running operators in parallel and could make it hard for the user to understand the correct sequence of operators. 
> >     I am wondering if you think this kind of a model is possible. It would greatly simplify the API for most users. 
> >     Just wanted to put this comment out so that we can discuss further.
> 
> Milinda Pathirage wrote:
>     I also agree with Navina here. I think we should make building topologies simple with the builder API. One complexity of the current OperatorSpec-based API is that you need to create intermediate streams (EntityName objects) to wire operators together. I think we should try to hide that complexity through the builder API. Even though source and sink hide that complexity to some extent, it's better if we can remove it completely.

Thank you both for the good points here. @Navina, yes, the basic idea for the topology builder is exactly what you mentioned, and the model you illustrated is much simpler and very attractive. The issue I see is that the topology may be neither linear nor a tree. It is not easy to describe a network of operators like the following without introducing the concept of intermediate streams.
   window-->aggregate--+---------------------+
   window-->aggregate----> join --+          |       +-->aggregate --+
                 project-->window--> join -->|split -+               |
                                             +-------->join ---------->join --> partition
There are three issues in the above example:
1. the join inputs may be intermediate streams, each of which could essentially be the output of a sub-topology
2. a multi-output operator makes the downstream expression branch off, so it is not easily expressible in a linear format
3. the output of a single operator may be used by multiple downstream operators, again forking the linear expression

Maybe we can adopt the simple join builder you illustrated for simple queries, although I would like to add an OperatorBuilder here as well:
OperatorRouter simpleJoinQuery = TopologyBuilder.create()
                             .join(OperatorBuilder.window("stream1", 10), OperatorBuilder.window("stream2", 10), joinKeys)
                             .partition(partitionKey).build();

For more complex queries, I think the following approach may work better:
OperatorRouter router = TopologyBuilder.create().beginStream("stream1").window(10).aggregate("group-by", "treeId", "sum")
                          .beginStream("stream2").window(10).aggregate("group-by","treeId", "avg").join(joinCondition)
                          .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
                          .partition(partitionKey, number,"outstream1").build();
                          
Here, each beginStream() call starts one linear path of operators and adds it to the topology. A subsequent join operator joins the two most recent streams and creates one joined stream. This model may even solve issue 2 by allowing: 
      ...split().beginStream(1).aggregate().beginStream(2)
        .beginStream("stream4").filter().window()
        .join().join().partition().build();
        
For issue 3, i.e. reusing a certain intermediate stream in multiple downstream operators, we can introduce beginReuseStream(source, "name") as follows:
OperatorRouter router = TopologyBuilder.create().beginReuseStream("stream1", "reuseStream1").window(10).aggregate("group-by", "treeId", "sum")
                          .beginStream("stream2").window(10).aggregate("group-by","treeId", "avg").join(joinCondition)
                          .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
                          .join("reuseStream1", joinCondition)
                          .partition(partitionKey, number,"outstream1").build();
Here, a downstream operator refers to the reuse stream by name, so the intermediate stream can be used multiple times.
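The beginStream()/join() model above could be implemented with a stack of open paths. beginStream() pushes a path, single-input operators extend the top path, and join() pops the two most recent paths and pushes their join. A reuse name keeps a handle on its path so that later operators can consume that path's output again. The sketch below follows those assumptions. Everything in it is hypothetical: `StreamTopologyBuilder`, `OpNode`, the generic `apply()` standing in for window()/aggregate()/project(), and the rule that a reuse name refers to the final output of its path (the aggregate in the diagram).

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical DAG node: an operator label plus the nodes that feed it. */
final class OpNode {
    final String label;
    final List<OpNode> inputs;

    OpNode(String label, OpNode... inputs) {
        this.label = label;
        this.inputs = Arrays.asList(inputs);
    }

    @Override
    public String toString() {
        if (inputs.isEmpty()) {
            return label;
        }
        StringBuilder sb = new StringBuilder(label).append('(');
        for (int i = 0; i < inputs.size(); i++) {
            sb.append(i > 0 ? ", " : "").append(inputs.get(i));
        }
        return sb.append(')').toString();
    }
}

/** Hypothetical stack-based builder for the beginStream()/join() model (not the Samza SQL API). */
final class StreamTopologyBuilder {
    /** Mutable tip of one linear path; a reuse name keeps the Path, so it sees the path's last operator. */
    private static final class Path {
        OpNode tip;
        Path(OpNode tip) { this.tip = tip; }
    }

    private final Deque<Path> open = new ArrayDeque<>();
    private final Map<String, Path> reusable = new HashMap<>();

    static StreamTopologyBuilder create() { return new StreamTopologyBuilder(); }

    /** Starts a new linear path that reads from an input stream. */
    StreamTopologyBuilder beginStream(String stream) {
        open.push(new Path(new OpNode(stream)));
        return this;
    }

    /** Like beginStream(), but the path's output can later be consumed again by name. */
    StreamTopologyBuilder beginReuseStream(String stream, String name) {
        beginStream(stream);
        reusable.put(name, open.peek());
        return this;
    }

    /** Appends a single-input operator (window, aggregate, project, ...) to the most recent path. */
    StreamTopologyBuilder apply(String operator) {
        Path current = requireOpen(1);
        current.tip = new OpNode(operator, current.tip);
        return this;
    }

    /** Pops the two most recent paths and pushes one path holding their join. */
    StreamTopologyBuilder join(String condition) {
        requireOpen(2);
        Path right = open.pop();
        Path left = open.pop();
        open.push(new Path(new OpNode("join[" + condition + "]", left.tip, right.tip)));
        return this;
    }

    /** Joins the most recent path with a named path; the named node is shared, not copied. */
    StreamTopologyBuilder join(String reuseName, String condition) {
        Path reused = reusable.get(reuseName);
        if (reused == null) {
            throw new IllegalArgumentException("unknown reuse stream: " + reuseName);
        }
        Path current = requireOpen(1);
        current.tip = new OpNode("join[" + condition + "]", current.tip, reused.tip);
        return this;
    }

    /** Returns the topology's output node; all paths must have been joined into one. */
    OpNode build() {
        if (open.size() != 1) {
            throw new IllegalStateException("expected 1 open path, found " + open.size());
        }
        return open.peek().tip;
    }

    private Path requireOpen(int n) {
        if (open.size() < n) {
            throw new IllegalStateException("need " + n + " open path(s), found " + open.size());
        }
        return open.peek();
    }
}
```

Try chaining the calls from the reuse example, with `apply("window")`, `apply("sum")` and so on in place of the real operators. The result is a single output node whose text form contains `sum(window(stream1))` twice. Both occurrences are the same `OpNode` object, which is what makes this a DAG rather than a tree. build() also rejects a chain that leaves paths unjoined.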

In addition, from the review of the SAMZA-561 changes, I think we will need to include an OperatorBuilder interface as well, to make operators easy to create:
e.g. OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build(). This can be used inside the TopologyBuilder, which allows us to hide all the intermediate input/output stream/entity names, so there will be no OperatorSpec that the programmer needs to handle directly.
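Here is a minimal sketch of such an OperatorBuilder for joins. It assumes the builder's job is to collect the pieces, validate them in build() and return an immutable description, so that neither users nor the TopologyBuilder construct an OperatorSpec by hand. `JoinBuilder`, `JoinDescriptor` and the string-typed stream names and condition are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable result of the builder below. */
final class JoinDescriptor {
    final List<String> inputs;
    final String output;
    final String condition;

    JoinDescriptor(List<String> inputs, String output, String condition) {
        this.inputs = Collections.unmodifiableList(new ArrayList<>(inputs));
        this.output = output;
        this.condition = condition;
    }
}

/** Hypothetical sketch of OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build(). */
final class JoinBuilder {
    private List<String> inputs;
    private String output;
    private String condition;

    static JoinBuilder join() {
        return new JoinBuilder();
    }

    JoinBuilder setInputs(String left, String right) {
        this.inputs = Arrays.asList(left, right);
        return this;
    }

    JoinBuilder setOutput(String output) {
        this.output = output;
        return this;
    }

    JoinBuilder setJoinCondition(String condition) {
        this.condition = condition;
        return this;
    }

    /** Checks that every required piece was supplied before creating the descriptor. */
    JoinDescriptor build() {
        List<String> missing = new ArrayList<>();
        if (inputs == null) missing.add("inputs");
        if (output == null) missing.add("output");
        if (condition == null) missing.add("join condition");
        if (!missing.isEmpty()) {
            throw new IllegalStateException("join operator is missing: " + missing);
        }
        return new JoinDescriptor(inputs, output, condition);
    }
}
```

Inside a TopologyBuilder, setInputs()/setOutput() would presumably be filled with generated intermediate names rather than user-supplied ones.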


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java, line 123
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line123>
> >
> >     Why do we need 2 instances of the TopologyBuilder here?
> >     I think this occurs because the stream() and sink() methods return an OperatorSource type rather than a TopologyBuilder instance. 
> >     
> >     How differently does the TopologyBuilder handle the OperatorSource and the OperatorSink ?

The current version of TopologyBuilder copies all operators included in the source/sink when it is bound/attached. I will try to remove those copies with the model I described above.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by Milinda Pathirage <mi...@apache.org>.

> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java, line 26
> > <https://reviews.apache.org/r/34500/diff/1/?file=965722#file965722line26>
> >
> >     OperatorSource and OperatorSink have the same method signatures. Is that even allowed in Java? It's kind of confusing, even though the implementation can be semantically different.

This should be fine as long as there aren't any classes implementing both interfaces (http://stackoverflow.com/questions/2801878/implementing-two-interfaces-in-a-class-with-same-method-which-interface-method). Maybe we can change the method names.


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java, line 120
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line120>
> >
> >     I thought TopologyBuilder was to abstract away the spec and provide a simplified API for a user implementing a simple SQL query. 
> >     Imo, this still seems pretty involved for a user concerned with just defining a simple join query. 
> >     
> >     I assumed we could have a builder pattern as below:
> >     
> >     ```
> >     TopologyBuilder builder = TopologyBuilder
> >                                 .create()
> >                                 .join(window("stream1", 10), window("stream2", 10), Arrays.asList(joinKey1, joinKey2 /* , ... */))
> >                                 .partition(partitionKey)
> >                                 .build();
> >     ```
> >     
> >     The idea here is that the build statement order determines the topology. The builder just validates and chains them together. 
> >     I can see that this can be a problem with running operators in parallel and could make it hard for the user to understand the correct sequence of operators. 
> >     I am wondering if you think this kind of a model is possible. It would greatly simplify the API for most users. 
> >     Just wanted to put this comment out so that we can discuss further.

I also agree with Navina here. I think we should make building topologies simple with the builder API. One complexity of the current OperatorSpec-based API is that you need to create intermediate streams (EntityName objects) to wire operators together. I think we should try to hide that complexity through the builder API. Even though source and sink hide that complexity to some extent, it's better if we can remove it completely.


- Milinda


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java, line 120
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line120>
> >
> >     I thought TopologyBuilder was to abstract away the spec and provide a simplified API for a user implementing a simple SQL query. 
> >     Imo, this still seems pretty involved for a user concerned with just defining a simple join query. 
> >     
> >     I assumed we could have a builder pattern as below:
> >     
> >     ```
> >     TopologyBuilder builder = TopologyBuilder
> >                                 .create()
> >                                 .join(window("stream1", 10), window("stream2", 10), Arrays.asList(joinKey1, joinKey2 /* , ... */))
> >                                 .partition(partitionKey)
> >                                 .build();
> >     ```
> >     
> >     The idea here is that the build statement order determines the topology. The builder just validates and chains them together. 
> >     I can see that this can be a problem with running operators in parallel and could make it hard for the user to understand the correct sequence of operators. 
> >     I am wondering if you think this kind of a model is possible. It would greatly simplify the API for most users. 
> >     Just wanted to put this comment out so that we can discuss further.
> 
> Milinda Pathirage wrote:
>     I also agree with Navina here. I think we should make building topologies simple with the builder API. One complexity of the current OperatorSpec-based API is that you need to create intermediate streams (EntityName objects) to wire operators together. I think we should try to hide that complexity through the builder API. Even though source and sink hide that complexity to some extent, it's better if we can remove it completely.
> 
> Yi Pan (Data Infrastructure) wrote:
>     Thank you both for the good points here. @Navina, yes, the basic idea for the topology builder is exactly what you mentioned, and the model you illustrated is much simpler and very attractive. The issue I see is that the topology may be neither linear nor a tree. It is not easy to describe a network of operators like the following without introducing the concept of intermediate streams.
>        window-->aggregate--+---------------------+
>        window-->aggregate----> join --+          |       +-->aggregate --+
>                      project-->window--> join -->|split -+               |
>                                                  +-------->join ---------->join --> partition
>     There are three issues in the above example:
>     1. the join inputs may be intermediate streams, each of which could essentially be the output of a sub-topology
>     2. a multi-output operator makes the downstream expression branch off, so it is not easily expressible in a linear format
>     3. the output of a single operator may be used by multiple downstream operators, again forking the linear expression
>     
>     Maybe we can adopt the simple join builder you illustrated for simple queries, although I would like to add an OperatorBuilder here as well:
>     OperatorRouter simpleJoinQuery = TopologyBuilder.create()
>                                  .join(OperatorBuilder.window("stream1", 10), OperatorBuilder.window("stream2", 10), joinKeys)
>                                  .partition(partitionKey).build();
>     
>     For more complex queries, I think the following approach may work better:
>     OperatorRouter router = TopologyBuilder.create().beginStream("stream1").window(10).aggregate("group-by", "treeId", "sum")
>                               .beginStream("stream2").window(10).aggregate("group-by","treeId", "avg").join(joinCondition)
>                               .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>                               .partition(partitionKey, number,"outstream1").build();
>                               
>     Here, each beginStream() call starts one linear path of operators and adds it to the topology. A subsequent join operator joins the two most recent streams and creates one joined stream. This model may even solve issue 2 by allowing: 
>           ...split().beginStream(1).aggregate().beginStream(2)
>             .beginStream("stream4").filter().window()
>             .join().join().partition().build();
>             
>     For issue 3, i.e. reusing a certain intermediate stream in multiple downstream operators, we can introduce beginReuseStream(source, "name") as follows:
>     OperatorRouter router = TopologyBuilder.create().beginReuseStream("stream1", "reuseStream1").window(10).aggregate("group-by", "treeId", "sum")
>                               .beginStream("stream2").window(10).aggregate("group-by","treeId", "avg").join(joinCondition)
>                               .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>                               .join("reuseStream1", joinCondition)
>                               .partition(partitionKey, number,"outstream1").build();
>     Here, a downstream operator refers to the reuse stream by name, so the intermediate stream can be used multiple times.
>     
>     In addition, from the review of the SAMZA-561 changes, I think we will need to include an OperatorBuilder interface as well, to make operators easy to create:
>     e.g. OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build(). This can be used inside the TopologyBuilder, which allows us to hide all the intermediate input/output stream/entity names, so there will be no OperatorSpec that the programmer needs to handle directly.

For the last line, I meant /opId/-output-1


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------


Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, line 160
> > <https://reviews.apache.org/r/34500/diff/1/?file=965716#file965716line160>
> >
> >     How is this "anonymous" system used?

This will go away when we have the OperatorBuilder. Thanks!


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------




Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by Navina Ramesh <nr...@linkedin.com>.

> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java, line 120
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line120>
> >
> >     I thought TopologyBuilder was meant to abstract away the spec and provide a simplified API for a user implementing a simple SQL query.
> >     IMO, this still seems pretty involved for a user who just wants to define a simple join query.
> >     
> >     I assumed we could have a builder pattern as below:
> >     
> >     ```
> >     TopologyBuilder builder = TopologyBuilder
> >                                 .create()
> >                                 .join(window("stream1", 10), window("stream2", 10), Arrays.asList(joinKey1, joinKey2 /* , ... */))
> >                                 .partition(partitionKey)
> >                                 .build();
> >     ```
> >     
> >     The idea here is that the order of the builder calls determines the topology. The builder just validates and chains them together.
> >     I can see that this may be a problem when operators run in parallel, and it could make it hard for the user to understand the correct sequence of operators.
> >     I am wondering if you think this kind of a model is possible. It would greatly simplify the API for most users. 
> >     Just wanted to put this comment out so that we can discuss further.
> 
> Milinda Pathirage wrote:
>     I also agree with Navina here. I think the builder API should make building topologies simple. One complexity of the current OperatorSpec-based API is that you need to create intermediate streams (EntityNames) to wire operators together. I think we should try to hide that complexity behind the builder API. Even though source and sink hide that complexity to some extent, it's better if we can remove it completely.
> 
> Yi Pan (Data Infrastructure) wrote:
>     Thank you both for the good points. @Navina, yes, the basic idea for the topology builder is exactly what you described, and the model you illustrated is much simpler and very attractive. The issue I see is that the topology may be neither linear nor a tree. It is not easy to describe a network of operators like the following without introducing the concept of intermediate streams.
>        window-->aggregate--+---------------------+
>        window-->aggregate----> join --+          |       +-->aggregate --+
>                      project-->window--> join -->|split -+               |
>                                                  +-------->join ---------->join --> partition
>     There are three issues in the above example:
>     1. A join input may be an intermediate stream, which could essentially be the output of a sub-topology.
>     2. A multi-output operator makes the downstream expression branch off, so it is not easily expressible in a linear format.
>     3. The output of a single operator may be used by multiple downstream operators, again forking the linear expression.
>     
>     Maybe we can adopt the simple join builder you illustrated for simple queries, although I would also like to add an OperatorBuilder here:
>     OperatorRouter simpleJoinQuery = TopologyBuilder.create()
>                                  .join(OperatorBuilder.window("stream1", 10), OperatorBuilder.window("stream2", 10), joinKeys)
>                                  .partition(partitionKey).build();
>     
>     For more complex queries, I think the following approach may work better:
>     OperatorRouter router = TopologyBuilder.create().beginStream("stream1").window(10).aggregate("group-by", "treeId", "sum")
>                               .beginStream("stream2").window(10).aggregate("group-by","treeId", "avg").join(joinCondition)
>                               .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>                               .partition(partitionKey, number,"outstream1").build();
>                               
>     Here, beginStream() always starts one linear path of operators and adds it to the topology. A subsequent join operator joins the latest two streams and creates one joined stream. This model may even solve issue 2 by allowing:
>           ...split().beginStream(1).aggregate().beginStream(2)
>             .beginStream("stream4").filter().window()
>             .join().join().partition().build();
>             
>     For issue 3, i.e. reusing a certain intermediate stream in multiple downstream operators, we can introduce beginReuseStream(source, "name") as follows:
>     OperatorRouter router = TopologyBuilder.create().beginReuseStream("stream1", "reuseStream1").window(10).aggregate("group-by", "treeId", "sum")
>                               .beginStream("stream2").window(10).aggregate("group-by","treeId", "avg").join(joinCondition)
>                               .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>                               .join("reuseStream1", joinCondition)
>                               .partition(partitionKey, number,"outstream1").build();
>     Here, downstream operators refer to the reuse stream by name, so the intermediate stream can be used multiple times.
>     
>     In addition, from reviewing the SAMZA-561 changes, I think we will need an OperatorBuilder interface as well, to help create operators easily:
>     e.g. OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build(). This can be used inside the TopologyBuilder, which lets us hide all the intermediate input/output stream and entity names, so there will be no OperatorSpec that the programmer needs to handle directly.
> 
> Yi Pan (Data Infrastructure) wrote:
>     For the last line, I meant /opId/-output-1
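The stack-like reading of beginStream() and join() described above (each beginStream() opens a new linear path, and join() merges the two most recent paths into one) can be sketched as follows. This is a hypothetical illustration only: StackTopologyBuilder, then(), the string-typed operators, and the generated opN-output names are invented here, and beginReuseStream() is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the beginStream()/join() model; not an API from this patch.
final class StackTopologyBuilder {
    // Tail stream of every linear path not yet joined; the top is the most recent path.
    private final Deque<String> openPaths = new ArrayDeque<>();
    // Wiring recorded as "inputs -> operator -> output", one entry per operator.
    private final List<String> edges = new ArrayList<>();
    private int nextId = 0;

    static StackTopologyBuilder create() {
        return new StackTopologyBuilder();
    }

    // Opens a new linear path that reads an external input stream.
    StackTopologyBuilder beginStream(String source) {
        openPaths.push(source);
        return this;
    }

    // Appends a single-input operator (window, aggregate, project, ...) to the latest path.
    StackTopologyBuilder then(String operator) {
        String in = requirePaths(1).pop();
        openPaths.push(emit(in, operator));
        return this;
    }

    // Joins the two most recent paths into one joined path.
    StackTopologyBuilder join(String condition) {
        requirePaths(2);
        String right = openPaths.pop();
        String left = openPaths.pop();
        openPaths.push(emit(left + "," + right, "join(" + condition + ")"));
        return this;
    }

    // Partitions the latest path into a named output stream.
    StackTopologyBuilder partition(String key, String outputStream) {
        String in = requirePaths(1).pop();
        edges.add(in + " -> partition(" + key + ") -> " + outputStream);
        openPaths.push(outputStream);
        return this;
    }

    // A complete query must have folded all paths into exactly one.
    List<String> build() {
        if (openPaths.size() != 1) {
            throw new IllegalStateException("expected 1 open path, found " + openPaths.size());
        }
        return new ArrayList<String>(edges);
    }

    private Deque<String> requirePaths(int n) {
        if (openPaths.size() < n) {
            throw new IllegalStateException("need " + n + " open path(s), found " + openPaths.size());
        }
        return openPaths;
    }

    // Records one operator and returns its generated intermediate output stream name.
    private String emit(String inputs, String operator) {
        String out = "op" + (nextId++) + "-output";
        edges.add(inputs + " -> " + operator + " -> " + out);
        return out;
    }

    public static void main(String[] args) {
        List<String> wiring = create()
            .beginStream("stream1").then("window(10)").then("aggregate(sum)")
            .beginStream("stream2").then("window(10)").then("aggregate(avg)").join("c1")
            .beginStream("stream3").then("project").then("window(10)").join("c2")
            .partition("key", "outstream1")
            .build();
        for (String edge : wiring) {
            System.out.println(edge);
        }
    }
}
```

The chain in main() mirrors the three-stream example in the thread and records nine wiring entries. Because the intermediate names are generated internally, the caller never has to name an intermediate stream; the price is that every join implicitly consumes the two most recently opened paths.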

Summarizing my offline discussion with Yi:
* I think we are conflating our requirements for the operator builder API with the topology / workflow definition of a data processing pipeline. We should rename TopologyBuilder to QueryBuilder.
* I agree with Milinda that we should simplify the API to hide intermediate stream and spec details. I like the idea behind OperatorBuilder; it paves the way for a cleaner API.
* Regarding non-linear workflows, I think we should not focus much on them. The idea is not to define arbitrary workflows; it is more about defining sub-queries and connecting them together. If we think along those lines, the API will be much simpler. We can extend the builders later to support non-linear workflows.
* One more nit-pick: it would be more intuitive to expose a "groupBy" interface rather than an "aggregate" interface with a group-by key.
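A minimal sketch of the OperatorBuilder idea, OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build(), might look like the following. JoinBuilderSketch, JoinSpecSketch, the string-typed inputs and condition, and the join-N-output naming scheme are all assumptions made for illustration; they are not classes or conventions from this patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical immutable join description produced by the builder below.
final class JoinSpecSketch {
    final String id;
    final List<String> inputs;
    final String output;
    final String joinCondition;

    JoinSpecSketch(String id, List<String> inputs, String output, String joinCondition) {
        this.id = id;
        this.inputs = Collections.unmodifiableList(new ArrayList<String>(inputs));
        this.output = output;
        this.joinCondition = joinCondition;
    }
}

// Hypothetical fluent builder modeled on OperatorBuilder.join().setInputs()...build().
final class JoinBuilderSketch {
    private static final AtomicInteger NEXT_ID = new AtomicInteger();
    private List<String> inputs;
    private String output;
    private String joinCondition;

    static JoinBuilderSketch join() {
        return new JoinBuilderSketch();
    }

    JoinBuilderSketch setInputs(String... names) {
        this.inputs = Arrays.asList(names);
        return this;
    }

    JoinBuilderSketch setOutput(String name) {
        this.output = name;
        return this;
    }

    JoinBuilderSketch setJoinCondition(String condition) {
        this.joinCondition = condition;
        return this;
    }

    // Validates required fields; if no output was set, derives one from a generated id
    // so callers never have to name the intermediate stream themselves.
    JoinSpecSketch build() {
        if (inputs == null || inputs.size() != 2) {
            throw new IllegalStateException("a join needs exactly two inputs");
        }
        if (joinCondition == null) {
            throw new IllegalStateException("missing join condition");
        }
        String id = "join-" + NEXT_ID.getAndIncrement();
        String out = (output != null) ? output : id + "-output";
        return new JoinSpecSketch(id, inputs, out, joinCondition);
    }

    public static void main(String[] args) {
        JoinSpecSketch spec = join().setInputs("w1", "w2").setJoinCondition("w1.key = w2.key").build();
        System.out.println(spec.inputs + " -> " + spec.id + " -> " + spec.output);
    }
}
```

Making setOutput() optional is one way to hide intermediate EntityNames: a query-level builder could chain such operators without ever choosing a name, while a user who needs a named output can still set one.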


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------




Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------



samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
<https://reviews.apache.org/r/34500/#comment137181>

    How is this "anonymous" system used?



samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
<https://reviews.apache.org/r/34500/#comment137270>

    OperatorSource and OperatorSink have the same method signatures. Is that even allowed in Java? It's kind of confusing, even though the implementation can be semantically different.



samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
<https://reviews.apache.org/r/34500/#comment137271>

    Same question as before. Which method gets invoked when I call <OperatorTopology_object>.getName()?
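For reference, Java does allow a class to implement two interfaces that declare the same method signature: the class provides one implementation that satisfies both, so a getName() call always runs that single method, whichever interface type the reference has. It also means one class cannot give the two methods different meanings. The sketch below uses hypothetical interfaces NamedSource and NamedSink in place of the real OperatorSource and OperatorSink, whose exact signatures are not shown here.

```java
// Hypothetical stand-ins for OperatorSource and OperatorSink.
interface NamedSource {
    String getName();
}

interface NamedSink {
    String getName();
}

// One getName() body implements the identically declared method of both interfaces.
class TopologySketch implements NamedSource, NamedSink {
    private final String name;

    TopologySketch(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}

class SameSignatureDemo {
    public static void main(String[] args) {
        TopologySketch topology = new TopologySketch("topo-1");
        NamedSource asSource = topology;
        NamedSink asSink = topology;
        // All three calls dispatch to TopologySketch.getName(); prints "topo-1 topo-1 topo-1".
        System.out.println(topology.getName() + " " + asSource.getName() + " " + asSink.getName());
    }
}
```

If the return types were incompatible, the class would fail to compile, and if both interfaces supplied Java 8 default bodies for the method, the class would have to override it explicitly.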



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
<https://reviews.apache.org/r/34500/#comment137187>

    Looks like you left behind some merge conflict statements in comments :)



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
<https://reviews.apache.org/r/34500/#comment137325>

    Can you add info on what an unbound input or unbound output is? I think it will be useful to add/move the comment from Line 205 and Line 218 here.
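One plausible reading of unbound inputs and outputs, offered only as an assumption since the comments at Line 205 and Line 218 of TopologyBuilder are not quoted here: an unbound input is a stream that some operator consumes but no operator in the topology produces, so it must be fed from outside; an unbound output is a stream that some operator produces but none consumes, so it must go to a sink. OpDesc and UnboundAnalysis are hypothetical names used only for this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified operator description: an id plus input and output stream names.
final class OpDesc {
    final String id;
    final List<String> inputs;
    final List<String> outputs;

    OpDesc(String id, List<String> inputs, List<String> outputs) {
        this.id = id;
        this.inputs = inputs;
        this.outputs = outputs;
    }
}

final class UnboundAnalysis {
    // Streams consumed by some operator but produced by none: they must come from outside.
    static Set<String> unboundInputs(List<OpDesc> ops) {
        Set<String> produced = new LinkedHashSet<>();
        for (OpDesc op : ops) {
            produced.addAll(op.outputs);
        }
        Set<String> unbound = new LinkedHashSet<>();
        for (OpDesc op : ops) {
            for (String in : op.inputs) {
                if (!produced.contains(in)) {
                    unbound.add(in);
                }
            }
        }
        return unbound;
    }

    // Streams produced by some operator but consumed by none: they must go to a sink.
    static Set<String> unboundOutputs(List<OpDesc> ops) {
        Set<String> consumed = new LinkedHashSet<>();
        for (OpDesc op : ops) {
            consumed.addAll(op.inputs);
        }
        Set<String> unbound = new LinkedHashSet<>();
        for (OpDesc op : ops) {
            for (String out : op.outputs) {
                if (!consumed.contains(out)) {
                    unbound.add(out);
                }
            }
        }
        return unbound;
    }

    public static void main(String[] args) {
        // window, window, join, partition, wired through named intermediate streams.
        List<OpDesc> ops = Arrays.asList(
            new OpDesc("w1", Arrays.asList("stream1"), Arrays.asList("w1-out")),
            new OpDesc("w2", Arrays.asList("stream2"), Arrays.asList("w2-out")),
            new OpDesc("join", Arrays.asList("w1-out", "w2-out"), Arrays.asList("joined")),
            new OpDesc("partition", Arrays.asList("joined"), Arrays.asList("outstream1")));
        System.out.println(unboundInputs(ops));   // [stream1, stream2]
        System.out.println(unboundOutputs(ops));  // [outstream1]
    }
}
```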



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
<https://reviews.apache.org/r/34500/#comment137339>

    Can we rename this to source() rather than stream()?



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
<https://reviews.apache.org/r/34500/#comment137338>

    Can you explain when sink should be used?



samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
<https://reviews.apache.org/r/34500/#comment137333>

    I thought TopologyBuilder was meant to abstract away the spec and provide a simplified API for a user implementing a simple SQL query.
    IMO, this still seems pretty involved for a user who just wants to define a simple join query.
    
    I assumed we could have a builder pattern as below:
    
    ```
    TopologyBuilder builder = TopologyBuilder
                                .create()
                                .join(window("stream1", 10), window("stream2", 10), Arrays.asList(joinKey1, joinKey2 /* , ... */))
                                .partition(partitionKey)
                                .build();
    ```
    
    The idea here is that the order of the builder calls determines the topology. The builder just validates and chains them together.
    I can see that this may be a problem when operators run in parallel, and it could make it hard for the user to understand the correct sequence of operators.
    I am wondering if you think this kind of a model is possible. It would greatly simplify the API for most users. 
    Just wanted to put this comment out so that we can discuss further.
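A minimal sketch of the linear model described above, where the order of builder calls is the topology and build() validates and chains the steps, could look like this. LinearQueryBuilder, the combined window-plus-join step, the string-typed keys, and the stepN-out names are hypothetical simplifications, not the TopologyBuilder API in this patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical linear query builder: the order of calls is the topology.
final class LinearQueryBuilder {
    private final List<String> inputs = new ArrayList<>();
    private final List<String> steps = new ArrayList<>();

    private LinearQueryBuilder() {
    }

    static LinearQueryBuilder create() {
        return new LinearQueryBuilder();
    }

    // Windows two input streams and joins them; in this sketch it may only start the chain.
    LinearQueryBuilder join(String left, String right, int windowSize, String joinKey) {
        if (!steps.isEmpty()) {
            throw new IllegalStateException("join must be the first step");
        }
        inputs.add(left);
        inputs.add(right);
        steps.add("window(" + windowSize + ")+join(" + joinKey + ")");
        return this;
    }

    LinearQueryBuilder partition(String partitionKey) {
        steps.add("partition(" + partitionKey + ")");
        return this;
    }

    // Validates the chain, then wires step i to read the generated output of step i-1.
    List<String> build() {
        if (inputs.isEmpty()) {
            throw new IllegalStateException("query has no input streams");
        }
        List<String> edges = new ArrayList<>();
        String upstream = inputs.toString();
        for (int i = 0; i < steps.size(); i++) {
            String out = "step" + i + "-out";
            edges.add(upstream + " -> " + steps.get(i) + " -> " + out);
            upstream = out;
        }
        return edges;
    }

    public static void main(String[] args) {
        for (String edge : create().join("stream1", "stream2", 10, "key").partition("pkey").build()) {
            System.out.println(edge);
        }
    }
}
```

Because each step simply reads the previous step's generated output, the user never names an intermediate stream; the trade-off, as noted above, is that only linear chains can be expressed this way.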



samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
<https://reviews.apache.org/r/34500/#comment137337>

    Why do we need two instances of the TopologyBuilder here?
    I think this happens because the stream() and sink() methods return an OperatorSource rather than a TopologyBuilder instance.
    
    How does the TopologyBuilder handle OperatorSource differently from OperatorSink?


- Navina Ramesh

