Posted to dev@samza.apache.org by Jake Maes <ja...@gmail.com> on 2016/09/01 17:58:24 UTC

Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review147578
-----------------------------------------------------------




samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 47)
<https://reviews.apache.org/r/47835/#comment214770>

    Does MessageStream need to have the key type parameter?
    
    It seems like it would be more flexible to have ```MessageStream<M>``` where M could be a simple type like ```String``` in the case that you don't care about the keys, or ```Message<K,V>``` for the classic case where you do care. 
    
    We could start by only implementing the latter case, but at least MessageStream would be capable of the former case in the future.
    
    It seems that doing this would also eliminate the need for separate map(), flatMap(), filter(), etc. methods. A single apply() method could handle each of these cases. To me, this reads better. 
    ```
    MessageStream<K, M>.filter(new MyCustomFilter())
    ```
    becomes
    ```
    MessageStream<M>.apply(new MyCustomFilter())
    ```
    
    The former doesn't read like English. Could be slightly better if the name was "filterWith" but apply() still feels best.
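
To make the single-parameter idea concrete, here is a minimal sketch. It is not the code under review: the in-memory MessageStream<M>, the Operator interface, and the behavior of MyCustomFilter (dropping empty strings) are all assumptions made up for illustration. It only shows that one apply() taking a "one message in, zero or more out" operator could cover filter, map, and flatMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ApplySketch {
    /** Hypothetical operator: turns one input message into zero or more outputs. */
    interface Operator<M, R> {
        List<R> process(M message);
    }

    /** Hypothetical in-memory stand-in for MessageStream<M>; not the Samza class. */
    static final class MessageStream<M> {
        private final List<M> messages;

        MessageStream(List<M> messages) {
            this.messages = messages;
        }

        /** Single entry point: filter, map and flatMap are all expressed as operators. */
        <R> MessageStream<R> apply(Operator<? super M, ? extends R> op) {
            List<R> out = new ArrayList<>();
            for (M m : messages) {
                out.addAll(op.process(m));
            }
            return new MessageStream<>(out);
        }

        List<M> toList() {
            return messages;
        }
    }

    /** Assumed behavior, for illustration only: drops empty strings. */
    static final class MyCustomFilter implements Operator<String, String> {
        @Override
        public List<String> process(String message) {
            if (message.isEmpty()) {
                return Collections.emptyList();
            }
            return Collections.singletonList(message);
        }
    }

    /** Filters out the empty string, then maps each remaining string to its length. */
    static List<Integer> demo() {
        MessageStream<String> input = new MessageStream<>(Arrays.asList("a", "", "bcd"));
        MessageStream<String> nonEmpty = input.apply(new MyCustomFilter());
        MessageStream<Integer> lengths =
            nonEmpty.apply(s -> Collections.singletonList(s.length()));
        return lengths.toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3]
    }
}
```

The trade-off in this sketch is that the operator's intent (filter vs. map) moves out of the method name and into the operator class name.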


- Jake Maes


On Aug. 26, 2016, 8:43 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2016, 8:43 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to SAMZA-914: https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed 
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb 
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 5d066c5867e9df9e94e60bde825dedf10703b399 
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/data/window/WindowOutput.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/join/Join.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Trigger.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION 
>   samza-sql-core/README.md PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION 
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2 
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>


Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On Sept. 1, 2016, 5:58 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 47
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line47>
> >
> >     Does MessageStream need to have the key type parameter?
> >     
> >     It seems like it would be more flexible to have ```MessageStream<M>``` where M could be a simple type like ```String``` in the case that you don't care about the keys, or ```Message<K,V>``` for the classic case where you do care. 
> >     
> >     We could start by only implementing the latter case, but at least MessageStream would be capable of the former case in the future.
> >     
> >     It seems that doing this would also eliminate the need for separate map(), flatMap(), filter(), etc. methods. A single apply() method could handle each of these cases. To me, this reads better. 
> >     ```
> >     MessageStream<K, M>.filter(new MyCustomFilter())
> >     ```
> >     becomes
> >     ```
> >     MessageStream<M>.apply(new MyCustomFilter())
> >     ```
> >     
> >     The former doesn't read like English. Could be slightly better if the name was "filterWith" but apply() still feels best.

Let me think about it. I have tried it both ways, and there is a need for both K and M as type parameters to MessageStream, mainly for window and join functions, which need to know the key type of the input and output messages in the aggregate and join functions and stores. If MessageStream takes only Message as its type parameter, there is no way to find out the key type at build time and use it to define the type parameters for window and join functions. Let's discuss in person tomorrow.
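
A minimal sketch of the concern, under stated assumptions: the Message class, the two-parameter MessageStream<K, M>, and countByKey() below are hypothetical stand-ins, not the classes in the diff. The point is only that when K is part of the stream's type, a keyed store can be declared as Map<K, ...> at compile time without casts.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyedStreamSketch {
    /** Hypothetical message envelope with a typed key and value. */
    static final class Message<K, V> {
        private final K key;
        private final V value;

        Message(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K getKey() {
            return key;
        }

        V getValue() {
            return value;
        }
    }

    /** Hypothetical two-parameter stream: K is visible in the stream's own type. */
    static final class MessageStream<K, M extends Message<K, ?>> {
        private final List<M> messages;

        MessageStream(List<M> messages) {
            this.messages = messages;
        }

        /** A keyed aggregate whose store is typed by K at compile time. */
        Map<K, Integer> countByKey() {
            Map<K, Integer> store = new LinkedHashMap<>();
            for (M m : messages) {
                store.merge(m.getKey(), 1, Integer::sum);
            }
            return store;
        }
    }

    /** Counts three sample events by their String key. */
    static Map<String, Integer> demo() {
        List<Message<String, String>> events = Arrays.asList(
            new Message<String, String>("u1", "click"),
            new Message<String, String>("u2", "view"),
            new Message<String, String>("u1", "view"));
        MessageStream<String, Message<String, String>> stream = new MessageStream<>(events);
        return stream.countByKey();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {u1=2, u2=1}
    }
}
```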


- Yi




Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On Sept. 1, 2016, 5:58 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 47
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line47>
> >
> >     Does MessageStream need to have the key type parameter?
> >     
> >     It seems like it would be more flexible to have ```MessageStream<M>``` where M could be a simple type like ```String``` in the case that you don't care about the keys, or ```Message<K,V>``` for the classic case where you do care. 
> >     
> >     We could start by only implementing the latter case, but at least MessageStream would be capable of the former case in the future.
> >     
> >     It seems that doing this would also eliminate the need for separate map(), flatMap(), filter(), etc. methods. A single apply() method could handle each of these cases. To me, this reads better. 
> >     ```
> >     MessageStream<K, M>.filter(new MyCustomFilter())
> >     ```
> >     becomes
> >     ```
> >     MessageStream<M>.apply(new MyCustomFilter())
> >     ```
> >     
> >     The former doesn't read like English. Could be slightly better if the name was "filterWith" but apply() still feels best.
> 
> Yi Pan (Data Infrastructure) wrote:
>     Let me think about it. I have tried it both ways, and there is a need for both K and M as type parameters to MessageStream, mainly for window and join functions, which need to know the key type of the input and output messages in the aggregate and join functions and stores. If MessageStream takes only Message as its type parameter, there is no way to find out the key type at build time and use it to define the type parameters for window and join functions. Let's discuss in person tomorrow.

Made the change so that MessageStream is back to the form MessageStream<M extends Message>.
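
For readers following along, one way a single bounded parameter can still give keyed operations a compile-time key type is to make the key a method-level type parameter. The sketch below is an illustration under assumptions, not the code in the diff: the Message interface shape, the PageView class, and groupBy() are hypothetical, and the bound is written as Message<?, ?> rather than the raw Message only to avoid raw-type warnings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BoundedStreamSketch {
    /** Hypothetical message contract; the real Message interface may differ. */
    interface Message<K, V> {
        K getKey();

        V getValue();
    }

    /** Hypothetical concrete message keyed by a user id. */
    static final class PageView implements Message<String, Integer> {
        private final String userId;
        private final int durationMs;

        PageView(String userId, int durationMs) {
            this.userId = userId;
            this.durationMs = durationMs;
        }

        @Override
        public String getKey() {
            return userId;
        }

        @Override
        public Integer getValue() {
            return durationMs;
        }
    }

    /** Single bounded type parameter, in the spirit of MessageStream<M extends Message>. */
    static final class MessageStream<M extends Message<?, ?>> {
        private final List<M> messages;

        MessageStream(List<M> messages) {
            this.messages = messages;
        }

        /** The key type K is a method-level parameter inferred from the key extractor. */
        <K> Map<K, List<M>> groupBy(Function<? super M, ? extends K> keyFn) {
            Map<K, List<M>> groups = new LinkedHashMap<>();
            for (M m : messages) {
                groups.computeIfAbsent(keyFn.apply(m), k -> new ArrayList<>()).add(m);
            }
            return groups;
        }
    }

    /** Groups three sample page views by user id. */
    static Map<String, List<PageView>> demo() {
        List<PageView> views = Arrays.asList(
            new PageView("u1", 120),
            new PageView("u2", 40),
            new PageView("u1", 300));
        MessageStream<PageView> stream = new MessageStream<>(views);
        return stream.groupBy(PageView::getKey);
    }

    public static void main(String[] args) {
        demo().forEach((user, views) -> System.out.println(user + " -> " + views.size()));
    }
}
```

Here the stream type stays MessageStream<PageView>, while the grouped result is still typed as Map<String, List<PageView>> because K is inferred from PageView::getKey.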


- Yi

