Posted to dev@apex.apache.org by "Chris Lambertus (JIRA)" <ji...@apache.org> on 2016/10/08 04:34:21 UTC

[jira] [Comment Edited] (APEXMALHAR-2086) Kafka Output Operator with Kafka 0.9 API

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359194#comment-15359194 ] 

Chris Lambertus edited comment on APEXMALHAR-2086 at 10/8/16 4:33 AM:
----------------------------------------------------------------------

[~thw]
API differences
There aren't many APIs in 0.9, but there are still differences between the 0.9 and 0.8 Producers.
The 0.9 Producer gives you a callback that reports server-side errors, which the 0.8 Producer doesn't have.

Reason for exactly-once operator

First of all, there is no exactly-once output operator among the existing 0.8 operators.
Secondly, I think some Malhar users have asked for it.

This is actually the first attempt at an exactly-once output operator, and it makes some assumptions. We use the keys in the Kafka messages to skip messages that have been saved to Kafka but not yet recorded by WindowDataManager. This can be discussed, but extra space is needed for that information.

How it works:
When an output operator comes back from a failure, the steps are:
1 It loads messages starting from the maximum offsets saved by WindowDataManager.
2 It starts replaying messages.
3 For messages that are within the windows saved by WindowDataManager, it simply skips them.
4 If a message is in the last failed partial window, it uses the messages loaded in step 1 to avoid duplication.
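
The four steps can be sketched roughly as below. This is only an illustration, not the operator's actual code: the class RecoveryDedupSketch, the Tuple type and the method tuplesToSend are hypothetical names, the payload is reduced to a String, and the partial window is assumed to be the first replayed window after the last one saved by WindowDataManager.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the recovery steps; not the Malhar implementation.
final class RecoveryDedupSketch {

  /** A replayed tuple tagged with its streaming window id (hypothetical type). */
  static final class Tuple {
    final long windowId;
    final String payload;

    Tuple(long windowId, String payload) {
      this.windowId = windowId;
      this.payload = payload;
    }
  }

  /**
   * @param replayed        tuples replayed to the operator after restart (step 2)
   * @param lastSavedWindow largest window id recorded by the WindowDataManager
   * @param alreadyInKafka  payloads read back from Kafka from the saved offsets (step 1)
   * @return payloads that still have to be written to Kafka
   */
  static List<String> tuplesToSend(List<Tuple> replayed, long lastSavedWindow,
      Set<String> alreadyInKafka) {
    Set<String> pending = new HashSet<>(alreadyInKafka);
    List<String> toSend = new ArrayList<>();
    long partialWindow = -1; // assumption: first window after the last saved one

    for (Tuple t : replayed) {
      if (t.windowId <= lastSavedWindow) {
        continue; // step 3: window was fully written before the failure
      }
      if (partialWindow == -1) {
        partialWindow = t.windowId;
      }
      if (t.windowId == partialWindow && pending.remove(t.payload)) {
        continue; // step 4: already written during the failed partial window
      }
      toSend.add(t.payload);
    }
    return toSend;
  }
}
```

Using a set for the loaded messages relies on tuples being unique within a window, as the issue description requires.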

Because different operator partitions could write data to the same Kafka partition, we need to keep the operator partition id along with each message to recognize whether the message came from the failed operator or not. We store that information in the key, which is a compromise; we might need to think about another solution.
The other assumption concerns step 1: there is a maximum number of messages that can be loaded. If the operator is not recovered quickly and other operators meanwhile produce lots of data to the partitions, it is basically impossible to dedupe the partial window mixed with data from other operators, so the guarantee would be at-least-once in this case.
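
To make the two assumptions concrete, here is a rough sketch of filtering by key under a load limit. Everything in it is hypothetical (KeyFilterSketch, Record, LoadResult, loadOwnMessages, maxMessages); the only detail taken from the issue is the AppID#OperatorId key format. The limit here counts every scanned message, which is one possible reading of "maximum number of messages that can be loaded".

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; names and the scan limit semantics are assumptions.
final class KeyFilterSketch {

  /** A message read back from a Kafka partition (hypothetical type). */
  static final class Record {
    final String key;
    final String value;

    Record(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Result of the step 1 load. */
  static final class LoadResult {
    final Set<String> ownValues; // values written by this operator partition
    final boolean complete;      // false if the limit was hit before the end

    LoadResult(Set<String> ownValues, boolean complete) {
      this.ownValues = ownValues;
      this.complete = complete;
    }
  }

  /** Key format from the issue description: AppID#OperatorId. */
  static String operatorKey(String appId, int operatorId) {
    return appId + "#" + operatorId;
  }

  /** Scans at most maxMessages records and keeps those carrying ownKey. */
  static LoadResult loadOwnMessages(List<Record> log, String ownKey, int maxMessages) {
    Set<String> own = new HashSet<>();
    int scanned = 0;
    for (Record r : log) {
      if (scanned == maxMessages) {
        // Too much data from other writers: dedupe is no longer reliable,
        // so the operator can only offer at-least-once for this window.
        return new LoadResult(own, false);
      }
      scanned++;
      if (ownKey.equals(r.key)) {
        own.add(r.value);
      }
    }
    return new LoadResult(own, true);
  }
}
```

When complete is false, the caller would have to treat the partial window as at-least-once rather than exactly-once.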

was (Author: hsy541):
[~thomas.weise@googlemail.com]

> Kafka Output Operator with Kafka 0.9 API
> ----------------------------------------
>
>                 Key: APEXMALHAR-2086
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2086
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Sandesh
>            Assignee: Sandesh
>             Fix For: 3.5.0
>
>
> Goal: 2 operators for Kafka Output
>       1. Simple Kafka Output Operator 
>             - Supports at-least-once 
>             - Expose most used producer properties as class properties
>       2. Exactly Once Kafka Output (not possible in all cases, will be documented later)
>             
> Design for Exactly Once
> Window Data Manager - Stores the Kafka partition offsets.
> Kafka Key - Used by the operator = AppID#OperatorId
> During recovery, the partially written window is re-created using the following approach:
> Tuples between the largest recovery offsets and the current offset are checked. Based on the key, tuples written by other entities are discarded. 
> Only tuples which are not in the recovered set are emitted.
> Tuples need to be unique within the window.
>       


