Posted to dev@samza.apache.org by Milinda Pathirage <mi...@apache.org> on 2015/09/03 17:54:00 UTC

Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > Hi, Milinda, sorry for the late review. I have put down my comments below. Overall, there are two things to be discussed:
> > 1) Adding OperatorBuilder interface as well. It serves two purposes:
> >    a) I remember that we discussed the need for this because, in the parsing/planning phase, there are cases where the required parameters for the operator are not generated / finalized yet (hence you have added some setter functions in OperatorSpec as a workaround). With OperatorBuilder, this is much easier: we just keep setting the parameters without calling build()
> >    b) In the user code directly using operator layer API, using OperatorBuilder can help to make the TopologyBuilder code more intuitive and helps to hide away all unnecessary specs s.t. intermediate stream/table names and/or operator names
> > 2) The implementation details of TopologyBuilder. I would prefer to keep a graph-based implementation of TopologyBuilder internally, instead of a stack-based one, because of the flexible representation a graph-based implementation allows. At the API level, we should first focus on DAG-like operators. However, I would prefer to keep the implementation flexible to avoid having to rewrite the TopologyBuilder class later, when we need to support non-DAG-like operators.
> >
> > p.s. It would be good if you could modify the example tasks to use the fluent-style APIs, to illustrate what the user experience is like. With the help of OperatorBuilder, the TopologyBuilder implementation can achieve this: if the user does not specify the input/output streams/tables (as with DAG-like operators), TopologyBuilder should be able to generate the intermediate stream/table names and connect the operators via those intermediate streams/tables. This is a step we must do anyway for DAG-like operators. If the user specifies the input/output streams in the OperatorBuilder, the named streams/tables are created as vertices in the graph, and operators are connected to those vertices if they consume from those streams/tables. This is a simple extension of the DAG model that does not need a structural change in the TopologyBuilder.
> > 
> > Just my two cents. Thanks!

Hi Yi,

I was trying to simplify the TopologyBuilder by using a stack-based approach, because it can handle the logical algebra generated by Calcite. But I understand your concerns about cases such as multiple operators reading from a single intermediate stream. I'll do another revision of the code with your suggestions.
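
To make the fan-out concern concrete, here is a minimal sketch of a graph-based topology in which streams are vertices and an intermediate stream name is generated when no output is given. All class and method names below (GraphTopologySketch, addOperator, consumersOf) are hypothetical and are not the API in this review request; the naming scheme for generated streams is also just an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: streams are vertices, operators attach to the streams they consume. */
class GraphTopologySketch {
    // stream name -> names of the operators that read from it
    private final Map<String, List<String>> consumers = new LinkedHashMap<>();
    // operator name -> name of the stream it writes to
    private final Map<String, String> outputs = new LinkedHashMap<>();
    private int nextId = 0;

    /**
     * Registers an operator that reads from the given input stream.
     * A null output means "generate an intermediate stream name".
     * Returns the name of the output stream so later operators can consume it.
     */
    String addOperator(String opName, String input, String output) {
        String out = (output != null) ? output : "intermediate-" + opName + "-" + (nextId++);
        consumers.computeIfAbsent(input, k -> new ArrayList<>()).add(opName);
        consumers.computeIfAbsent(out, k -> new ArrayList<>());
        outputs.put(opName, out);
        return out;
    }

    List<String> consumersOf(String stream) {
        return consumers.getOrDefault(stream, new ArrayList<>());
    }

    String outputOf(String opName) {
        return outputs.get(opName);
    }

    public static void main(String[] args) {
        GraphTopologySketch g = new GraphTopologySketch();
        // No output name given: an intermediate stream is generated.
        String filtered = g.addOperator("filter", "orders", null);
        // Two operators read from the same intermediate stream (fan-out).
        g.addOperator("project", filtered, "big-orders");
        g.addOperator("join", filtered, "enriched-orders");
        System.out.println(filtered + " -> " + g.consumersOf(filtered));
    }
}
```

A purely stack-based builder pops an operator's output when the next operator consumes it, so the second consumer in the example above would need extra bookkeeping; with streams as vertices, fan-out is just a second entry in the consumer list.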


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, line 161
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line161>
> >
> >     My original intention to introduce the anonymous stream here is to represent the intermediate streams/tables. If we explicitly introduced the intermediate streams and tables in the following methods, I think that we can drop the anonymous ones.

I was not clear about the purpose of the anonymous stream, which is why I added the intermediate stream method.


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, line 174
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line174>
> >
> >     Could you elaborate more on what to be fixed here?

I was not sure whether using a UUID as the intermediate stream name is the right approach. I would prefer more meaningful names in case we decide to distribute operators across multiple jobs connected through Kafka topics.
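
As a rough illustration of the trade-off, the sketch below contrasts a UUID-based name with a descriptive one. The class, the method names, and the queryId/operatorName/operatorIndex naming scheme are all assumptions made for illustration, not something that exists in the patch.

```java
import java.util.UUID;

/** Hypothetical helper contrasting two ways of naming intermediate streams. */
class IntermediateStreamNames {

    /** Opaque name: unique, but says nothing about which query or operator produced it. */
    static String uuidName() {
        return "intermediate-" + UUID.randomUUID();
    }

    /** Descriptive name built from an assumed query id, operator name and position in the plan. */
    static String descriptiveName(String queryId, String operatorName, int operatorIndex) {
        String raw = queryId + "-" + operatorName + "-" + operatorIndex;
        // Kafka topic names may only contain letters, digits, '.', '_' and '-',
        // so anything else is replaced with '_'.
        return raw.replaceAll("[^A-Za-z0-9._-]", "_");
    }

    public static void main(String[] args) {
        System.out.println(uuidName());
        System.out.println(descriptiveName("orders_by_region", "filter", 1));
    }
}
```

A descriptive name stays stable across restarts as long as the query id and plan do not change, which matters if the intermediate stream becomes a Kafka topic shared between jobs; a fresh UUID per run would not have that property.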


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java, line 16
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041483#file1041483line16>
> >
> >     Question: I am not quite sure about why we need this. Is it simply a projection operator that directly send output to the system streams?

I was thinking in logical algebra terms. When there is an explicit insert in the query, Calcite generates a LogicalInsert, and I was trying to mirror that in our operator layer. However, I have started rewriting the planner I wrote previously: I read the Drill code and found a better way to write it. I think I can get rid of the need for an explicit stream insert operator. But AFAIK, we will still need an operator to modify tables.


- Milinda


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for the TopologyBuilder API originally proposed in RB 34500 (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing the existing TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a 
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3 
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>