You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Milinda Pathirage <mi...@apache.org> on 2015/08/16 17:06:52 UTC

Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/
-----------------------------------------------------------

Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.


Bugs: SAMZA-552
    https://issues.apache.org/jira/browse/SAMZA-552


Repository: samza


Description
-------

New proposal for TopologuBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).

* Created a new class called TopologyBuilderV2 instead of changing existing TopologyBuilder
* org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
* Window and aggregate related draft APIs are not done yet
* This is a WIP, please feel free to comment on the APIs


Diffs
-----

  samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java 705c0ffadd0982defae686bb0df178e9a7cbef8c 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc70179e74070e8feb617b6a7b3b62ef5c1156 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c7191aac8a738345c02b260baae811c5c5 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c363e723c20c30622d1253588f66166b206 

Diff: https://reviews.apache.org/r/37506/diff/


Testing
-------

./gradlew :samza-sql-core:test passed


Thanks,

Milinda Pathirage


Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API

Posted by Milinda Pathirage <mi...@apache.org>.

> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java, line 89
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041501#file1041501line89>
> >
> >     The goal here is to use the topology builder to generate the query. Can you update the code here to use the topology builder?

Will update the code to use TopologyBuilder.


- Milinda


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for TopologuBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing existing TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a 
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3 
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>


Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API

Posted by Milinda Pathirage <mi...@apache.org>.

> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > Hi, Milinda, sorry for the late review. I have put down my comments below. Overall, there are two things to be discussed:
> > 1) Adding OperatorBuilder interface as well. It serves two purposes:
> >    a) I remember that we have discussed the need for this due to the fact that in the parsing/planning phase, there are cases where the required parameters for the operator are not generated / finalized yet (hence you have added some setter functions in OperatorSpec as workaround). W/ OperatorBuilder, it is much easier that we just keep setting the parameters w/o calling build()
> >    b) In the user code directly using operator layer API, using OperatorBuilder can help to make the TopologyBuilder code more intuitive and helps to hide away all unnecessary specs s.t. intermediate stream/table names and/or operator names
> > 2) The implementation details of TopologyBuilder. I would prefer still keep a graph-based implementation of TopologyBuilder internally, instead of a stack-based implementation, due to the flexible representation the graph-based implementation is able to. At the API, we should first focus on DAG-like operators. However, I would prefer to keep the implementation flexible to avoid having to re-write the TopologyBuilder class later, when we need to support non-DAG-like operators. p.s. It would be good if you can modify the example tasks using the fluent-style APIs to illustrate how the user experience is. And w/ the help from OperatorBuilder, the TopologyBuilder implementaion can achieve this: if user does not specify the input/output streams/tables (like in DAG-like operators), TopologyBuilder should be able to figure out and generate the intermediate streams/tables names and connect the operators via those intermediate streams/tables. This is a step we must do anyways for DAG-like 
 operators. If the user specifies the input/output streams in the OperatorBuilder, the named streams/tables are created as vertices in the graph and operators are now connected to those vertices if they consume from those streams/tables. This is a simple extension from the DAG model that does not need structure-change in the TopologyBuilder.
> > 
> > Just my two cents. Thanks!

Hi Yi,

I was trying to simplify the TopologyBuilder by using stack based approach because it can handle the logical algebra generated by Calcite. But I understand your concerns of having multiple operators reading from a single intermediate stream, etc. I'll do a another revision of the code with your suggestions.


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, line 161
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line161>
> >
> >     My original intention to introduce the anonymous stream here is to represent the intermediate streams/tables. If we explicitly introduced the intermediate streams and tables in the following methods, I think that we can drop the anonymous ones.

I was not clear about anonymous stream, thats why I added intermediate stream method.


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, line 174
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line174>
> >
> >     Could you elaborate more on what to be fixed here?

I was not sure whether using a UUID as intermediate stream name is the right approach. I prefer to have more meaningful names in case we decided to distribute operators accross multiple jobs connected through Kafka topics.


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java, line 16
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041483#file1041483line16>
> >
> >     Question: I am not quite sure about why we need this. Is it simply a projection operator that directly send output to the system streams?

I was thinking in logical algebra terms. When there is a explicit insert in the query, calcite generates a LogicalInsert. I was trying to mirror that in our operator layer. But I started to working on re-writing planner I wrote previously. I read the Drill code and find out a better way to write the planner. I think I can get rid of need for explicit stream insert operator. But AFAIK, we will still need a operator to modify tables.


- Milinda


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for TopologuBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing existing TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a 
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3 
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>


Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------


Hi, Milinda, sorry for the late review. I have put down my comments below. Overall, there are two things to be discussed:
1) Adding OperatorBuilder interface as well. It serves two purposes:
   a) I remember that we have discussed the need for this due to the fact that in the parsing/planning phase, there are cases where the required parameters for the operator are not generated / finalized yet (hence you have added some setter functions in OperatorSpec as workaround). W/ OperatorBuilder, it is much easier that we just keep setting the parameters w/o calling build()
   b) In the user code directly using operator layer API, using OperatorBuilder can help to make the TopologyBuilder code more intuitive and helps to hide away all unnecessary specs s.t. intermediate stream/table names and/or operator names
2) The implementation details of TopologyBuilder. I would prefer still keep a graph-based implementation of TopologyBuilder internally, instead of a stack-based implementation, due to the flexible representation the graph-based implementation is able to. At the API, we should first focus on DAG-like operators. However, I would prefer to keep the implementation flexible to avoid having to re-write the TopologyBuilder class later, when we need to support non-DAG-like operators. p.s. It would be good if you can modify the example tasks using the fluent-style APIs to illustrate how the user experience is. And w/ the help from OperatorBuilder, the TopologyBuilder implementaion can achieve this: if user does not specify the input/output streams/tables (like in DAG-like operators), TopologyBuilder should be able to figure out and generate the intermediate streams/tables names and connect the operators via those intermediate streams/tables. This is a step we must do anyways for DAG-like oper
 ators. If the user specifies the input/output streams in the OperatorBuilder, the named streams/tables are created as vertices in the graph and operators are now connected to those vertices if they consume from those streams/tables. This is a simple extension from the DAG model that does not need structure-change in the TopologyBuilder.

Just my two cents. Thanks!


samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java (line 161)
<https://reviews.apache.org/r/37506/#comment151391>

    My original intention to introduce the anonymous stream here is to represent the intermediate streams/tables. If we explicitly introduced the intermediate streams and tables in the following methods, I think that we can drop the anonymous ones.



samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java (line 174)
<https://reviews.apache.org/r/37506/#comment151392>

    Could you elaborate more on what to be fixed here?



samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java (line 28)
<https://reviews.apache.org/r/37506/#comment151393>

    Is this going to the interface exposed to users who are writing SQL tasks? It would be good to think of not using the generic Object class in the interface classes between the Samza framework vs user code, to follow the spirit in SAMZA-697.



samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java (line 28)
<https://reviews.apache.org/r/37506/#comment151394>

    Same here.



samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java (line 19)
<https://reviews.apache.org/r/37506/#comment151395>

    I think that in the new TopologyBuilder + OperatorBuilder, there is a way to remove the OperatorSink and OperatorSource interfaces. The main purpose for those interfaces to exist is the requirement to refer to the partial topology that a) has one output; Or b) has one input that has not been bound to a system stream/table or an intermediate stream/table. I have thought about that if we follow an API similar to trident, any immediately connected operators won't require the sink/source interfaces, and any not-immediately connected operators will need to connect via a named intermediate stream/table. Hence, removing the need to create OperatorSink/OperatorSource classes.



samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 
<https://reviews.apache.org/r/37506/#comment151396>

    nit: I still think that a note here stressing the need to get the real "event time" instead of the message's receive time based on local system is important.



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java (line 77)
<https://reviews.apache.org/r/37506/#comment151397>

    Why do we need this? I thought that we can directly produce to the system streams, w/ the isSystemStream flag in the EntityName class?



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java (line 100)
<https://reviews.apache.org/r/37506/#comment151398>

    nit: why don't we call it addOperator() directly?



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java (line 133)
<https://reviews.apache.org/r/37506/#comment151399>

    Me neither. I don't see the need to emit the table to a stream either.



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java (line 136)
<https://reviews.apache.org/r/37506/#comment151400>

    So, I assume that the stack is used as intermediate context for DAG computation? It works for computations like algebra. What I am worried about is that when the non-algebra types of operators (such as split operator in my previous examples, or in a case where one intermediate result is used by multiple downstream operators as input) are needed, this builder will need to be completely re-written, due to the strict stack-implementation that limits the types of computation it can support. I would prefer to have a generic implementation that can support more than DAG type of computation, but we can keep the API to look like fluent style for DAGs.



samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java (line 16)
<https://reviews.apache.org/r/37506/#comment151401>

    Question: I am not quite sure about why we need this. Is it simply a projection operator that directly send output to the system streams?



samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java (line 89)
<https://reviews.apache.org/r/37506/#comment151402>

    The goal here is to use the topology builder to generate the query. Can you update the code here to use the topology builder?



samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java (line 119)
<https://reviews.apache.org/r/37506/#comment151405>

    The previous discussion has led us to the point that we think that using OperatorBuilder seems to be easier here:
    this.simpleRtr = TopologyBuilder.create()
        .join(OperatorBuilder.window()
            .size(10).source("kafka:inputstream2")
            .setCallback(this.wndCallback),
          OperatorBuilder.window()
            .size(10).source("kafka:inputstream1")
            .setCallback(this.wndCallback),
          OperatorBuilder.join().setJoinFields(new ArrayList<String>() {{ add("key1"); add("key2");}})
        .partition(OperatorBuilder.partition()
            .setPartitionKey("joinKey")
            .setPartitionNum(50)
            .setOutput("kafka:parOutputStrm1"))
        .build()
    
    In which, all intermediate streams that are immediately consumed by the downstream operators are not named. Only the actual input/output streams are named. And OperatorBuilders are passed in as parameters to TopologyBuilder, s.t. intermediate stream/table names are generated and set to the OperatorBuilders within the Topology, w/o users to involved. Also, w/ the OperatorBuilder model, it would be easier to build a more flexible non-DAG topology later: users can name the operator's outputs s.t. it can be consumed by multiple downstream operators. I agree that it should not be the first priority to implement it. But it would be nice to keep the door open, instead of requiring re-implementing the TopologyBuilder layer later.


- Yi Pan (Data Infrastructure)


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for TopologuBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing existing TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a 
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3 
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>


Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API

Posted by Milinda Pathirage <mi...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/
-----------------------------------------------------------

(Updated Aug. 16, 2015, 3:57 p.m.)


Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.


Bugs: SAMZA-552
    https://issues.apache.org/jira/browse/SAMZA-552


Repository: samza


Description (updated)
-------

New proposal for TopologuBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).

* Created a new class called TopologyBuilderV2 instead of changing existing TopologyBuilder
* org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
* Window and aggregate related draft APIs are not done yet
* This is a WIP, please feel free to comment on the APIs
* This contains Yi's changes from RB 34500


Diffs
-----

  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822 
  samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea 
  samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a 
  samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3 

Diff: https://reviews.apache.org/r/37506/diff/


Testing
-------

./gradlew :samza-sql-core:test passed


Thanks,

Milinda Pathirage


Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API

Posted by Milinda Pathirage <mi...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/
-----------------------------------------------------------

(Updated Aug. 16, 2015, 3:56 p.m.)


Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.


Bugs: SAMZA-552
    https://issues.apache.org/jira/browse/SAMZA-552


Repository: samza


Description
-------

New proposal for TopologuBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).

* Created a new class called TopologyBuilderV2 instead of changing existing TopologyBuilder
* org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
* Window and aggregate related draft APIs are not done yet
* This is a WIP, please feel free to comment on the APIs


Diffs (updated)
-----

  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822 
  samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93 
  samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea 
  samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a 
  samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c 
  samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3 

Diff: https://reviews.apache.org/r/37506/diff/


Testing
-------

./gradlew :samza-sql-core:test passed


Thanks,

Milinda Pathirage