Posted to dev@samza.apache.org by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com> on 2015/05/12 02:58:58 UTC

Re: Review Request 33280: [SAMZA-561] Basic streaming SQL query planning support

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33280/#review83250
-----------------------------------------------------------


Looks good overall. Thanks for adding those end-to-end integration tests! It is great!


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/ExecutionPlanner.java
<https://reviews.apache.org/r/33280/#comment134168>

    In the new operator API, I merged in the following two major changes:
    - RelationOperator and TupleOperator are merged into a single Operator interface, which should simplify the code here quite a bit:
    
    SimpleOperator operator = operatorFactory.getOperator(spec);
    
    - OperatorRouter is now a subclass of Operator as well, with a simplified interface:
    
    router.addOperator(operator);
    
    The idea is to model SimpleOperator as the basic operator and OperatorRouter as a composite operator that contains a set of connected SimpleOperators. The only apparent difference between OperatorRouter and SimpleOperator is that SimpleOperator has a mandatory OperatorSpec and OperatorRouter does not (I am still debating whether they should all have an OperatorSpec). What do you think from the parser/planner implementation point of view? Do the above changes make your code simpler?
    
    One more question: do you see a strong need to allow nested OperatorRouters in the query planner use cases? I did not add support for that because a) I want to keep the code simple, and b) I don't see a strong use case for a Samza task that needs multiple levels of nested OperatorRouters. Your opinion would be appreciated here.
    
    Thanks!
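A minimal sketch of the composite structure described above, with a flat router and no nesting. `Operator`, `SimpleOperator`, and `OperatorRouter` here are simplified stand-ins, not the actual samza-sql-core interfaces; the spec is a plain string, and chaining operators linearly is an assumption (a real router connects operators by their input and output streams).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class OperatorSketch {
  /** Hypothetical common interface: simple and composite operators both process a tuple. */
  interface Operator {
    /** Returns the transformed tuple, or null if the tuple was dropped. */
    Object[] process(Object[] tuple);
  }

  /** A basic operator wrapping one transformation; its spec is mandatory. */
  static final class SimpleOperator implements Operator {
    private final String spec; // stand-in for a mandatory OperatorSpec
    private final UnaryOperator<Object[]> fn;

    SimpleOperator(String spec, UnaryOperator<Object[]> fn) {
      this.spec = spec;
      this.fn = fn;
    }

    String getSpec() {
      return spec;
    }

    @Override
    public Object[] process(Object[] tuple) {
      return fn.apply(tuple);
    }
  }

  /** A composite operator holding a flat chain of SimpleOperators (no spec required). */
  static final class OperatorRouter implements Operator {
    private final List<SimpleOperator> operators = new ArrayList<>();

    // Accepting only SimpleOperator (not any Operator) keeps routers from being nested.
    void addOperator(SimpleOperator operator) {
      operators.add(operator);
    }

    @Override
    public Object[] process(Object[] tuple) {
      Object[] current = tuple;
      for (SimpleOperator op : operators) {
        current = op.process(current);
        if (current == null) {
          return null; // an earlier operator (e.g. a filter) dropped the tuple
        }
      }
      return current;
    }
  }
}
```

Accepting only `SimpleOperator` in `addOperator` is what keeps the router flat; supporting nested routers would mean accepting any `Operator` there.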



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/RexToJavaCompiler.java
<https://reviews.apache.org/r/33280/#comment134146>

    nit: Could you add a few lines of explanation on how the InputGetter and the correlates are used? Just a link to the Calcite javadoc would be fine too.



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/RexToJavaCompiler.java
<https://reviews.apache.org/r/33280/#comment134197>

    nit: So, the built-in expressions include unary and binary versions? Could you give some examples here?



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/ProjectableStreamScanRule.java
<https://reviews.apache.org/r/33280/#comment134212>

    doc: Could you give one example of how this rule is used?



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134217>

    How about array and map?



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134218>

    Do we really need Schema.Type.NULL?



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134221>

    Wouldn't this be an infinite recursion?



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134228>

    For Fixed and Enum, I don't see how we can convert back from RelDataType. May as well drop the support for these types.



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134229>

    Same here.



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134263>

    We don't have enum support in the generic Schema class either.



samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/task/StreamSqlTask.java
<https://reviews.apache.org/r/33280/#comment134269>

    I would prefer that the physical plan be generated in a config format, so that the StreamTask can instantiate the OperatorRouter from the config. That way, the parser/planner only needs to live in the deployment console and does not need to be installed in each Samza container. But this could be done later.
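A minimal sketch of the config-based approach, assuming a flat, ordered chain of operators and a made-up key layout (`sql.plan.operator.count`, `sql.plan.operator.<i>.type`, `sql.plan.operator.<i>.arg`). None of these keys exist in Samza; they only illustrate how the console side could write the plan out while the container rebuilds it from a registry of operator factories, without needing Calcite.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

final class PlanConfigSketch {
  // Hypothetical key layout; not real Samza configuration keys.
  static final String COUNT_KEY = "sql.plan.operator.count";

  /** Console side: flatten an ordered chain of {type, arg} pairs into config entries. */
  static Properties toConfig(List<String[]> operators) {
    Properties props = new Properties();
    props.setProperty(COUNT_KEY, Integer.toString(operators.size()));
    for (int i = 0; i < operators.size(); i++) {
      props.setProperty("sql.plan.operator." + i + ".type", operators.get(i)[0]);
      props.setProperty("sql.plan.operator." + i + ".arg", operators.get(i)[1]);
    }
    return props;
  }

  /** Container side: rebuild the chain from config using only a factory registry. */
  static <T> List<T> fromConfig(Properties props, Map<String, Function<String, T>> factories) {
    int count = Integer.parseInt(props.getProperty(COUNT_KEY, "0"));
    List<T> operators = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      String type = props.getProperty("sql.plan.operator." + i + ".type");
      Function<String, T> factory = factories.get(type);
      if (factory == null) {
        throw new IllegalArgumentException("Unknown operator type: " + type);
      }
      operators.add(factory.apply(props.getProperty("sql.plan.operator." + i + ".arg")));
    }
    return operators;
  }
}
```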



samza-sql-core/src/main/java/org/apache/samza/sql/Utils.java
<https://reviews.apache.org/r/33280/#comment134270>

    Do we absolutely need it in samza-sql-core?



samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java
<https://reviews.apache.org/r/33280/#comment134273>

    We should be able to change getFields() to return List<Field>, since all current uses of Schema.getFields() in samza-sql-core just iterate through all fields.
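A minimal sketch of the suggested signature, using hypothetical simplified `Field` and `Schema` classes (the real samza-sql-core types carry more information). An ordered `List<Field>` supports plain iteration and also gives each field a stable position, which matches rows being passed around as arrays.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SchemaSketch {
  /** Hypothetical, simplified field: a name plus a type name. */
  static final class Field {
    final String name;
    final String typeName;

    Field(String name, String typeName) {
      this.name = name;
      this.typeName = typeName;
    }
  }

  /** Hypothetical schema exposing its fields as an ordered, read-only list. */
  static final class Schema {
    private final List<Field> fields;

    Schema(List<Field> fields) {
      this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    }

    List<Field> getFields() {
      return fields;
    }

    /** Position of a field in declaration order, or -1 if it is absent. */
    int indexOf(String name) {
      for (int i = 0; i < fields.size(); i++) {
        if (fields.get(i).name.equals(name)) {
          return i;
        }
      }
      return -1;
    }
  }
}
```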



samza-sql-core/src/main/java/org/apache/samza/sql/data/DataUtils.java
<https://reviews.apache.org/r/33280/#comment134275>

    nit: this method does not seem to be used. Can we remove it?



samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
<https://reviews.apache.org/r/33280/#comment134276>

    This should be:
    Schema type = getSchema(field.schema())



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TypeAwareOperatorSpec.java
<https://reviews.apache.org/r/33280/#comment134279>

    What about the operators w/ multiple input schemas and multiple output schemas?
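One possible shape, sketched under assumptions: the spec keeps one schema per named input stream and per named output stream, so a join (two inputs) or a split (two outputs) fits naturally. `TypeAwareSpec` and its methods are hypothetical, and schemas are plain strings to keep the example self-contained.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class MultiSchemaSpecSketch {
  /** Hypothetical spec holding one schema per input stream and one per output stream. */
  static final class TypeAwareSpec {
    private final Map<String, String> inputSchemas;
    private final Map<String, String> outputSchemas;

    TypeAwareSpec(Map<String, String> inputSchemas, Map<String, String> outputSchemas) {
      // Copy into LinkedHashMaps so the declared stream order is preserved.
      this.inputSchemas = Collections.unmodifiableMap(new LinkedHashMap<>(inputSchemas));
      this.outputSchemas = Collections.unmodifiableMap(new LinkedHashMap<>(outputSchemas));
    }

    String getInputSchema(String stream) {
      return lookup(inputSchemas, stream, "input");
    }

    String getOutputSchema(String stream) {
      return lookup(outputSchemas, stream, "output");
    }

    Map<String, String> getInputSchemas() {
      return inputSchemas;
    }

    Map<String, String> getOutputSchemas() {
      return outputSchemas;
    }

    private static String lookup(Map<String, String> schemas, String stream, String kind) {
      String schema = schemas.get(stream);
      if (schema == null) {
        throw new IllegalArgumentException("No " + kind + " schema for stream " + stream);
      }
      return schema;
    }
  }
}
```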



samza-sql-core/src/main/java/org/apache/samza/sql/operators/insert/InsertToStreamOp.java
<https://reviews.apache.org/r/33280/#comment134280>

    nit: just a style issue: we don't use assert except in test code.
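Where a runtime check is still wanted in production code, an explicit precondition is the usual replacement for `assert`, since the JVM skips `assert` statements unless assertions are enabled with `-ea`. A minimal sketch; what InsertToStreamOp actually asserts is not shown in this review, so the checks below (non-null tuple, expected input stream) and the `checkInput` name are hypothetical.

```java
import java.util.Objects;

final class PreconditionSketch {
  /**
   * Hypothetical input check for an insert operator. Unlike `assert`,
   * these checks always run, regardless of JVM flags.
   */
  static void checkInput(Object tuple, String expectedStream, String actualStream) {
    Objects.requireNonNull(tuple, "tuple must not be null");
    if (!Objects.equals(expectedStream, actualStream)) {
      throw new IllegalArgumentException(
          "Unexpected input stream " + actualStream + ", expected " + expectedStream);
    }
  }
}
```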



samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java
<https://reviews.apache.org/r/33280/#comment134282>

    Should assemble a new intermediate tuple and send it downstream via collector.send(...)
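A minimal sketch of that flow, assuming rows are `Object[]` and projection is by field position. `Collector` here is a hypothetical one-method interface standing in for whatever collector ProjectOp receives; only the `send(...)` call comes from the comment above.

```java
final class ProjectOpSketch {
  /** Hypothetical collector; the real Samza SQL collector API may differ. */
  interface Collector {
    void send(Object[] tuple);
  }

  /** Projects selected field positions of each incoming tuple into a new tuple. */
  static final class ProjectOp {
    private final int[] fieldIndexes;

    ProjectOp(int[] fieldIndexes) {
      this.fieldIndexes = fieldIndexes.clone();
    }

    void process(Object[] input, Collector collector) {
      // Assemble a fresh intermediate tuple rather than mutating the input in place.
      Object[] output = new Object[fieldIndexes.length];
      for (int i = 0; i < fieldIndexes.length; i++) {
        output[i] = input[fieldIndexes[i]];
      }
      collector.send(output); // hand the new tuple downstream
    }
  }
}
```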



samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/ProjectableFilterableStreamScanOp.java
<https://reviews.apache.org/r/33280/#comment134284>

    Question: isn't this operator just a FilterableStreamScanOp? There is no projection expression in the spec.



samza-test/src/main/java/org/apache/samza/test/integration/sql/OrdersStreamFactory.java
<https://reviews.apache.org/r/33280/#comment134288>

    Question: do we need to instantiate this factory class in the Samza container? Or is it just needed for the front-end Calcite parser/planner?


- Yi Pan (Data Infrastructure)


On May 6, 2015, 2:53 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33280/
> -----------------------------------------------------------
> 
> (Updated May 6, 2015, 2:53 p.m.)
> 
> 
> Review request for samza, Guozhang Wang and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-561
>     https://issues.apache.org/jira/browse/SAMZA-561
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This patch contains an initial query execution planner implementation based on Apache Calcite. 
> 
> - Basic 'insert into' and 'where' clause support
> - Doesn't support projections, windowing and aggregates. They will be added later.
> 
> 
> Diffs
> -----
> 
>   build.gradle a042567 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/Utils.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/ExecutionPlanner.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java e1c22e9 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/RexToJavaCompiler.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/FilterableStreamScanRule.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/ProjectableStreamScanRule.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/RemoveIdentityProjectRule.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/rel/ProjectableFilterableStreamScan.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/rel/StreamScan.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java 705c0ff 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/RelDataTypeUtils.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/SamzaStreamType.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/Stream.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/task/StreamSqlTask.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java fd87aa5 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestExecutionPlanner.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java 0bb15b2 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestRexToJavaCompiler.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java fbb5c59 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaUtils.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/test/Constants.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/test/OrderStreamTableFactory.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/test/Utils.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/Utils.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Field.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/Expression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java 96385e2 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/DataUtils.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IntermediateMessageTuple.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java 577cf74 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java aad18f4 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java 348fc0c 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java 916b166 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 93d4ebb 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TypeAwareOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/insert/InsertToStreamOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/insert/InsertToStreamSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/ProjectableFilterableStreamScanOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/ProjectableFilterableStreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java 7412669 
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/TestSqlAvroSerde.java PRE-CREATION 
>   samza-sql-core/src/test/resources/orders.avsc PRE-CREATION 
>   samza-test/src/main/config/sql-filter.properties PRE-CREATION 
>   samza-test/src/main/java/org/apache/samza/test/integration/sql/OrdersStreamFactory.java PRE-CREATION 
>   samza-test/src/main/python/integration_tests.py df64e23 
>   samza-test/src/main/python/requirements.txt 2ae9590 
>   samza-test/src/main/python/tests/sql_tests.py PRE-CREATION 
>   samza-test/src/main/resources/orders.avsc PRE-CREATION 
>   samza-test/src/main/resources/orders.json PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/33280/diff/
> 
> 
> Testing
> -------
> 
> * ./bin/check-all.sh passed.
> * Integration tests passed including new streaming sql integration test.
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>


Re: Review Request 33280: [SAMZA-561] Basic streaming SQL query planning support

Posted by Milinda Pathirage <mi...@apache.org>.

> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TypeAwareOperatorSpec.java, line 34
> > <https://reviews.apache.org/r/33280/diff/2/?file=934892#file934892line34>
> >
> >     What about the operators w/ multiple input schemas and multiple output schemas?

Will fix this.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java, line 152
> > <https://reviews.apache.org/r/33280/diff/2/?file=934887#file934887line152>
> >
> >     This should be:
> >     Schema type = getSchema(field.schema())

Will fix this.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/insert/InsertToStreamOp.java, line 46
> > <https://reviews.apache.org/r/33280/diff/2/?file=934893#file934893line46>
> >
> >     nit: just a style issue: we don't use assert except in test code.

Will remove it.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/ProjectableFilterableStreamScanOp.java, line 34
> > <https://reviews.apache.org/r/33280/diff/2/?file=934897#file934897line34>
> >
> >     Question: isn't this operator just a FilterableStreamScanOp? There is no projection expression in the spec.

I was planning to add projections to this, but I couldn't get the planner rule that does the merge working for all projection scenarios (the rule worked for * but not for other cases). So I deferred adding projection until I fix the planner rule.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-test/src/main/java/org/apache/samza/test/integration/sql/OrdersStreamFactory.java, line 40
> > <https://reviews.apache.org/r/33280/diff/2/?file=934904#file934904line40>
> >
> >     Question: do we need to instantiate this factory class in the Samza container? Or is it just needed for the front-end Calcite parser/planner?

It is only needed in the front end during planning. Once we separate planning from SamzaSqlTask, we can get rid of this.


- Milinda




Re: Review Request 33280: [SAMZA-561] Basic streaming SQL query planning support

Posted by Milinda Pathirage <mi...@apache.org>.

> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/ExecutionPlanner.java, line 114
> > <https://reviews.apache.org/r/33280/diff/2/?file=934857#file934857line114>
> >
> >     In the new operator API, I merged in the following two major changes:
> >     - RelationOperator and TupleOperator are merged into a single Operator interface, which should simplify the code here quite a bit:
> >     
> >     SimpleOperator operator = operatorFactory.getOperator(spec);
> >     
> >     - OperatorRouter is now a subclass of Operator as well, with a simplified interface:
> >     
> >     router.addOperator(operator);
> >     
> >     The idea is to model SimpleOperator as the basic operator and OperatorRouter as a composite operator that contains a set of connected SimpleOperators. The only apparent difference between OperatorRouter and SimpleOperator is that SimpleOperator has a mandatory OperatorSpec and OperatorRouter does not (I am still debating whether they should all have an OperatorSpec). What do you think from the parser/planner implementation point of view? Do the above changes make your code simpler?
> >     
> >     One more question: do you see a strong need to allow nested OperatorRouters in the query planner use cases? I did not add support for that because a) I want to keep the code simple, and b) I don't see a strong use case for a Samza task that needs multiple levels of nested OperatorRouters. Your opinion would be appreciated here.
> >     
> >     Thanks!

I am +1 for the above changes (especially the operator factory changes). The OperatorRouter change also doesn't break anything in the planner, although it requires a minor code change. 

Regarding nested OperatorRouters, I don't think we need nesting at this stage (maybe never). We may need it if we plan to add intra-query parallelization. So I'm +1 for the current code.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/RexToJavaCompiler.java, line 207
> > <https://reviews.apache.org/r/33280/diff/2/?file=934859#file934859line207>
> >
> >     nit: So, the built-in expressions include unary and binary versions? Could you give some examples here?

The unary version, `Object execute(Object[] inputValues)`, is used for filter expressions. The binary version, `void execute(Object[] inputValues, Object[] results)`, is used for project expressions, where multiple inputs produce multiple outputs. A row is represented as an array.
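To illustrate the two shapes, here is a hand-written sketch of what compiled code for `WHERE units > 5` and `SELECT productId, units * 2` might look like, assuming a row layout of `(rowtime, productId, units)`. The interface names `FilterExpression` and `ProjectExpression` are hypothetical; only the two `execute` signatures come from the description above, and the real RexToJavaCompiler output is generated code rather than lambdas like these.

```java
final class ExpressionSketch {
  /** Unary shape (filters): computes one value, e.g. a Boolean, from an input row. */
  interface FilterExpression {
    Object execute(Object[] inputValues);
  }

  /** Binary shape (projects): writes one result per output column into a caller-supplied row. */
  interface ProjectExpression {
    void execute(Object[] inputValues, Object[] results);
  }

  // Assumed row layout: [0] rowtime, [1] productId, [2] units.
  static final FilterExpression UNITS_GT_5 = in -> ((Integer) in[2]) > 5;

  static final ProjectExpression PRODUCT_AND_DOUBLE_UNITS = (in, out) -> {
    out[0] = in[1];                  // productId
    out[1] = ((Integer) in[2]) * 2;  // units * 2
  };
}
```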


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/ProjectableStreamScanRule.java, line 37
> > <https://reviews.apache.org/r/33280/diff/2/?file=934861#file934861line37>
> >
> >     doc: Could you give one example of how this rule is used?

This rule merges a Project and a Scan into a single operator (e.g. ProjectableStreamScan). This allows us to perform projections (simple projections such as renaming or casting) inside the scan if the scan supports them (we can implement our scans to support projections). But the current code doesn't use this rule yet. 

Calcite takes care of separating simple projections from projections with aggregations.
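The runtime effect of merging a Project (and a Filter) into the scan can be sketched in plain Java: rows are filtered and projected as they are read, so no separate Project operator runs downstream. This only illustrates the behavior under assumed `Object[]` rows and positional projections; it is not a Calcite `RelOptRule` and not the actual ProjectableFilterableStreamScan code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class FusedScanSketch {
  /**
   * Hypothetical fused scan: applies a filter and a positional projection while
   * reading. A null projection means "SELECT *" and passes rows through unchanged.
   */
  static List<Object[]> scan(List<Object[]> stream, Predicate<Object[]> filter, int[] projection) {
    List<Object[]> out = new ArrayList<>();
    for (Object[] row : stream) {
      if (!filter.test(row)) {
        continue; // filter evaluated inside the scan
      }
      if (projection == null) {
        out.add(row);
        continue;
      }
      Object[] projected = new Object[projection.length];
      for (int i = 0; i < projection.length; i++) {
        projected[i] = row[projection[i]]; // projection evaluated inside the scan
      }
      out.add(projected);
    }
    return out;
  }
}
```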


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java, line 50
> > <https://reviews.apache.org/r/33280/diff/2/?file=934866#file934866line50>
> >
> >     How about array and map?

This constraint is used to keep things simple at this stage. But we can add support for arrays and maps. I'll create a separate JIRA ticket to track this.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java, line 62
> > <https://reviews.apache.org/r/33280/diff/2/?file=934866#file934866line62>
> >
> >     Do we really need Schema.Type.NULL?

I'm not 100% sure why I added this. I'll revisit it.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java, line 102
> > <https://reviews.apache.org/r/33280/diff/2/?file=934866#file934866line102>
> >
> >     Wouldn't this be an infinite recursion?

Yep, this is a bug. It should be 

```
return relDataTypeFactory.createArrayType(convertFieldType(elementType.getElementType(), relDataTypeFactory), -1);
```
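To show why recursing on the element type terminates, here is a self-contained sketch over a hypothetical, minimal Avro-like type model (`AvroLikeType`) rather than the real Avro and Calcite classes; the `INTEGER`/`VARCHAR` mapping is an assumption for illustration. Each array level peels off one layer, so the recursion bottoms out at a primitive type.

```java
final class ArrayTypeConversionSketch {
  /** Hypothetical minimal type: a primitive name, or "array" with an element type. */
  static final class AvroLikeType {
    final String name;              // e.g. "int", "string", or "array"
    final AvroLikeType elementType; // non-null only for arrays

    AvroLikeType(String name, AvroLikeType elementType) {
      this.name = name;
      this.elementType = elementType;
    }
  }

  /** Converts to a SQL-style type string, recursing on the element type for arrays. */
  static String convert(AvroLikeType type) {
    if ("array".equals(type.name)) {
      // Recursing on `type` itself here would never terminate.
      return convert(type.elementType) + " ARRAY";
    }
    switch (type.name) {
      case "int":
        return "INTEGER";
      case "string":
        return "VARCHAR";
      default:
        throw new IllegalArgumentException("Unsupported type: " + type.name);
    }
  }
}
```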


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java, line 110
> > <https://reviews.apache.org/r/33280/diff/2/?file=934866#file934866line110>
> >
> >     Same here.

Union can be used to support nullable columns, but I haven't implemented this in relDataTypeToAvroType yet. I'll add that.
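In Avro, an optional field is conventionally declared as a union with a "null" branch, e.g. `["null", "string"]`. A minimal sketch of both directions, assuming union branches are represented as plain type-name strings (a simplification; the real code would work on Avro `Schema` and Calcite `RelDataType` objects). The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class NullableUnionSketch {
  /** The single non-null branch of a union and whether null is allowed. */
  static final class Resolved {
    final String type;
    final boolean nullable;

    Resolved(String type, boolean nullable) {
      this.type = type;
      this.nullable = nullable;
    }
  }

  /** Avro to SQL: treats ["null", T] as a nullable T; other multi-branch unions are rejected. */
  static Resolved resolve(List<String> branches) {
    String nonNull = null;
    boolean nullable = false;
    for (String branch : branches) {
      if ("null".equals(branch)) {
        nullable = true;
      } else if (nonNull == null) {
        nonNull = branch;
      } else {
        throw new IllegalArgumentException("Only [\"null\", T] unions are supported: " + branches);
      }
    }
    if (nonNull == null) {
      throw new IllegalArgumentException("Union has no non-null branch: " + branches);
    }
    return new Resolved(nonNull, nullable);
  }

  /** SQL to Avro: a nullable column maps back to ["null", T]. */
  static List<String> toUnion(String type, boolean nullable) {
    return nullable ? Arrays.asList("null", type) : Collections.singletonList(type);
  }
}
```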


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java, line 130
> > <https://reviews.apache.org/r/33280/diff/2/?file=934866#file934866line130>
> >
> >     We don't have enum support in the generic Schema class either.

Maybe we should drop support for enums. I'm okay with that.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/Utils.java, line 31
> > <https://reviews.apache.org/r/33280/diff/2/?file=934880#file934880line31>
> >
> >     Do we absolutely need it in samza-sql-core?

This is there because we don't have a metadata store implemented yet. We can get rid of this once we have a proper mechanism to handle metadata.


> On May 12, 2015, 12:58 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/task/StreamSqlTask.java, line 127
> > <https://reviews.apache.org/r/33280/diff/2/?file=934870#file934870line127>
> >
> >     I would prefer that the physical plan is generated in a config format and the StreamTask will be able to instantiate the OperatorRouter from a config. That way, the parser/planner can be only in the deployment console and does not need to be installed in each Samza container. But this could be later.

I agree with you that we should get rid of the query in the task config. Maybe a metadata store is the way to go, but I am fine with a config format too.


- Milinda

