Posted to dev@apex.apache.org by Ananth G <an...@gmail.com> on 2017/08/15 21:02:23 UTC

[Design discussion] - Kudu Input operator

Hello All,

The implementation of the Apex Kudu Input Operator is ready for a pull request against the JIRA below. Before raising the pull request in the next couple of days, I would like to gather feedback on the design and incorporate it.

https://issues.apache.org/jira/browse/APEXMALHAR-2472

The following are the main features that would be supported by the Input operator:

- The input operator scans all or some rows of a single Kudu table.
- Each Kudu row is translated to a POJO for downstream operators.
- The input operator accepts an SQL expression (described in detail below) that is parsed to generate the equivalent scanner code for the Kudu table. This is needed because the Kudu table API does not support SQL expressions.
- The SQL expression has additional options that support Apache Apex design patterns (for example, sending a control tuple message after a query is successfully processed).
- The input operator works on a continuous basis, i.e. it accepts the next query once the current query is complete.
- The operator works in a distributed fashion for the input query: for a single input query, the scan work is distributed among all of the physical instances of the input operator.
- Kudu splits a table into chunks of data called tablets. The tablets are replicated and partitioned (range and hash partitions are supported) in Kudu according to the Kudu table definition. The input operator can be partitioned in two ways:
	- Many Kudu tablets map to one partition of the Apex Kudu input operator
	- One Kudu tablet maps to one partition of the Apex Kudu input operator
- The partitioning does not change on a per-query basis, because of the complex cases that would arise. For example, if each query touches only a few rows before the next query is accepted, repartitioning per query would cause a lot of churn in operator serialization/deserialization, YARN allocation requests, etc. Supporting per-query partition planning would also lead to a very complex implementation and poor resource usage, as every physical instance of the operator would have to wait for its peers to complete their scans and then wait for the next checkpoint to be repartitioned.
- The partitioner splits the workload of a single query in a round-robin fashion. After a query plan is generated, the scan token ranges are distributed equally among the physical operator instances.
- The operator allows two modes of scanning for an application (the mode cannot be changed on a per-query basis):
	- Consistent order scanner: only one tablet scan thread is active at any given instant for a given query
	- Random order scanner: many threads are active to scan Kudu tablets in parallel
- The consistent order scanner is slower but helps with better “exactly once” implementations if the correct method is overridden in the operator.
- The operator introduces the DisruptorBlockingQueue for low-latency buffer management. The LMAX Disruptor library was considered but, based on discussion threads in other Apache projects, I settled on the ConversantMedia implementation of the Disruptor blocking queue. This blocking queue is used when the Kudu scanner thread hands a scanned row to the emitTuples() call on the input operator's main thread.
- The operator allows for exactly-once semantics if the user specifies the logic for reconciling a possible duplicate row when the operator is resuming from a checkpoint. This is done by overriding a method that returns a boolean (true to emit the tuple, false to suppress it) while the operator is working in the reconciling window phase. The reconciling phase is active for at most one window.
- The operator uses the FSWindowManager to manage metadata at the end of every window. On resumption from a checkpoint, the operator still scans the Kudu tablets but does not emit rows that were already streamed downstream. Subsequently, when the operator is in the reconciling window, the method described above is invoked to filter duplicates. After this reconciling window, the operator works in the normal mode of operation.
- The following additional aspects of the operator are configurable:
	- Max tuples per window
	- Spin policy and buffer size for the Disruptor blocking queue
	- A mechanism to provide custom control tuples if required
	- The number of physical operator instances, set via the API if required
	- Fault tolerance. If fault tolerant, an alternative replica of the Kudu tablet is picked up for scanning if the initial tablet fails for whatever reason. However, this slows down scan throughput, hence it is configurable by the end user.
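
As an illustration of the round-robin split mentioned in the list above, here is a minimal sketch in plain Java. The class name RoundRobinScanAssigner and the use of arbitrary objects as stand-ins for Kudu scan tokens are assumptions made for this example; this is not the operator's actual partitioner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the operator's API.
final class RoundRobinScanAssigner {

  // Distributes the scan tokens of one query plan over a fixed number of
  // physical operator instances, one token at a time in round-robin order.
  static <T> List<List<T>> assign(List<T> scanTokens, int partitionCount) {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("partitionCount must be positive");
    }
    List<List<T>> perPartition = new ArrayList<>(partitionCount);
    for (int i = 0; i < partitionCount; i++) {
      perPartition.add(new ArrayList<>());
    }
    for (int i = 0; i < scanTokens.size(); i++) {
      // Token i goes to partition i mod partitionCount.
      perPartition.get(i % partitionCount).add(scanTokens.get(i));
    }
    return perPartition;
  }
}
```

With five tokens and two partitions, the first partition receives tokens 0, 2 and 4 and the second receives tokens 1 and 3, so the sizes differ by at most one.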


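The replay behaviour after a restart (suppress already-streamed windows, ask the user-supplied check in the reconciling window, then emit normally) can be sketched as below. This is a simplified model in plain Java, not the operator code: the class ReplayFilter, its constructor arguments and the use of a Predicate are assumptions for illustration. Only the method name isAllowedInReconcilingWindow and the term “safe mode” come from this thread.

```java
import java.util.function.Predicate;

// Simplified model of the three phases after resuming from a checkpoint.
final class ReplayFilter<T> {
  enum Phase { SAFE_MODE, RECONCILING, NORMAL }

  // Window that was active when the shutdown/crash happened (assumed known
  // from the metadata saved at the end of every window).
  private final long reconcilingWindowId;
  // Stand-in for the overridable business-logic check.
  private final Predicate<T> isAllowedInReconcilingWindow;

  ReplayFilter(long reconcilingWindowId, Predicate<T> isAllowedInReconcilingWindow) {
    this.reconcilingWindowId = reconcilingWindowId;
    this.isAllowedInReconcilingWindow = isAllowedInReconcilingWindow;
  }

  Phase phaseFor(long windowId) {
    if (windowId < reconcilingWindowId) {
      return Phase.SAFE_MODE;
    }
    return windowId == reconcilingWindowId ? Phase.RECONCILING : Phase.NORMAL;
  }

  // Returns true when the scanned row should be emitted downstream.
  boolean shouldEmit(long windowId, T tuple) {
    switch (phaseFor(windowId)) {
      case SAFE_MODE:
        return false; // already streamed before the crash
      case RECONCILING:
        return isAllowedInReconcilingWindow.test(tuple); // user decides
      default:
        return true; // normal mode of operation
    }
  }
}
```

A user whose downstream store can report which rows it has already seen could pass a predicate such as `row -> !alreadySeen.contains(row)`, where alreadySeen is a hypothetical lookup owned by the application.
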
Some notes regarding the SQL expression for the operator:

- The operator uses ANTLR4 to parse the SQL expression.
- The parser is based on a grammar file which is part of the source tree. The grammar is compiled on every build as part of the build process and code is generated for the parser automatically.
- The reasons for using a custom parser (as opposed to something like Calcite) are:
	- Kudu does not have all the features of a standard SQL expression. For example, != (not equal to) is not supported, nor is there a concept of a join.
	- We are providing a lot more flexibility for the user to specify what the control tuple message should be, should the end user choose to send a control tuple downstream after the given query is done processing.
- The SQL expression can specify a set of options for processing of the query:
	- Control tuple message: a message/string that can be sent as the control tuple field. There would be other parts to this control tuple, such as the query that was just completed and whether this is the beginning or the end of the scan.
	- Read snapshot time: Kudu supports specifying the read snapshot time at which the scan has to occur, because Kudu is essentially an MVCC engine and stores multiple versions of the same row. This option lets the end user specify the read snapshot time for the scan.
- The parser supports general syntax checking. If there is an error in the SQL expression, the string representing the supplied SQL expression is emitted onto an error port and the next query is taken for processing.
- The data types supported are only those supported by the Kudu engine. The parser handles data type parsing; for example, String values are double quoted.
- The parser allows SELECT AA as BB style expressions, wherein AA is the column name in Kudu and BB is the name of the Java POJO field.
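
To show what the SELECT AA as BB mapping amounts to, here is a minimal sketch that turns a select list into a column-to-field map. It uses plain string splitting rather than the ANTLR4 grammar, and it assumes that a column without an alias maps to a POJO field of the same name; neither the class SelectAliasSketch nor that fallback rule is taken from the actual parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: the real operator uses an ANTLR4-generated parser.
final class SelectAliasSketch {

  // Input is just the select list, for example "AA as BB, CC".
  // Returns Kudu column name -> Java POJO field name, in select order.
  static Map<String, String> columnToField(String selectList) {
    Map<String, String> mapping = new LinkedHashMap<>();
    for (String item : selectList.split(",")) {
      // Split on the keyword "as" (case-insensitive) surrounded by whitespace.
      String[] parts = item.trim().split("(?i)\\s+as\\s+");
      if (parts.length > 2 || parts[0].isEmpty()) {
        throw new IllegalArgumentException("Cannot read select item: " + item);
      }
      String column = parts[0];
      // Assumption: without an alias the field name equals the column name.
      String field = parts.length == 2 ? parts[1] : column;
      mapping.put(column, field);
    }
    return mapping;
  }
}
```

For the select list `AA as BB, CC`, the resulting map sends column AA to field BB and column CC to field CC.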

Please let me know if the community has any other questions regarding the above design. I am planning to present this operator along with the Kudu output operator at the DataWorks Summit next month, and any feedback would be useful.


Regards,
Ananth 

Re: [Design discussion] - Kudu Input operator

Posted by Ananth G <an...@gmail.com>.
Hello Thomas,

Thanks for the additional comments. Replies inline marked [Ananth]>>>

Regards,
Ananth
> On 22 Aug 2017, at 2:10 pm, Thomas Weise <th...@apache.org> wrote:
> 
> -->
> 
> Thanks,
> Thomas
> 
> 
> On Sat, Aug 19, 2017 at 2:07 PM, Ananth G <ananthg.apex@gmail.com> wrote:
> 
>> Hello Thomas,
>> 
>> Replies in line marked [Ananth]>>
>> 
>> Apologies for the somewhat longer description, as I think the
>> description needs more clarity.
>> 
>> Regards,
>> Ananth
>> 
>>> On 19 Aug 2017, at 11:10 am, Thomas Weise <th...@apache.org> wrote:
>>> 
>>> Hi Ananth,
>>> 
>>> Nice writeup, couple questions/comments inline ->
>>> 
>>> On Tue, Aug 15, 2017 at 2:02 PM, Ananth G <ananthg.apex@gmail.com> wrote:
>>> 
>>>> Hello All,
>>>> 
>>>> The implementation for Apex Kudu Input Operator is ready for a pull
>>>> request. Before raising the pull request, I would like to get any inputs
>>>> regarding the design and incorporate any feedback before raising the
>> pull
>>>> request in the next couple of days for the following JIRA.
>>>> 
>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2472
>>>> 
>>>> The following are the main features that would be supported by the Input
>>>> operator:
>>>> 
>>>> - The input operator would be used to scan all or some rows of a single
>>>> kudu table.
>>>> - Each Kudu row is translated to a POJO for downstream operators.
>>>> - The Input operator would accept an SQL expression ( described in
>> detail
>>>> below) that would be parsed to generate the equivalent scanner code for
>> the
>>>> Kudu Table. This is because Kudu Table API does not support an SQL
>>>> expressions
>>>> - The SQL expression would have additional options that would help in
>>>> Apache Apex design patterns ( Ex: Sending a control tuple message after
>> a
>>>> query is successfully processed )
>>>> - The Input operator works on a continuous basis i.e. it would accept
>> the
>>>> next query once the current query is complete)
>>>> 
>>> 
>>> This means the operator will repeat the query to fetch newly added rows,
>>> similar to what the JDBC poll operator does, correct?
>> [Ananth]>> Yes.  All of this design is covered by the Abstract
>> implementation. In fact there is a default implementation of the abstract
>> operator that does exactly this. This default implementation operator is
>> called IncrementalStepScanInputOperator. Based on a properties file, this
>> operator can be used to implement the JDBC Poll operator
>> functionality using a timestamp column as the incremental step value.
>> 
>> The design however does not limit us to only this pattern but can
>> accommodate other patterns as well. Here is what I want to add in this
>> context:
>>    - An additional pattern is a “time travel pattern”. Since Kudu
>> is an MVCC engine (and if appropriately configured), we can use this
>> operator to answer questions like “Can I stream the entire Kudu table, or a
>> subset of it, as of 1 AM, 2 AM, 3 AM ... today?” even though the current
>> time could be 6 PM. (This is enabled by specifying READ_SNAPSHOT_TIME,
>> which is a supported option of the SQL grammar we are enabling for this
>> operator.)
>> 
> 
> So this could be used to do event time based processing based on the
> snapshot time (without a timestamp column)?
> 

 [Ananth]>>> Yes. That is correct. 
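
To make the time travel pattern concrete: it amounts to issuing one query per snapshot time. The sketch below only computes those hourly snapshot times in plain Java. The class SnapshotTimes is hypothetical, and the choice of epoch milliseconds is an assumption, since the unit that READ_SNAPSHOT_TIME expects is not stated in this thread.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

// Illustration only: computes the snapshot instants, not the queries.
final class SnapshotTimes {

  // Returns one timestamp (epoch milliseconds, an assumed unit) per full
  // hour from fromHour to toHour inclusive, on the given day and zone.
  static List<Long> hourlyEpochMillis(LocalDate day, ZoneId zone, int fromHour, int toHour) {
    List<Long> snapshots = new ArrayList<>();
    for (int hour = fromHour; hour <= toHour; hour++) {
      snapshots.add(day.atTime(hour, 0).atZone(zone).toInstant().toEpochMilli());
    }
    return snapshots;
  }
}
```

Each returned value would then be supplied as the read snapshot time of a separate query, for example for 1 AM, 2 AM and 3 AM of the current day.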

> 
>>    - Another interesting pattern is when the next query has no
>> correlation with the previous query. Example use cases are an
>> Apex-cli equivalent or a possible future use case like Apache Zeppelin
>> integration. A query comes in ad hoc and the values are streamed for
>> the current incoming expression, i.e. when we want to enable interactive
>> query-based streaming.
>> 
>>> 
>>> - The operator will work in a distributed fashion for the input query.
>> This
>>>> essentially means for a single input query, the scan work is distributed
>>>> among all of the physical instances of the input operator.
>>>> - Kudu splits a table into chunks of data regions called Tablets. The
>>>> tablets are replicated and partitioned  (range and hash partitions are
>>>> supported ) in Kudu according to the Kudu Table definition. The operator
>>>> allows partitioning of the Input Operator to be done in 2 ways.
>>>>       - Map many Kudu Tablets to one partition of the Apex Kudu Input
>>>> operator
>>>>       - One Kudu Tablet maps to one partition of the Apex Kudu Input
>>>> operator
>>>> - The partitioning does not change on a per query basis. This is because
>>>> of the complex use cases that would arise. For example, if the query is
>>>> touching only a few rows before the next query is accepted, it would
>> result
>>>> in a lot of churn in terms of operator serialize/deserialze, YARN
>>>> allocation requests etc. Also supporting per query partition planning
>> leads
>>>> to possibly very complex implementation and poor resource usage as all
>>>> physical instances of the operator have to wait for its peers to
>> complete
>>>> its scan and wait for next checkpoint to get repartitioned.
>>>> 
>>> 
>>> Agreed, what would be a reason to change partitioning between queries
>>> though?
>>> 
>> [Ananth]>> I was making that note more in the context of dynamic
>> partitioning. My current understanding is that dynamic partitioning is
>> entirely based on the performance stats, and I was thinking that this did
>> not make sense when the stats depend entirely on the
>> business logic (the query in this case) and not on the operational aspects
>> that are supported by the StatsListener interface. So the Abstract operator
>> suppresses any dynamic partitioning, as the stats would change severely
>> based on the query, the data distribution of the underlying Kudu cluster
>> and the Kudu table definition.
>> 
> 
> Dynamic partitioning can be based on any trigger, including external
> information such as queue size or number of files in a directory. Some
> implementations that you see in the code base trigger partition change
> based on internal metrics, you can implement your custom logic in the
> StatsListener.
> 
 [Ananth]>>> Thanks for the clarification. 

> 
>> 
>>> 
>>>> - The partitioner splits the work load of a single query in a round
>> robin
>>>> fashion. After a query plan is generated , each scan token range is
>>>> distributed equally among the physical operator instances.
>>>> - The operator allows for two modes of scanning for an application (
>>>> Cannot be changed on a per query basis )
>>>>       - Consistent Order scanner - only one tablet scan thread is
>> active
>>>> at any given instance of time for a given query
>>>>       - Random Order scanner - Many threads are active to scan Kudu
>>>> tablets in parallel
>>>> - As can be seen, Consistent order scanner would be slower but would
>> help
>>>> in better “exactly once” implementations if the correct method is
>>>> overridden in the operator.
>>>> 
>>> 
>>> Can you elaborate on this a bit more? Ordering within a streaming window
>>> generally isn't deterministic when you have a shuffle or stream merge.
>> And
>>> the association between input records and streaming windows can be made
>>> deterministic by using the window data manager?
>>> 
>> [Ananth]>>  The consistent order scanner essentially does two things:
>> it ensures that there is only a single thread per Apex operator for scanning
>> a Kudu tablet, and it marks the scan as fault tolerant against Kudu tablet
>> failures. I was referring more to the issue that deterministic behaviour is
>> not guaranteed at the Kudu tablet level itself when multiple tablets are
>> mapped to a single Apex partition. In this case, there are multiple
>> threads scanning multiple Kudu tablets and contributing to the buffer of a
>> single partition of the Apex operator. If however the user configures the
>> Apex operator with a one-to-one mapping or uses the consistent order scanner,
>> we can at least guarantee the same ordering for the input tuples, provided
>> the underlying Kudu table is not mutated for that query result.
>> 
>> Yes you are right that we cannot claim deterministic ordering in case
>> there is a shuffle or merge in the downstream operators. Also there is no
>> shuffle or merge within the Kudu Input operator itself. The query planner
>> ensures that the predicates are pushed down to the Kudu scan engine and the
>> results are streamed to the individual partitions. If the user chooses to
>> unify Input operator instances due to application design needs, then
>> ordering is definitely lost.
>> 
>> The window manager comment is entirely about the input operator's state
>> management, and I did not mean that we are providing end-to-end
>> functionality with this input operator. This is the reason why we have an
>> overridable method called “isAllowedInReconcilingWindow”,
>> as only business logic can decide, on resumption from a checkpoint,
>> whether we need to reprocess a tuple. Comparing it to the Kafka
>> implementation, Kafka commits the offsets at the committed call, whereas I
>> could not take that approach for the following reasons:
>> 
>> - All of the physical instances of the Kudu input operator might not be
>> processing the same query at any given instant! This is because
>> as soon as a physical instance of the operator is done processing
>> the current query, it asks for its next work order. Since the data
>> distribution of the underlying Kudu cluster decides the amount of time a
>> physical instance of the Apex Kudu input operator spends on a
>> work order, each instance may be working on a different query at any point.
>> - As long as the concrete implementation of the abstract input
>> operator provides the same sequence of input queries/work orders,
>> all of the physical instances should eventually process all of the data at
>> some future time.
>> - The Kafka operator has the benefit of doing this at the committed call
>> because it is based on offsets and the work order/Kafka partition does not
>> change.
>> - In the case of the Kudu input operator, it is entirely possible that
>> multiple different queries are processed and completed between calls for
>> checkpoint, and hence I thought it was sensible to only guarantee a state at
>> the checkpoint() call rather than the committed call.
>> 
> 
> That's correct and when you look at state management in other places, you
> see that committed merely picks up state that was previously guaranteed
> through checkpointed() for things such as finalizing/materializing files,
> because that state becomes immutable only once the committed notification
> arrives.
> 
> There are however multiple streaming windows between checkpoints, and when
> those streaming windows are not deterministic, then rewind/replay can
> produce different results in the topology (for example for an ingress time
> based aggregation).
> 
 [Ananth]>>> I have been thinking about this problem for a while, and it makes me wonder if we are staring at a distributed consensus pattern here. Do you think there is value in on-boarding a consensus-aware operator as a construct in the operator framework? What I mean is that all/some operators of an Apex application agree at setup() time to take some actions based on consensus. The consensus in this case is to “discard” all tuples that were committed by all “interested” operators. I feel the architecture of Apex fits this beautifully, as opposed to its peer streaming frameworks, which do not allow a “customisable” construct like an operator. I can start a separate discussion thread describing this in more detail as a proposal, using Raft as a possible consensus approach. Thoughts?

> 
>> - The Abstract input operator automatically filters all the data up to the
>> last window but one before the shutdown/crash. These windows are
>> termed “safe mode” in the code.
>> - In the window that was active when the shutdown/crash happened, which needs
>> a stronger check (called the reconciling window), the operator
>> gives the user the flexibility to decide whether we need to “re-stream” a
>> tuple, via the method “isAllowedInReconcilingWindow”.
>> 
>> 
>>> 
>>>> - The operator introduces the DisruptorBlockingQueue for a low latency
>>>> buffer management. LMAX disruptor library was considered and based on
>> some
>>>> other discussion threads on other Apache projects, settled on the
>>>> ConversantMedia implementation of the Disruptor Blocking queue. This
>>>> blocking queue is used when the kudu scanner thread wants to send the
>>>> scanned row into the input operators main thread emitTuples() call.
>>>> - The operator allows for exactly once semantics if the user specifies
>> the
>>>> logic for reconciling a possible duplicate row in situations when the
>>>> operator is resuming from a checkpoint. This is done by overriding a
>> method
>>>> that returns a boolean ( true to emit the tuple and false to suppress
>> the
>>>> tuple ) when the operating is working in the reconciling window phase.
>> As
>>>> can be seen, this reconciling phase is only active at the max for one
>>>> window.
>>> 
>>> - The operator uses the FSWindowManager to manage metadata at the end of
>>>> every window. From resumption at a checkpoint, the operator will still
>> scan
>>>> the Kudu tablets but simply not emit all rows that were already streamed
>>>> downstream. Subsequently when the operator is in the reconciling window,
>>>> the method described above is invoked to allow for duplicates filter.
>> After
>>>> this reconciling window, the operator works in the normal mode of
>> operation.
>>>> 
>>> 
>>> In which situation would I use this? How do I know the record was
>> actually
>>> processed downstream?
>> 
>> [Ananth]>> I guess this is covered in the response which I just added
>> above. Since there is no “good way” to decide in a truly distributed
>> application, the flexibility is given to the end user to decide. The input
>> operator only guarantees, on a best-effort basis, that we will not
>> duplicate rows. On the other hand, there is the other approach of relying
>> on the committed window call. With that, we might end up sending more
>> duplicates downstream in case of a crash and possibly miss some queries as
>> well, as each operator is at a different query at the time of the commit call.
>> 
>> 
> The JMS input operator is an example where replay from source is not
> possible. In that case, the operator itself needs to remember the tuples
> that were emitted in the streaming windows until they are committed. Can a
> similar approach not be used here?
> 
> 
 [Ananth]>>> Yes. This can be done, but I am not sure we can have all the guarantees we would like. For example, a crash of the input operator will void some of the assumptions and will result in a duplicate emit if there is no business logic to validate a duplicate entry (which means that we cannot do away with the method check for strict exactly-once implementations). Also, I have consciously included the ConversantMedia Disruptor queue as a highly performant ArrayBlockingQueue equivalent, which automatically discards the tuple once it is read from the holding buffer in the emitTuples() call.
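
The hand-off between the scanner thread and emitTuples() can be sketched as below. This is a simplified model, not the operator code: it uses the JDK's ArrayBlockingQueue as a stand-in for the ConversantMedia Disruptor queue, and the class ScanBuffer and its method names are assumptions for illustration. It shows that a row leaves the holding buffer as soon as it is read, and how a max-tuples-per-window limit could bound each window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Stand-in model of the buffer between scanner threads and the operator thread.
final class ScanBuffer<T> {
  private final BlockingQueue<T> buffer;
  private final int maxTuplesPerWindow;
  private int emittedInWindow;

  ScanBuffer(int capacity, int maxTuplesPerWindow) {
    this.buffer = new ArrayBlockingQueue<>(capacity);
    this.maxTuplesPerWindow = maxTuplesPerWindow;
  }

  // Called by a scanner thread; blocks while the buffer is full.
  void publish(T row) {
    try {
      buffer.put(row);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while publishing a row", e);
    }
  }

  // Called on the operator thread at the start of each streaming window.
  void beginWindow() {
    emittedInWindow = 0;
  }

  // Called on the operator thread; never blocks. Each row returned here has
  // already been removed from the buffer, so it cannot be replayed from it.
  List<T> emitTuples() {
    List<T> out = new ArrayList<>();
    T row;
    while (emittedInWindow < maxTuplesPerWindow && (row = buffer.poll()) != null) {
      out.add(row);
      emittedInWindow++;
    }
    return out;
  }
}
```

Because rows are dropped from the buffer once read, remembering emitted tuples until they are committed (as the JMS input operator does) would need separate state on top of this queue.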

>>> 
>>> 
>>>> - The following are the additional configurable aspects of the operator
>>>>       - Max tuples per window
>>>>       - Spin policy and the buffer size for the Disruptor Blocking
>> Queue
>>>>       - Mechanism to provide custom control tuples if required
>>>>       - Setting the number of Physical operator instances via the API
>> if
>>>> required.
>>>>       - Setting the fault Tolerance. If fault tolerant , an alternative
>>>> replica of the Kudu tablet is picked up for scanning if the initial
>> tablet
>>>> fails for whatever reason. However this slows down the scan throughput.
>>>> Hence it is configurable by the end user.
>>>> 
>>>> 
>>>> Some notes regarding the SQL expression for the operator:
>>>> 
>>>> - The operator uses ANTLR4 to parse the SQL expression.
>>>> - The parser is based on a grammar file which is part of the source
>> tree.
>>>> The grammar is compiled on every build as part of the build process and
>>>> code is generated for the parser automatically.
>>>> - The reason we had to use a custom parser are (as opposed to something
>>>> like calcite) :
>>>>       - Kudu does not have all the features for a standard SQL
>>>> expression. As an example != ( not equal to ) is not supported. Nor is
>>>> there a concept of a Join etc.
>>>>       - We are providing a lot more flexibility for the user to specify
>>>> what the control tuple message should be should the end user choose to
>> send
>>>> a control tuple downstream after the given query is done processing
>>>> - The SQL expression can specify a set of options for processing of the
>>>> query:
>>>>       - Control tuple message : A message/string that can be sent as
>> the
>>>> Control tuple field. There would be other parts for this control tuple
>> like
>>>> the query that was just completed and whether this is a begin or end of
>> the
>>>> scan.
>>>> 
>>> 
>>> Will there be any support for watermarks that would work with downstream
>>> window operator? Another future idea might be to support Kudu as a source
>>> in the Apex SQL API.
>>> 
>> [Ananth]>> Currently there is a control tuple that is emitted at the end
>> of the query if enabled in the SQL expression. Apart from this, the
>> operator also allows for sending a control tuple at the beginning and
>> end of the query for a given physical instance of the operator, should
>> the user choose to. The control tuple itself is entirely extensible and is
>> hence given as a templated variable to the Abstract operator. This
>> essentially means that watermarks are supported by default if the user
>> wants to send a custom marker. However, it may be noted that this control
>> tuple can only be emitted at the beginning and end of the query. If you
>> are referring to watermarks for the data, like event-time-based watermarks,
>> I will need to explore this a bit more, and it will perhaps need some
>> enhancement.
>> 
> I mean the latter but that is more for my information or in case it
> affects the design and not intended to become part of the current work.
> 
> 
>> Support for the Apex SQL API is a great idea. I will need to do some more
>> homework here, as I have to see how the custom parser and the Calcite
>> integration would fit together; I believe we are using the Calcite parsers
>> to enable some of the Streaming API? The Kudu drivers do not yet mention
>> any JDBC-based driver, so I need to see how well we can use Calcite in the
>> right way.
>> 
>> 
> Again, treat this as outside of the current scope. It is not directly
> related to JDBC. Have a look at the existing endpoints for the SQL API:
> 
> https://github.com/apache/apex-malhar/tree/master/sql/src/main/java/org/apache/apex/malhar/sql/table
> 
> In the fusion style app you could use the Kudu input operator, but to write
> a pipeline in pure SQL with Kudu source, there would need to be a Kudu
> endpoint. An interesting aspect would be that with Kudu (same would apply
> to JDBC source) there is the potential for predicate pushdown.
> 
> 

 [Ananth]>>> Thanks for the pointer. I will take this up as an enhancement as part of this JIRA: https://issues.apache.org/jira/browse/APEXMALHAR-2538

> 
>>> 
>>>>       - Read Snapshot time : Kudu supports specifying the read snapshot
>>>> time for which the scan has to occur. This is because Kudu is
>> essentially
>>>> an MVCC engine and stores multiple versions of the same row. The Read
>>>> snapshot time allows for the end user to specify the read snapshot time
>> for
>>>> the scan.
>>>> - The parser supports for general syntax checking. If there is an error
>> in
>>>> the SQL expression , the string representing the SQL expression
>> supplied is
>>>> emitted onto an error port and the next query is taken for processing.
>>>> - The data types supported are only those data types as supported by the
>>>> Kudu Engine. The parser supports data type parsing support. For example
>>>> String data types are double quoted etc.
>>>> - The Parser allows for a SELECT AA as BB style of expressions wherein
>> AA
>>>> is the column name in Kudu and BB is the name of the java POJO field
>> name.
>>>> 
>>>> Please let me know if the community has any other questions regarding
>> the
>>>> above design. I am planning to present this operator along with the Kudu
>>>> output operator in the Data works summit next month and any feedback
>> would
>>>> be useful.
>>>> 
>>>> 
>>>> Regards,
>>>> Ananth


Re: [Design discussion] - Kudu Input operator

Posted by Thomas Weise <th...@apache.org>.
-->

Thanks,
Thomas


On Sat, Aug 19, 2017 at 2:07 PM, Ananth G <an...@gmail.com> wrote:

> Hello Thomas,
>
> Replies in line marked [Ananth]>>
>
> Apologies for the somewhat longer description, as I think the
> description needs more clarity.
>
> Regards,
> Ananth
>
> > On 19 Aug 2017, at 11:10 am, Thomas Weise <th...@apache.org> wrote:
> >
> > Hi Ananth,
> >
> > Nice writeup, couple questions/comments inline ->
> >
> > On Tue, Aug 15, 2017 at 2:02 PM, Ananth G <ananthg.apex@gmail.com> wrote:
> >
> >> Hello All,
> >>
> >> The implementation for Apex Kudu Input Operator is ready for a pull
> >> request. Before raising the pull request, I would like to get any inputs
> >> regarding the design and incorporate any feedback before raising the
> pull
> >> request in the next couple of days for the following JIRA.
> >>
> >> https://issues.apache.org/jira/browse/APEXMALHAR-2472
> >>
> >> The following are the main features that would be supported by the Input
> >> operator:
> >>
> >> - The input operator would be used to scan all or some rows of a single
> >> kudu table.
> >> - Each Kudu row is translated to a POJO for downstream operators.
> >> - The Input operator would accept an SQL expression ( described in
> detail
> >> below) that would be parsed to generate the equivalent scanner code for
> the
> >> Kudu Table. This is because Kudu Table API does not support an SQL
> >> expressions
> >> - The SQL expression would have additional options that would help in
> >> Apache Apex design patterns ( Ex: Sending a control tuple message after
> a
> >> query is successfully processed )
> >> - The Input operator works on a continuous basis i.e. it would accept
> the
> >> next query once the current query is complete)
> >>
> >
> > This means the operator will repeat the query to fetch newly added rows,
> > similar to what the JDBC poll operator does, correct?
> [Ananth]>> Yes.  All of this design is covered by the Abstract
> implementation. In fact there is a default implementation of the abstract
> operator that does exactly this. This default implementation operator is
> called IncrementalStepScanInputOperator. Based on a properties file, this
> operator can be used to implement the JDBC Poll operator
> functionality using a timestamp column as the incremental step value.
>
> The design however does not limit us to only this pattern but can
> accommodate other patterns as well. Here is what I want to add in this
> context:
>     - An additional pattern is a “time travel pattern”. Since Kudu
> is an MVCC engine (and if appropriately configured), we can use this
> operator to answer questions like “Can I stream the entire Kudu table, or a
> subset of it, as of 1 AM, 2 AM, 3 AM ... today?” even though the current
> time could be 6 PM. (This is enabled by specifying READ_SNAPSHOT_TIME,
> which is a supported option of the SQL grammar we are enabling for this
> operator.)
>

So this could be used to do event time based processing based on the
snapshot time (without a timestamp column)?


>     - Another interesting pattern is when the next query has no
> correlation with the previous query. Example use cases are an
> Apex-cli equivalent or a possible future use case like Apache Zeppelin
> integration. A query comes in ad hoc and the values are streamed for
> the current incoming expression, i.e. when we want to enable interactive
> query-based streaming.
>
> >
> > - The operator will work in a distributed fashion for the input query.
> This
> >> essentially means for a single input query, the scan work is distributed
> >> among all of the physical instances of the input operator.
> >> - Kudu splits a table into chunks of data regions called Tablets. The
> >> tablets are replicated and partitioned  (range and hash partitions are
> >> supported ) in Kudu according to the Kudu Table definition. The operator
> >> allows partitioning of the Input Operator to be done in 2 ways.
> >>        - Map many Kudu Tablets to one partition of the Apex Kudu Input
> >> operator
> >>        - One Kudu Tablet maps to one partition of the Apex Kudu Input
> >> operator
> >> - The partitioning does not change on a per query basis. This is because
> >> of the complex use cases that would arise. For example, if the query is
> >> touching only a few rows before the next query is accepted, it would
> result
> >> in a lot of churn in terms of operator serialize/deserialze, YARN
> >> allocation requests etc. Also supporting per query partition planning
> leads
> >> to possibly very complex implementation and poor resource usage as all
> >> physical instances of the operator have to wait for its peers to
> complete
> >> its scan and wait for next checkpoint to get repartitioned.
> >>
> >
> > Agreed, what would be a reason to change partitioning between queries
> > though?
> >
> [Ananth]>> Was making that note more in the context of Dynamic
> partitioning. My current understanding is that dynamic partitioning is
> entirely based on the performance stats and I was thinking that did not
> make exact sense when the stats need to be entirely dependent on the
> business logic ( query in this case ) and not the operational aspects that
> is being supported by the StatsListener interface. So the Abstract operator
> suppresses any dynamic partitioning as the stats would severely change
> basing on the query and data distribution of the underlying kudu cluster
> and kudu table definition.
>

Dynamic partitioning can be based on any trigger, including external
information such as queue size or the number of files in a directory. Some
implementations that you see in the code base trigger a partition change
based on internal metrics, but you can implement your own custom logic in
the StatsListener.


>
> >
> >> - The partitioner splits the work load of a single query in a round
> robin
> >> fashion. After a query plan is generated , each scan token range is
> >> distributed equally among the physical operator instances.
> >> - The operator allows for two modes of scanning for an application (
> >> Cannot be changed on a per query basis )
> >>        - Consistent Order scanner - only one tablet scan thread is
> active
> >> at any given instance of time for a given query
> >>        - Random Order scanner - Many threads are active to scan Kudu
> >> tablets in parallel
> >> - As can be seen, Consistent order scanner would be slower but would
> help
> >> in better “exactly once” implementations if the correct method is
> >> overridden in the operator.
> >>
> >
> > Can you elaborate on this a bit more? Ordering within a streaming window
> > generally isn't deterministic when you have a shuffle or stream merge.
> And
> > the association between input records and streaming windows can be made
> > deterministic by using the window data manager?
> >
> [Ananth]>>  The consistent order scanner essentially does two things :
> ensures that there is only single thread per apex operator for scanning a
> kudu tablet and also marking it fault tolerant from Kudu Tablet failures. I
> was more referring to the issue that this deterministic behaviour is not
> guaranteed at the Kudu tablets level itself when multiple tablets are
> mapped to a single apex partition. In this case, when there are multiple
> threads scanning multiple kudu tablets and contributing to the buffer of a
> single partition of the Apex operator. If however the user configures Apex
> operator as one to one mapping or uses the consistent order scanner, we can
> at least guarantee that the same ordering can happen just for the input
> tuples provided underlying kudu table is not mutated for that query result.
>
> Yes you are right that we cannot claim deterministic ordering in case
> there is a shuffle or merge in the downstream operators. Also there is no
> shuffle or merge within the Kudu Input operator itself. The query planner
> ensures that the predicates are pushed down to the Kudu scan engine and the
> results are streamed to the individual partitions. If the user chooses to
> unify Input operator instances due to application design needs, then
> ordering is definitely lost.
>
> The window manager comment is entirely for the Input operator state
> management and I did not mean that we are providing for end to end
> functionality with this input operator. This is the reason why we have a
> method called is “isAllowedInReconcilingWindow” as  an overridable method
> as only a business logic can tell in case of resumption from a checkpoint
> how to decide if we need to reprocess a tuple. Comparing it to the Kafka
> implementation, Kafka commits the offsets at the committed call whereas I
> could not take that approach because of the following reasons:
>
> - All of the physical instances of the Kudu input operator might not be
> processing the same query at any given instance of time ! This is because
> as soon as the physical instance of the operator is done processing with
> the current query, it is asking for its next work order. Since the data
> distribution of the underlying kudu cluster decides the amount of time a
> physical instance of the Apex Kudu input operator spends on working on a
> work order, each operator is at varying processing query windows.
> - As long as the the concrete implementation of the abstract input
> operator provides for the same sequence of the input queries/work orders,
> all of the physical instances should eventually process all of the data in
> some future time.
> - The Kafka operator had the benefit of doing this at committed call
> because it is based on offset and the work order/kafka partition does not
> change
> - In the case of Kudu input operator, it is entirely possible that there
> are multiple different queries processed and completed between calls for
> checkpoint and hence I thought it was sensible to only guarantee a state at
> the checkpoint() call rather than committed call.
>

That's correct and when you look at state management in other places, you
see that committed merely picks up state that was previously guaranteed
through checkpointed() for things such as finalizing/materializing files,
because that state becomes immutable only once the committed notification
arrives.

There are however multiple streaming windows between checkpoints, and when
those streaming windows are not deterministic, then rewind/replay can
produce different results in the topology (for example for an ingress time
based aggregation).


> - The Abstract input operator automatically filters all the data till the
> last window but one before the shutdown crash. These windows are being
> termed as “safe mode” in the code.
> - In the window that was active when shutdown/crash happened that needs
> more  stronger check ( called the reconciling window), the operator chooses
> to give flexibility for the user to decide whether we need to “re-stream” a
> tuple by the method “isAllowedInReconcilingWindow”.
>
>
> >
> >> - The operator introduces the DisruptorBlockingQueue for a low latency
> >> buffer management. LMAX disruptor library was considered and based on
> some
> >> other discussion threads on other Apache projects, settled on the
> >> ConversantMedia implementation of the Disruptor Blocking queue. This
> >> blocking queue is used when the kudu scanner thread wants to send the
> >> scanned row into the input operators main thread emitTuples() call.
> >> - The operator allows for exactly once semantics if the user specifies
> the
> >> logic for reconciling a possible duplicate row in situations when the
> >> operator is resuming from a checkpoint. This is done by overriding a
> method
> >> that returns a boolean ( true to emit the tuple and false to suppress
> the
> >> tuple ) when the operating is working in the reconciling window phase.
> As
> >> can be seen, this reconciling phase is only active at the max for one
> >> window.
> >
> > - The operator uses the FSWindowManager to manage metadata at the end of
> >> every window. From resumption at a checkpoint, the operator will still
> scan
> >> the Kudu tablets but simply not emit all rows that were already streamed
> >> downstream. Subsequently when the operator is in the reconciling window,
> >> the method described above is invoked to allow for duplicates filter.
> After
> >> this reconciling window, the operator works in the normal mode of
> operation.
> >>
> >
> > In which situation would I use this? How do I know the record was
> actually
> > processed downstream?
>
> [Ananth]>> I guess this is covered in the response which I just added
> above. Since there is no “good way” to decide in a truely distributed
> application, the flexibility is given to the end user to decide. The input
> operator only provides a guarantee that we will not duplicate rows in a
> best possible effort. On the other hand , there is the other use case if
> committed window is called. We might end up sending in more duplicates
> downstream in case of a crash and possibly miss some queries as well as
> each operator is at different queries at the time of the commit call.
>
>
The JMS input operator is an example where replay from source is not
possible. In that case, the operator itself needs to remember the tuples
that were emitted in the streaming windows until they are committed. Can a
similar approach not be used here?


> >
> >
> >> - The following are the additional configurable aspects of the operator
> >>        - Max tuples per window
> >>        - Spin policy and the buffer size for the Disruptor Blocking
> Queue
> >>        - Mechanism to provide custom control tuples if required
> >>        - Setting the number of Physical operator instances via the API
> if
> >> required.
> >>        - Setting the fault Tolerance. If fault tolerant , an alternative
> >> replica of the Kudu tablet is picked up for scanning if the initial
> tablet
> >> fails for whatever reason. However this slows down the scan throughput.
> >> Hence it is configurable by the end user.
> >>
> >>
> >> Some notes regarding the SQL expression for the operator:
> >>
> >> - The operator uses ANTLR4 to parse the SQL expression.
> >> - The parser is based on a grammar file which is part of the source
> tree.
> >> The grammar is compiled on every build as part of the build process and
> >> code is generated for the parser automatically.
> >> - The reason we had to use a custom parser are (as opposed to something
> >> like calcite) :
> >>        - Kudu does not have all the features for a standard SQL
> >> expression. As an example != ( not equal to ) is not supported. Nor is
> >> there a concept of a Join etc.
> >>        - We are providing a lot more flexibility for the user to specify
> >> what the control tuple message should be should the end user choose to
> send
> >> a control tuple downstream after the given query is done processing
> >> - The SQL expression can specify a set of options for processing of the
> >> query:
> >>        - Control tuple message : A message/string that can be sent as
> the
> >> Control tuple field. There would be other parts for this control tuple
> like
> >> the query that was just completed and whether this is a begin or end of
> the
> >> scan.
> >>
> >
> > Will there be any support for watermarks that would work with downstream
> > window operator? Another future idea might be to support Kudu as a source
> > in the Apex SQL API.
> >
> [Ananth]>> Currently there is a control tuple that is emitted at the end
> of the query if enabled in the SQL expression. Apart from this, the
> operator also allows for sending a control tuple at the beginning and
> ending of the query for a given physical instance of the operator should
> the user choose to. The control tuple itself is entirely extensible and is
> hence given as a templated variable to the Abstract operator. This
> essentially means that watermarks are supported by default if the user
> wants to send a custom marker? However it may be noted that this control
> tuple is can only be emitted at the beginning and end of the query. If you
> are referring to watermark for the data like event time based watermarks, I
> will need to explore a bit more into this and will perhaps need some
> enhancement
>
I mean the latter, but that is more for my information, or in case it
affects the design; it is not intended to become part of the current work.


> Support for APEX SQL API is a great idea. I will need to do some more
> homework here as I have to see how the custom parser and the Calcite
> integration needs to happen as I believe we are using the Calcite parsers
> to enable some of the Streaming API ? Kudu drivers do not yet mention of
> any JDBC based driver and need to see how well we can use Calcite in the
> right way.
>
>
Again, treat this as outside of the current scope. It is not directly
related to JDBC. Have a look at the existing endpoints for the SQL API:

https://github.com/apache/apex-malhar/tree/master/sql/src/main/java/org/apache/apex/malhar/sql/table

In the fusion style app you could use the Kudu input operator, but to write
a pipeline in pure SQL with Kudu source, there would need to be a Kudu
endpoint. An interesting aspect would be that with Kudu (same would apply
to JDBC source) there is the potential for predicate pushdown.



> >
> >>        - Read Snapshot time : Kudu supports specifying the read snapshot
> >> time for which the scan has to occur. This is because Kudu is
> essentially
> >> an MVCC engine and stores multiple versions of the same row. The Read
> >> snapshot time allows for the end user to specify the read snapshot time
> for
> >> the scan.
> >> - The parser supports for general syntax checking. If there is an error
> in
> >> the SQL expression , the string representing the SQL expression
> supplied is
> >> emitted onto an error port and the next query is taken for processing.
> >> - The data types supported are only those data types as supported by the
> >> Kudu Engine. The parser supports data type parsing support. For example
> >> String data types are double quoted etc.
> >> - The Parser allows for a SELECT AA as BB style of expressions wherein
> AA
> >> is the column name in Kudu and BB is the name of the java POJO field
> name.
> >>
> >> Please let me know if the community has any other questions regarding
> the
> >> above design. I am planning to present this operator along with the Kudu
> >> output operator in the Data works summit next month and any feedback
> would
> >> be useful.
> >>
> >>
> >> Regards,
> >> Ananth
>
>

Re: [Design discussion] - Kudu Input operator

Posted by Ananth G <an...@gmail.com>.
Hello Thomas,

Replies in line marked [Ananth]>> 

Apologies for the somewhat longer description, as I think the original needs more clarity.

Regards,
Ananth

> On 19 Aug 2017, at 11:10 am, Thomas Weise <th...@apache.org> wrote:
> 
> Hi Ananth,
> 
> Nice writeup, couple questions/comments inline ->
> 
> On Tue, Aug 15, 2017 at 2:02 PM, Ananth G <an...@gmail.com> wrote:
> 
>> Hello All,
>> 
>> The implementation for Apex Kudu Input Operator is ready for a pull
>> request. Before raising the pull request, I would like to get any inputs
>> regarding the design and incorporate any feedback before raising the pull
>> request in the next couple of days for the following JIRA.
>> 
>> https://issues.apache.org/jira/browse/APEXMALHAR-2472
>> 
>> The following are the main features that would be supported by the Input
>> operator:
>> 
>> - The input operator would be used to scan all or some rows of a single
>> kudu table.
>> - Each Kudu row is translated to a POJO for downstream operators.
>> - The Input operator would accept an SQL expression ( described in detail
>> below) that would be parsed to generate the equivalent scanner code for the
>> Kudu Table. This is because Kudu Table API does not support an SQL
>> expressions
>> - The SQL expression would have additional options that would help in
>> Apache Apex design patterns ( Ex: Sending a control tuple message after a
>> query is successfully processed )
>> - The Input operator works on a continuous basis i.e. it would accept the
>> next query once the current query is complete)
>> 
> 
> This means the operator will repeat the query to fetch newly added rows,
> similar to what the JDBC poll operator does, correct?
[Ananth]>> Yes. All of this design is covered by the abstract implementation. In fact, there is a default implementation of the abstract operator that does exactly this, called IncrementalStepScanInputOperator. Driven by a properties file, this operator can be used to implement the JDBC poll operator functionality using a timestamp column as the incremental step value.
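
To illustrate the incremental step idea, here is a minimal sketch, assuming each poll covers a half-open range of the timestamp column and the next poll starts where the previous one ended. The class and method names are hypothetical and are not taken from IncrementalStepScanInputOperator; the sketch also does not build the SQL expression, since the grammar is described separately.

```java
// Hypothetical helper: produces successive [lower, upper) ranges over a
// timestamp column so that each poll picks up where the previous one ended.
class IncrementalStepPlanner {
    private long lowerBound;   // inclusive lower bound of the next range
    private final long step;   // width of each range, same unit as the column

    IncrementalStepPlanner(long start, long step) {
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive");
        }
        this.lowerBound = start;
        this.step = step;
    }

    // Returns {lowerInclusive, upperExclusive} and advances to the next step.
    long[] nextRange() {
        long[] range = {lowerBound, lowerBound + step};
        lowerBound += step;
        return range;
    }

    public static void main(String[] args) {
        IncrementalStepPlanner planner = new IncrementalStepPlanner(1000L, 500L);
        for (int i = 0; i < 3; i++) {
            long[] r = planner.nextRange();
            System.out.println("scan rows with ts in [" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

Each range would then be turned into the predicate of the next query, which is what makes the behaviour comparable to the JDBC poll operator.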

The design, however, does not limit us to this pattern; it can accommodate other patterns as well. Here is what I want to add in this context:
    - An additional pattern is a “time travel pattern”. Since Kudu is an MVCC engine (and if appropriately configured), we can use this operator to answer questions like “Can I stream all or a subset of the Kudu table as of 1 AM, 2 AM, 3 AM ... today?” even though the current time could be 6 PM. (This is enabled by specifying READ_SNAPSHOT_TIME, which is a supported option of the SQL grammar we are enabling for this operator.)
    - Another interesting pattern is when the next query has no correlation with the previous query. Example use cases are an Apex CLI equivalent or, as a possible future use case, Apache Zeppelin integration. A query comes in ad hoc and the values are streamed for the current incoming expression, i.e. when we want to enable interactive query-based streaming.

> 
> - The operator will work in a distributed fashion for the input query. This
>> essentially means for a single input query, the scan work is distributed
>> among all of the physical instances of the input operator.
>> - Kudu splits a table into chunks of data regions called Tablets. The
>> tablets are replicated and partitioned  (range and hash partitions are
>> supported ) in Kudu according to the Kudu Table definition. The operator
>> allows partitioning of the Input Operator to be done in 2 ways.
>>        - Map many Kudu Tablets to one partition of the Apex Kudu Input
>> operator
>>        - One Kudu Tablet maps to one partition of the Apex Kudu Input
>> operator
>> - The partitioning does not change on a per query basis. This is because
>> of the complex use cases that would arise. For example, if the query is
>> touching only a few rows before the next query is accepted, it would result
>> in a lot of churn in terms of operator serialize/deserialze, YARN
>> allocation requests etc. Also supporting per query partition planning leads
>> to possibly very complex implementation and poor resource usage as all
>> physical instances of the operator have to wait for its peers to complete
>> its scan and wait for next checkpoint to get repartitioned.
>> 
> 
> Agreed, what would be a reason to change partitioning between queries
> though?
> 
[Ananth]>> I was making that note more in the context of dynamic partitioning. My current understanding is that dynamic partitioning is entirely based on performance stats, and I thought that did not quite make sense here, where the stats depend entirely on the business logic (the query, in this case) rather than on the operational aspects supported by the StatsListener interface. So the abstract operator suppresses dynamic partitioning, as the stats would change drastically based on the query, the data distribution of the underlying Kudu cluster and the Kudu table definition.

> 
>> - The partitioner splits the work load of a single query in a round robin
>> fashion. After a query plan is generated , each scan token range is
>> distributed equally among the physical operator instances.
>> - The operator allows for two modes of scanning for an application (
>> Cannot be changed on a per query basis )
>>        - Consistent Order scanner - only one tablet scan thread is active
>> at any given instance of time for a given query
>>        - Random Order scanner - Many threads are active to scan Kudu
>> tablets in parallel
>> - As can be seen, Consistent order scanner would be slower but would help
>> in better “exactly once” implementations if the correct method is
>> overridden in the operator.
>> 
> 
> Can you elaborate on this a bit more? Ordering within a streaming window
> generally isn't deterministic when you have a shuffle or stream merge. And
> the association between input records and streaming windows can be made
> deterministic by using the window data manager?
> 
[Ananth]>> The consistent order scanner essentially does two things: it ensures that there is only a single thread per Apex operator scanning a Kudu tablet, and it marks the scan as fault tolerant against Kudu tablet failures. I was referring more to the issue that deterministic behaviour is not guaranteed at the Kudu tablet level itself when multiple tablets are mapped to a single Apex partition. In that case, multiple threads scan multiple Kudu tablets and contribute to the buffer of a single partition of the Apex operator. If, however, the user configures the Apex operator with a one-to-one mapping or uses the consistent order scanner, we can at least guarantee the same ordering for the input tuples, provided the underlying Kudu table is not mutated for that query result.

Yes, you are right that we cannot claim deterministic ordering if there is a shuffle or merge in the downstream operators. Also, there is no shuffle or merge within the Kudu input operator itself. The query planner ensures that the predicates are pushed down to the Kudu scan engine and the results are streamed to the individual partitions. If the user chooses to unify input operator instances due to application design needs, then ordering is definitely lost.
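
For reference, the round-robin distribution of scan token ranges mentioned in the original proposal can be sketched roughly as below. This is a minimal illustration only: the class name is hypothetical, the tokens are plain strings standing in for Kudu scan tokens, and the real partitioner may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not the operator's actual partitioner.
class RoundRobinAssigner {
    // Distributes work items (standing in for Kudu scan tokens) across
    // partitionCount physical operator instances in round-robin order.
    static <T> List<List<T>> assign(List<T> scanTokens, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        List<List<T>> assignments = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < scanTokens.size(); i++) {
            // Token i goes to instance i mod partitionCount.
            assignments.get(i % partitionCount).add(scanTokens.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        List<String> tokens = Arrays.asList("t0", "t1", "t2", "t3", "t4");
        System.out.println(assign(tokens, 2)); // prints [[t0, t2, t4], [t1, t3]]
    }
}
```

Because the assignment depends only on the token order and the instance count, the same query plan yields the same assignment on every run, as long as the partition count does not change.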

The window manager comment is entirely about the input operator's state management; I did not mean that we are providing end-to-end functionality with this input operator. This is why we have an overridable method called “isAllowedInReconcilingWindow”: only the business logic can tell, when resuming from a checkpoint, whether a tuple needs to be reprocessed. Comparing it to the Kafka implementation, Kafka commits the offsets at the committed call, whereas I could not take that approach for the following reasons:

- All of the physical instances of the Kudu input operator might not be processing the same query at any given instant. This is because, as soon as a physical instance of the operator is done processing the current query, it asks for its next work order. Since the data distribution of the underlying Kudu cluster decides how long a physical instance of the Apex Kudu input operator spends on a work order, each instance can be at a different query at any point in time.
- As long as the concrete implementation of the abstract input operator provides the same sequence of input queries/work orders, all of the physical instances should eventually process all of the data.
- The Kafka operator has the benefit of doing this at the committed call because it is based on offsets, and the work order/Kafka partition does not change.
- In the case of the Kudu input operator, it is entirely possible that multiple different queries are processed and completed between checkpoint calls, hence I thought it sensible to only guarantee state at the checkpoint() call rather than at the committed call.
- The abstract input operator automatically filters all the data up to the last window but one before the shutdown/crash. These windows are termed “safe mode” in the code.
- For the window that was active when the shutdown/crash happened, which needs a stronger check (called the reconciling window), the operator gives the user the flexibility to decide whether a tuple needs to be “re-streamed”, via the method “isAllowedInReconcilingWindow”.
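
The last two points can be sketched as follows. This is only an illustration under assumptions: the class names, the single-argument signature of isAllowedInReconcilingWindow, and the way window ids are compared are all hypothetical and not taken from the actual operator code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the recovery-time filtering; not the operator's actual classes.
abstract class ReplayFilter<T> {
    // Id of the window that was active when the shutdown/crash happened.
    private final long reconcilingWindowId;

    ReplayFilter(long reconcilingWindowId) {
        this.reconcilingWindowId = reconcilingWindowId;
    }

    // Business logic hook: true to emit the tuple, false to suppress it.
    // The single-argument signature is an assumption made for this sketch.
    protected abstract boolean isAllowedInReconcilingWindow(T tuple);

    final boolean shouldEmit(long windowId, T tuple) {
        if (windowId < reconcilingWindowId) {
            return false; // "safe mode": these rows were already streamed before the crash
        }
        if (windowId == reconcilingWindowId) {
            return isAllowedInReconcilingWindow(tuple); // let the user decide
        }
        return true; // normal mode of operation
    }
}

// Example policy: suppress rows whose key is known to have been processed downstream.
class SeenKeysFilter extends ReplayFilter<String> {
    private final Set<String> processedKeys;

    SeenKeysFilter(long reconcilingWindowId, Set<String> processedKeys) {
        super(reconcilingWindowId);
        this.processedKeys = processedKeys;
    }

    @Override
    protected boolean isAllowedInReconcilingWindow(String key) {
        return !processedKeys.contains(key);
    }

    public static void main(String[] args) {
        SeenKeysFilter f = new SeenKeysFilter(10, new HashSet<>(Arrays.asList("a")));
        System.out.println(f.shouldEmit(9, "z"));  // false (safe mode)
        System.out.println(f.shouldEmit(10, "a")); // false (duplicate in reconciling window)
        System.out.println(f.shouldEmit(10, "b")); // true  (not yet processed)
        System.out.println(f.shouldEmit(11, "a")); // true  (normal mode)
    }
}
```

How the set of processed keys is obtained (for example, by querying the downstream store) is exactly the part that only the business logic can supply.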


> 
>> - The operator introduces the DisruptorBlockingQueue for a low latency
>> buffer management. LMAX disruptor library was considered and based on some
>> other discussion threads on other Apache projects, settled on the
>> ConversantMedia implementation of the Disruptor Blocking queue. This
>> blocking queue is used when the kudu scanner thread wants to send the
>> scanned row into the input operators main thread emitTuples() call.
>> - The operator allows for exactly once semantics if the user specifies the
>> logic for reconciling a possible duplicate row in situations when the
>> operator is resuming from a checkpoint. This is done by overriding a method
>> that returns a boolean ( true to emit the tuple and false to suppress the
>> tuple ) when the operating is working in the reconciling window phase. As
>> can be seen, this reconciling phase is only active at the max for one
>> window.
> 
> - The operator uses the FSWindowManager to manage metadata at the end of
>> every window. From resumption at a checkpoint, the operator will still scan
>> the Kudu tablets but simply not emit all rows that were already streamed
>> downstream. Subsequently when the operator is in the reconciling window,
>> the method described above is invoked to allow for duplicates filter. After
>> this reconciling window, the operator works in the normal mode of operation.
>> 
> 
> In which situation would I use this? How do I know the record was actually
> processed downstream?

[Ananth]>> I guess this is covered in the response I just added above. Since there is no “good way” to decide this in a truly distributed application, the flexibility is given to the end user. The input operator only guarantees, on a best-effort basis, that it will not duplicate rows. On the other hand, consider the alternative of relying on the committed call: we might end up sending more duplicates downstream in case of a crash, and possibly miss some queries as well, since each operator instance is at a different query at the time of the committed call.

> 
> 
>> - The following are the additional configurable aspects of the operator
>>        - Max tuples per window
>>        - Spin policy and the buffer size for the Disruptor Blocking Queue
>>        - Mechanism to provide custom control tuples if required
>>        - Setting the number of Physical operator instances via the API if
>> required.
>>        - Setting the fault Tolerance. If fault tolerant , an alternative
>> replica of the Kudu tablet is picked up for scanning if the initial tablet
>> fails for whatever reason. However this slows down the scan throughput.
>> Hence it is configurable by the end user.
>> 
>> 
>> Some notes regarding the SQL expression for the operator:
>> 
>> - The operator uses ANTLR4 to parse the SQL expression.
>> - The parser is based on a grammar file which is part of the source tree.
>> The grammar is compiled on every build as part of the build process and
>> code is generated for the parser automatically.
>> - The reason we had to use a custom parser are (as opposed to something
>> like calcite) :
>>        - Kudu does not have all the features for a standard SQL
>> expression. As an example != ( not equal to ) is not supported. Nor is
>> there a concept of a Join etc.
>>        - We are providing a lot more flexibility for the user to specify
>> what the control tuple message should be should the end user choose to send
>> a control tuple downstream after the given query is done processing
>> - The SQL expression can specify a set of options for processing of the
>> query:
>>        - Control tuple message : A message/string that can be sent as the
>> Control tuple field. There would be other parts for this control tuple like
>> the query that was just completed and whether this is a begin or end of the
>> scan.
>> 
> 
> Will there be any support for watermarks that would work with downstream
> window operator? Another future idea might be to support Kudu as a source
> in the Apex SQL API.
> 
[Ananth]>> Currently a control tuple is emitted at the end of the query if enabled in the SQL expression. Apart from this, the operator also allows sending a control tuple at the beginning and end of the query for a given physical instance of the operator, should the user choose to. The control tuple itself is entirely extensible and is hence given as a templated variable to the abstract operator. This essentially means that watermarks are supported by default if the user wants to send a custom marker. However, note that this control tuple can only be emitted at the beginning and end of the query. If you are referring to watermarks for the data, such as event-time-based watermarks, I will need to explore this a bit more, and it will perhaps need some enhancement.
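
As a rough sketch of what "templated" means here, assuming the control tuple type is a generic type parameter and the marker can only be produced at the start and end of a query: all names below are hypothetical, and for simplicity the markers and rows are collected into one list rather than emitted on operator ports.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the abstract operator; C is the user-defined control tuple type.
abstract class ScanWithControlTuples<R, C> {
    // Called once when a query starts (isBegin = true) and once when it ends
    // (isBegin = false). Returning null means no control tuple is sent.
    protected abstract C controlTupleFor(String query, boolean isBegin);

    // Returns what would go downstream: optional begin marker, rows, optional end marker.
    final List<Object> process(String query, List<R> rows) {
        List<Object> out = new ArrayList<>();
        C begin = controlTupleFor(query, true);
        if (begin != null) {
            out.add(begin);
        }
        out.addAll(rows);
        C end = controlTupleFor(query, false);
        if (end != null) {
            out.add(end);
        }
        return out;
    }
}

// Example: only send a string marker when the query has finished.
class EndMarkerScan extends ScanWithControlTuples<String, String> {
    @Override
    protected String controlTupleFor(String query, boolean isBegin) {
        return isBegin ? null : "END:" + query;
    }

    public static void main(String[] args) {
        List<Object> out = new EndMarkerScan().process("q1", Arrays.asList("row1", "row2"));
        System.out.println(out); // prints [row1, row2, END:q1]
    }
}
```

A custom marker type could carry a timestamp and so act as a coarse, per-query watermark, but nothing in this shape allows a marker in the middle of a scan.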

Support for the Apex SQL API is a great idea. I will need to do some more homework here, as I have to see how the custom parser and the Calcite integration would fit together; I believe we are using the Calcite parsers to enable some of the streaming API? Kudu does not yet mention any JDBC-based driver, so I need to see how well we can use Calcite in the right way.

> 
>>        - Read Snapshot time : Kudu supports specifying the read snapshot
>> time for which the scan has to occur. This is because Kudu is essentially
>> an MVCC engine and stores multiple versions of the same row. The Read
>> snapshot time allows for the end user to specify the read snapshot time for
>> the scan.
>> - The parser supports for general syntax checking. If there is an error in
>> the SQL expression , the string representing the SQL expression supplied is
>> emitted onto an error port and the next query is taken for processing.
>> - The data types supported are only those data types as supported by the
>> Kudu Engine. The parser supports data type parsing support. For example
>> String data types are double quoted etc.
>> - The Parser allows for a SELECT AA as BB style of expressions wherein AA
>> is the column name in Kudu and BB is the name of the java POJO field name.
>> 
>> Please let me know if the community has any other questions regarding the
>> above design. I am planning to present this operator along with the Kudu
>> output operator in the Data works summit next month and any feedback would
>> be useful.
>> 
>> 
>> Regards,
>> Ananth


Re: [Design discussion] - Kudu Input operator

Posted by Thomas Weise <th...@apache.org>.
Hi Ananth,

Nice writeup, couple questions/comments inline ->

On Tue, Aug 15, 2017 at 2:02 PM, Ananth G <an...@gmail.com> wrote:

> Hello All,
>
> The implementation for Apex Kudu Input Operator is ready for a pull
> request. Before raising the pull request, I would like to get any inputs
> regarding the design and incorporate any feedback before raising the pull
> request in the next couple of days for the following JIRA.
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2472 <
> https://issues.apache.org/jira/browse/APEXMALHAR-2472>
>
> The following are the main features that would be supported by the Input
> operator:
>
> - The input operator would be used to scan all or some rows of a single
> kudu table.
> - Each Kudu row is translated to a POJO for downstream operators.
> - The Input operator would accept an SQL expression (described in detail
> below) that would be parsed to generate the equivalent scanner code for the
> Kudu Table. This is because the Kudu Table API does not support SQL
> expressions.
> - The SQL expression would have additional options that would help in
> Apache Apex design patterns (e.g. sending a control tuple message after a
> query is successfully processed).
> - The Input operator works on a continuous basis, i.e. it would accept the
> next query once the current query is complete.
>

This means the operator will repeat the query to fetch newly added rows,
similar to what the JDBC poll operator does, correct?

> - The operator will work in a distributed fashion for the input query. This
> essentially means for a single input query, the scan work is distributed
> among all of the physical instances of the input operator.
> - Kudu splits a table into chunks of data regions called Tablets. The
> tablets are replicated and partitioned  (range and hash partitions are
> supported ) in Kudu according to the Kudu Table definition. The operator
> allows partitioning of the Input Operator to be done in 2 ways.
>         - Map many Kudu Tablets to one partition of the Apex Kudu Input
> operator
>         - One Kudu Tablet maps to one partition of the Apex Kudu Input
> operator
> - The partitioning does not change on a per query basis. This is because
> of the complex use cases that would arise. For example, if the query is
> touching only a few rows before the next query is accepted, it would result
> in a lot of churn in terms of operator serialize/deserialize, YARN
> allocation requests etc. Also, supporting per query partition planning leads
> to a possibly very complex implementation and poor resource usage, as all
> physical instances of the operator have to wait for their peers to complete
> their scans and wait for the next checkpoint to get repartitioned.
>

Agreed, what would be a reason to change partitioning between queries
though?


> - The partitioner splits the work load of a single query in a round robin
> fashion. After a query plan is generated, the scan token ranges are
> distributed equally among the physical operator instances.
> - The operator allows for two modes of scanning for an application
> (cannot be changed on a per query basis):
>         - Consistent Order scanner - only one tablet scan thread is active
> at any given instance of time for a given query
>         - Random Order scanner - Many threads are active to scan Kudu
> tablets in parallel
> - As can be seen, Consistent order scanner would be slower but would help
> in better “exactly once” implementations if the correct method is
> overridden in the operator.
>
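
The round robin distribution of scan tokens described above can be sketched in a few lines of plain Java. This is an illustration only: RoundRobinAssigner is a hypothetical name, and the scan tokens are modeled as opaque list elements rather than real Kudu scan token objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: spreads the scan tokens of one query plan across a
// fixed number of physical operator instances in round robin order.
final class RoundRobinAssigner {
    static <T> List<List<T>> assign(List<T> scanTokens, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        List<List<T>> buckets = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            buckets.add(new ArrayList<>());
        }
        // Token i goes to partition (i mod partitionCount).
        for (int i = 0; i < scanTokens.size(); i++) {
            buckets.get(i % partitionCount).add(scanTokens.get(i));
        }
        return buckets;
    }
}
```

Because the partition count is fixed for the application, the same assignment rule can be reapplied for every incoming query without repartitioning.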

Can you elaborate on this a bit more? Ordering within a streaming window
generally isn't deterministic when you have a shuffle or stream merge. And
the association between input records and streaming windows can be made
deterministic by using the window data manager?


> - The operator introduces the DisruptorBlockingQueue for low latency
> buffer management. The LMAX Disruptor library was considered and, based on
> some other discussion threads on other Apache projects, we settled on the
> ConversantMedia implementation of the Disruptor blocking queue. This
> blocking queue is used when the Kudu scanner thread wants to send the
> scanned row to the input operator's main thread emitTuples() call.
> - The operator allows for exactly once semantics if the user specifies the
> logic for reconciling a possible duplicate row in situations when the
> operator is resuming from a checkpoint. This is done by overriding a method
> that returns a boolean (true to emit the tuple and false to suppress the
> tuple) when the operator is working in the reconciling window phase. As
> can be seen, this reconciling phase is active for at most one
> window.

> - The operator uses the FSWindowManager to manage metadata at the end of
> every window. On resumption from a checkpoint, the operator will still scan
> the Kudu tablets but simply not emit the rows that were already streamed
> downstream. Subsequently, when the operator is in the reconciling window,
> the method described above is invoked to allow duplicates to be filtered.
> After this reconciling window, the operator works in the normal mode of
> operation.
>
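
The recovery behaviour described above can be sketched roughly as follows. This is a simplified model under assumptions, not the operator's code: ReplayingScanner and isAllowedInReconcilingWindow are hypothetical names, rows are plain strings, and the scan order is assumed to be identical before and after the failure (as with the consistent order scanner).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of replay after a restart from a checkpoint.
abstract class ReplayingScanner {
    // User-supplied hook: return true to emit the row, false to suppress it.
    protected abstract boolean isAllowedInReconcilingWindow(String row);

    // rowsInCompletedWindows: count recovered from the saved window metadata.
    // reconcilingWindowSize: number of rows that fall into the single window
    // that was in flight when the failure happened.
    List<String> replay(List<String> scannedRows,
                        int rowsInCompletedWindows,
                        int reconcilingWindowSize) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < scannedRows.size(); i++) {
            String row = scannedRows.get(i);
            if (i < rowsInCompletedWindows) {
                continue; // already streamed downstream in a completed window
            }
            boolean reconciling = i < rowsInCompletedWindows + reconcilingWindowSize;
            if (reconciling && !isAllowedInReconcilingWindow(row)) {
                continue; // user logic flagged this row as a duplicate
            }
            out.add(row); // normal mode, or reconciling row that was allowed
        }
        return out;
    }
}
```

In this model the user hook is consulted only for the in-flight window; how the hook learns that a row was already processed downstream is left to the application.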

In which situation would I use this? How do I know the record was actually
processed downstream?


> - The following are the additional configurable aspects of the operator
>         - Max tuples per window
>         - Spin policy and the buffer size for the Disruptor Blocking Queue
>         - Mechanism to provide custom control tuples if required
>         - Setting the number of Physical operator instances via the API if
> required.
>         - Setting the fault tolerance. If fault tolerant, an alternative
> replica of the Kudu tablet is picked up for scanning if the initial tablet
> fails for whatever reason. However this slows down the scan throughput.
> Hence it is configurable by the end user.
>
>
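
As an illustration of how the buffer and the "max tuples per window" setting could interact, here is a minimal sketch. It is an assumption-laden model: BufferedEmitter is a hypothetical class, and java.util.concurrent.ArrayBlockingQueue is swapped in for the Disruptor blocking queue so that the example needs only the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the hand-off between a scanner thread and the
// operator thread, with a per-window emit limit.
final class BufferedEmitter {
    private final BlockingQueue<String> buffer; // JDK stand-in for the Disruptor queue
    private final int maxTuplesPerWindow;
    private int emittedInWindow;
    final List<String> output = new ArrayList<>();

    BufferedEmitter(int bufferSize, int maxTuplesPerWindow) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
        this.maxTuplesPerWindow = maxTuplesPerWindow;
    }

    // Scanner side: returns false if the buffer is full. A real scanner
    // thread would more likely wait or spin according to its spin policy.
    boolean offerFromScanner(String row) {
        return buffer.offer(row);
    }

    void beginWindow() {
        emittedInWindow = 0;
    }

    // Operator side: drains without blocking, up to the per-window limit.
    void emitTuples() {
        while (emittedInWindow < maxTuplesPerWindow) {
            String row = buffer.poll();
            if (row == null) {
                return; // nothing buffered right now
            }
            output.add(row);
            emittedInWindow++;
        }
    }
}
```

Rows beyond the limit stay in the buffer and are emitted in later windows, which bounds the work done per window at the cost of added latency.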
> Some notes regarding the SQL expression for the operator:
>
> - The operator uses ANTLR4 to parse the SQL expression.
> - The parser is based on a grammar file which is part of the source tree.
> The grammar is compiled on every build as part of the build process and
> code is generated for the parser automatically.
> - The reasons we had to use a custom parser (as opposed to something
> like Calcite) are:
>         - Kudu does not have all the features for a standard SQL
> expression. As an example != ( not equal to ) is not supported. Nor is
> there a concept of a Join etc.
>         - We are providing a lot more flexibility for the user to specify
> what the control tuple message should be, should the end user choose to send
> a control tuple downstream after the given query is done processing.
> - The SQL expression can specify a set of options for processing of the
> query:
>         - Control tuple message : A message/string that can be sent as the
> Control tuple field. There would be other parts for this control tuple like
> the query that was just completed and whether this is a begin or end of the
> scan.
>

Will there be any support for watermarks that would work with downstream
window operator? Another future idea might be to support Kudu as a source
in the Apex SQL API.


>         - Read Snapshot time : Kudu supports specifying the read snapshot
> time for which the scan has to occur. This is because Kudu is essentially
> an MVCC engine and stores multiple versions of the same row. The Read
> snapshot time allows for the end user to specify the read snapshot time for
> the scan.
> - The parser supports general syntax checking. If there is an error in
> the SQL expression, the string representing the supplied SQL expression is
> emitted onto an error port and the next query is taken for processing.
> - The only data types supported are those supported by the Kudu engine.
> The parser also handles data type parsing. For example, String values
> are double quoted.
> - The parser allows SELECT AA as BB style expressions, wherein AA is the
> column name in Kudu and BB is the name of the Java POJO field.
>
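
The SELECT AA as BB mapping can be illustrated with a small sketch. Note the substitution: the operator is described as using an ANTLR4 grammar, whereas this example uses a simple regular expression purely to show the column-to-field mapping. SelectAliasParser is a hypothetical name, and the input is assumed to be only the comma-separated select list.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical regex-based stand-in for the alias handling of the parser.
final class SelectAliasParser {
    // Matches "column" or "column as field" (the keyword is case-insensitive).
    private static final Pattern ITEM =
        Pattern.compile("\\s*(\\w+)(?:\\s+as\\s+(\\w+))?\\s*", Pattern.CASE_INSENSITIVE);

    // Returns Kudu column name -> POJO field name, in select-list order.
    static Map<String, String> parse(String selectList) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String item : selectList.split(",")) {
            Matcher m = ITEM.matcher(item);
            if (!m.matches()) {
                throw new IllegalArgumentException("Cannot parse select item: " + item);
            }
            String column = m.group(1);
            // Without an alias, the POJO field is assumed to share the column name.
            String field = m.group(2) != null ? m.group(2) : column;
            mapping.put(column, field);
        }
        return mapping;
    }
}
```

In the operator as described, a malformed expression would be routed to the error port rather than raised as an exception; the exception here only keeps the sketch short.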
> Please let me know if the community has any other questions regarding the
> above design. I am planning to present this operator along with the Kudu
> output operator at the DataWorks Summit next month and any feedback would
> be useful.
>
>
> Regards,
> Ananth