Posted to dev@apex.apache.org by "Ananth (JIRA)" <ji...@apache.org> on 2017/11/11 11:26:09 UTC

[jira] [Closed] (APEXMALHAR-2278) Implement Kudu Output Operator for non-transactional streams

     [ https://issues.apache.org/jira/browse/APEXMALHAR-2278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ananth closed APEXMALHAR-2278.
------------------------------

> Implement Kudu Output Operator for non-transactional streams
> ------------------------------------------------------------
>
>                 Key: APEXMALHAR-2278
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2278
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>            Reporter: Ananth
>            Assignee: Ananth
>             Fix For: 3.8.0
>
>
> Here are some benefits of integrating Kudu and Apex:
>     Kudu has just reached 1.0 and been declared production ready.
>     Kudu as a store might be a good fit for many architectures in the years to come because it supports mutable data (unlike HDFS) and uses storage formats optimized for low-latency scans.
>     It also seems to withstand high-throughput write patterns, which makes it a stable sink for Apex workflows that operate at very high volumes.
> [Design] 
> 1. The operator would be an AbstractOperator and would allow the concrete implementations to set a few behavioral aspects of the operator. 
> 2. The following are the major phases of the operator:
>     During the activate() phase of the operator: Establish a connection to the cluster and get the metadata about the table that is being used as the sink.
>     During the setup() phase of the operator: Fetch the current window information and use it to decide whether we are recovering from a failure. (See point 8 below.)
>     During process() on the input port: Inspect the incoming ExecutionContext tuple (see point 6 below) and perform one of the operations (Insert/Update/Delete/Upsert).
> 3. The following parameters are tunable while establishing a Kudu connection:
>     Table name, boss worker threads, worker threads, socket read timeouts and external consistency mode.
> 4. The user need not specify any schema outright. The POJO fields are automatically mapped to the table column names, as identified when the schema is parsed in the activate phase.
> 5. Allow the concrete implementation of the operator to override the mapping from POJO field names to table schema column names. This gives flexibility in cases where the table schema column names are not compatible with Java bean frameworks, or where the field names can't be controlled because the POJO comes from an upstream operator.
> 6. The input tuple supplied to this operator is of type "Kudu Execution Context". This tuple wraps the actual POJO that is going to be persisted to the Kudu store. Additionally, it allows the upstream operator to specify the operation to be performed. One of the following operations is permitted as part of the context: Insert, Upsert, Update or Delete on the POJO that acts as the payload in the Execution Context.
> 7. The concrete implementation of the operator would allow the user to specify the actual POJO class definition that would be used to write to the table. The execution context would contain this POJO as well as the metadata that defines how that tuple is to be processed.
> 8. The operator would allow a special execution mode for the first window processed after the operator is activated. There are two modes for this first window:
>       a. Safe Mode: the "happy path" execution, in which no extra processing is required to perform the Kudu mutation.
>       b. Reconciling Mode: an additional function is called to check whether the user wants the tuple to be used for mutation. This mode is set automatically when OperatorContext.ACTIVATION_WINDOW_ID != Stateless.WINDOW_ID during the first window processed by the operator.
> This feature is useful when an operator is recovering from an application crash and we do not want to apply the same tuple's mutation more than once, given that ATLEAST_ONCE is the default semantics.
> 9. The operator is a stateless operator. 
> 10. The operator would generate the following autometrics:
>       a. Counts of Inserts, Upserts, Deletes and Updates (a separate counter for each mutation type) for a given window
>       b. Bytes written in a given window
>       c. Write RPCs in a given window
>       d. Total RPC errors in a given window
>       e. All of the above metrics for the entire lifetime of the operator.
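
The per-tuple flow described in points 6, 8 and 10a can be sketched roughly as follows. This is a minimal plain-Java illustration, not the operator's actual code: every class, field and method name here is hypothetical, the sentinel value for a "fresh start" window id is an assumption, and the Kudu write is replaced by appending to a list so the sketch has no dependency on the Kudu client or the Apex API.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// All names below are hypothetical stand-ins for illustration; they are not
// the Malhar operator's classes and no Kudu client call is made.
enum MutationType { INSERT, UPSERT, UPDATE, DELETE }

/** Input tuple: the POJO payload plus the mutation the upstream operator asked for. */
final class ExecutionContextSketch<T> {
    final T payload;
    final MutationType mutationType;

    ExecutionContextSketch(T payload, MutationType mutationType) {
        this.payload = payload;
        this.mutationType = mutationType;
    }
}

/** Models the per-tuple flow only; "writes" are recorded in a list instead of sent to Kudu. */
class KuduOutputSketch<T> {
    // Placeholder sentinel meaning "fresh start, nothing to recover" (assumed value).
    static final long STATELESS_WINDOW_ID = -1L;

    final List<String> applied = new ArrayList<>();
    private final Map<MutationType, Long> windowCounts = new EnumMap<>(MutationType.class);
    private final Predicate<ExecutionContextSketch<T>> shouldWrite;
    private boolean reconciling;

    KuduOutputSketch(long activationWindowId, Predicate<ExecutionContextSketch<T>> shouldWrite) {
        // Reconciling mode applies only when resuming from a previously processed window.
        this.reconciling = activationWindowId != STATELESS_WINDOW_ID;
        this.shouldWrite = shouldWrite;
    }

    void beginWindow() {
        windowCounts.clear(); // per-window counters start from zero
    }

    void process(ExecutionContextSketch<T> ctx) {
        if (reconciling && !shouldWrite.test(ctx)) {
            return; // the user hook rejected this tuple, e.g. it was written before the crash
        }
        applied.add(ctx.mutationType + ":" + ctx.payload);
        windowCounts.merge(ctx.mutationType, 1L, Long::sum);
    }

    void endWindow() {
        reconciling = false; // only the first window after activation is reconciled
    }

    long countFor(MutationType type) {
        return windowCounts.getOrDefault(type, 0L);
    }
}
```

In this sketch a caller would construct the operator with the activation window id and a predicate, then drive it with beginWindow(), process() and endWindow(). The predicate stands in for the user-supplied reconciliation function; how a real implementation decides that a tuple was already written (for example, by reading the row back from Kudu) is left open here.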



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)