You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Bruno Cadonna <br...@confluent.io> on 2020/08/11 07:25:28 UTC

Re: [DISCUSS] KIP-647: Add ability to handle late messages in streams-aggregation

Hi Igor,

Thanks for the KIP!

Similar to Matthias, I am also wondering why you rejected the more 
general solution involving a callback. I also think that writing to a 
topic is just one of multiple ways to handle late records. For example, 
one could compute statistics over the late records before or instead 
writing the records to a topic. Or it could write the records to a 
database to analyse.

Best,
Bruno

On 28.07.20 05:14, Matthias J. Sax wrote:
> Thanks for the KIP Igor.
> 
> What you propose sounds a little bit like a "dead-letter-queue" pattern.
> Thus, I am wondering if we should try to do a built-in
> "dead-letter-queue" feature that would be general purpose? For example,
> uses can drop message in the source node if they don't have a valid
> timestamp or if a deserialization error occurs and face a similar issue
> for those cases (even if it might be a little simpler to handle those
> cases, as custom user code is executed).
> 
> For a general purpose DLQ, the feature should be expose at the Processor
> API level though, and the DSL would just use this feature (instead of
> introducing it as a DSL feature).
> 
> Late records are of course only defined at the DSL level as for the PAPI
> users need to define custom semantics. Also, late records are not really
> corrupted. However, the pattern seems similar enough, ie, piping late
> data into a topic is just a special case for a DLQ?
> 
> I am also wondering, if piping late records into a DLQ is the only way
> to handle them? For example, I could imagine that a user just wants to
> trigger a side-effect (similar to what you mention in rejected
> alternatives)? Or maybe a user might even want to somehow process those
> record and feed them back into the actually processing pipeline.
> 
> Last, a DLQ is only useful if somebody consumes from the topic and does
> something with the data. Can you elaborate on the use-case how a user
> would use the preserved late records?
> 
> 
> 
> -Matthias
> 
> On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
>> Hi everybody,
>> I would like to start off the discussion for KIP-647:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-594%3A+Expose+output+topic+names+from+TopologyTestDriver>
>>
>>
>> This KIP proposes a minor adjustment in the kafka-streams
>> aggregation-api, adding an ability for processing late messages.
>> [WIP] PR here:https://github.com/apache/kafka/pull/9017
>>
>> Please check.
>> Regards, Igor.
>>
>>
>>
>>
> 

Re: [DISCUSS] KIP-647: Add ability to handle late messages in streams-aggregation

Posted by "Matthias J. Sax" <mj...@apache.org>.
In general, I like Bruno's proposal. Was also checking how Flink is
handling this case, and similar to Bruno's proposal, they have a
so-called "side output":
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

The only "concern" I have with Bruno's proposal is, that we would add
`KTable.lateRecords()` and currently not all KTables have late-records.
For example, a non-windowed aggregation or a `builder.table()` or a
table-table-join result. -- We could argue that this is not really an
issue as the "late record stream" would just be empty though.

(Side note: this might change in the future... I am just brainstorming
about a new KIP that proposes to add a "grace period" to all KTables...)

However, we would also need a similar "side output" for stream-stream
joins, and thus would need to add `KStream.lateRecords()` that again
would not produce any output for all existing stateless operations.

Maybe it would be still ok, but we should at least discuss/consider the
impact. It might be confusing for users. Not sure "how bad" it would be.


I agree that letting users use the Producer API to write late records
into a DLQ topic seems not desirable and that KafkaStream should handle
it automatically. -- In general, I also see you point about having
different DLQ topics instead of a centralized one. However, my train of
though was about other "corrupted" records that users might want to
"side output": for example, record that are dropped by the
`DeserializationExceptionnHandler` or record with null-key that are
dropped by join/aggregation operator (and a few others). It might be
good to have a unified solution for all those cases?


-Matthias


On 8/26/20 1:44 AM, Bruno Cadonna wrote:
> Hi Igor,
> 
> Thank you for your answers!
> 
> I think I understand your reasoning.
> 
> Looking at your proposal, some questions/comments arose.
> 
> 1. Who would be responsible for the topic to which the late records
> should be sent? Is it a topic users create and manage or is it an
> internal topic that is managed by Streams and that is deleted when the
> application is reset? This is not clear from your KIP.
> 
> 2. I guess you also need an overload for session windows and not only
> for time windows. Once KIP-450 is implemented, you would also need an
> overload for sliding windows.
> 
> 
> I also think that we should discuss some other approaches since this
> includes a change to Streams' public API and we should be judicious with
> such changes.
> 
> A. Adding an operator to the DSL that starts a stream for late records.
> Something like the following:
> 
> KTable aggregation =
>     builder.stream(...).groupByKey().windowedBy().count();
> 
> aggregation.toStream().to(...);
> aggregation.lateRecords().to(...);
> 
> That would conceptually branch an aggregation into a stream for the
> aggregation results and a stream for the late records. The advantage
> would be that you could use DSL operators on the late records stream.
> Additionally, it makes the code more self-described. I guess this
> approach goes also in the direction that Matthias mentioned regarding a
> built-in dead letter queue.
> 
> B. Adding a callback and providing the implementation that would write
> the late records to late records topic. That would address your concern
> about users being forced to provide the callback. That would solve your
> other concern about using the Producer API, but maybe we could also find
> a way to solve that.
> 
> Best,
> Bruno
> 
> 
> On 25.08.20 20:09, Igor Piddubnyi wrote:
>> Hi Matthias, Bruno,
>> Let me elaborate on my suggestions regarding late-tick handling.
>>
>>  > why you rejected the more general solution involving a callback
>> The main reasons why I tend to the topic approach is API-consistency
>> and cleanness of the user code.
>> The problem with a callback, in my opinion, is that users of the API
>> will be forced to define handling for each late-item using
>> procedural-code.
>> The same custom handling (statistics, db-insert, etc.) could be
>> achieved by consuming the topic.
>> I acknowledge that topic-approach introduces an overhead, compared to
>> the callback, however by paying this price users are getting all
>> stream-processing features.
>> Taking a look from the opposite side, assuming there is a callback,
>> and there is a need to persist data in the topic, one would have to
>> fall-back to producer-API to implement such handling. This would be
>> not so clean, from my point of view.
>>
>>  >I am wondering if we should try to do a built-in "dead-letter-queue"
>> feature that would be general purpose?
>> Generic DLQ might be not so bad idea, however there could be more than
>> one aggregation. Assuming DLQ is generic and contains messages with
>> other kinds of errors, API definitely needs an ability to distinguish
>> between messages with different types of errors.
>> This would be definitely a significant change. Taking into account
>> that this is my first experience with kafka-internals, I tried to keep
>> suggested change as small as possible.
>>
>>  > I am also wondering, if piping late records into a DLQ is the only
>> way to handle them
>> Definitely not, but in my opinion stream-api fits better for any
>> custom handling that user can define.
>> E.g. I don't see any problems defining another processing pipe, which
>> consumes from DLQ, does any necessary side effects, and then gets
>> merged anywhere.
>>
>>  >Can you elaborate on the use-case how a user would use the preserved
>> late records?
>> As just explained, I see this as another processing pipe, or just a
>> consumer, reading data from this topic and doing any necessary handling.
>> This might happen even in another service, if required by the logic.
>> Docs probably should be updated with respective examples.
>>
>> E.g. in the system I'm working on, such handling would involve complex
>> data-correction in the database and would be executed on a separate
>> instance.
>> My arguments and answers might be quite biased, because I mostly
>> consider this use-case for the application currently being developed.
>> Please share your opinion and feedback.
>>
>> Regards, Igor.
>>
>> On 11.08.20 09:25, Bruno Cadonna wrote:
>>> Hi Igor,
>>>
>>> Thanks for the KIP!
>>>
>>> Similar to Matthias, I am also wondering why you rejected the more
>>> general solution involving a callback. I also think that writing to a
>>> topic is just one of multiple ways to handle late records. For
>>> example, one could compute statistics over the late records before or
>>> instead writing the records to a topic. Or it could write the records
>>> to a database to analyse.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 28.07.20 05:14, Matthias J. Sax wrote:
>>>> Thanks for the KIP Igor.
>>>>
>>>> What you propose sounds a little bit like a "dead-letter-queue"
>>>> pattern.
>>>> Thus, I am wondering if we should try to do a built-in
>>>> "dead-letter-queue" feature that would be general purpose? For example,
>>>> uses can drop message in the source node if they don't have a valid
>>>> timestamp or if a deserialization error occurs and face a similar issue
>>>> for those cases (even if it might be a little simpler to handle those
>>>> cases, as custom user code is executed).
>>>>
>>>> For a general purpose DLQ, the feature should be expose at the
>>>> Processor
>>>> API level though, and the DSL would just use this feature (instead of
>>>> introducing it as a DSL feature).
>>>>
>>>> Late records are of course only defined at the DSL level as for the
>>>> PAPI
>>>> users need to define custom semantics. Also, late records are not
>>>> really
>>>> corrupted. However, the pattern seems similar enough, ie, piping late
>>>> data into a topic is just a special case for a DLQ?
>>>>
>>>> I am also wondering, if piping late records into a DLQ is the only way
>>>> to handle them? For example, I could imagine that a user just wants to
>>>> trigger a side-effect (similar to what you mention in rejected
>>>> alternatives)? Or maybe a user might even want to somehow process those
>>>> record and feed them back into the actually processing pipeline.
>>>>
>>>> Last, a DLQ is only useful if somebody consumes from the topic and does
>>>> something with the data. Can you elaborate on the use-case how a user
>>>> would use the preserved late records?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
>>>>> Hi everybody,
>>>>> I would like to start off the discussion for KIP-647:
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation
>>>>>
>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-594%3A+Expose+output+topic+names+from+TopologyTestDriver>
>>>>>
>>>>>
>>>>>
>>>>> This KIP proposes a minor adjustment in the kafka-streams
>>>>> aggregation-api, adding an ability for processing late messages.
>>>>> [WIP] PR here:https://github.com/apache/kafka/pull/9017
>>>>>
>>>>> Please check.
>>>>> Regards, Igor.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>


Re: [DISCUSS] KIP-647: Add ability to handle late messages in streams-aggregation

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Igor,

Thank you for your answers!

I think I understand your reasoning.

Looking at your proposal, some questions/comments arose.

1. Who would be responsible for the topic to which the late records 
should be sent? Is it a topic users create and manage or is it an 
internal topic that is managed by Streams and that is deleted when the 
application is reset? This is not clear from your KIP.

2. I guess you also need an overload for session windows and not only 
for time windows. Once KIP-450 is implemented, you would also need an 
overload for sliding windows.


I also think that we should discuss some other approaches since this 
includes a change to Streams' public API and we should be judicious with 
such changes.

A. Adding an operator to the DSL that starts a stream for late records. 
Something like the following:

KTable aggregation =
     builder.stream(...).groupByKey().windowedBy().count();

aggregation.toStream().to(...);
aggregation.lateRecords().to(...);

That would conceptually branch an aggregation into a stream for the 
aggregation results and a stream for the late records. The advantage 
would be that you could use DSL operators on the late records stream. 
Additionally, it makes the code more self-described. I guess this 
approach goes also in the direction that Matthias mentioned regarding a 
built-in dead letter queue.

B. Adding a callback and providing the implementation that would write 
the late records to late records topic. That would address your concern 
about users being forced to provide the callback. That would solve your 
other concern about using the Producer API, but maybe we could also find 
a way to solve that.

Best,
Bruno


On 25.08.20 20:09, Igor Piddubnyi wrote:
> Hi Matthias, Bruno,
> Let me elaborate on my suggestions regarding late-tick handling.
> 
>  > why you rejected the more general solution involving a callback
> The main reasons why I tend to the topic approach is API-consistency and 
> cleanness of the user code.
> The problem with a callback, in my opinion, is that users of the API 
> will be forced to define handling for each late-item using procedural-code.
> The same custom handling (statistics, db-insert, etc.) could be achieved 
> by consuming the topic.
> I acknowledge that topic-approach introduces an overhead, compared to 
> the callback, however by paying this price users are getting all 
> stream-processing features.
> Taking a look from the opposite side, assuming there is a callback, and 
> there is a need to persist data in the topic, one would have to 
> fall-back to producer-API to implement such handling. This would be not 
> so clean, from my point of view.
> 
>  >I am wondering if we should try to do a built-in "dead-letter-queue" 
> feature that would be general purpose?
> Generic DLQ might be not so bad idea, however there could be more than 
> one aggregation. Assuming DLQ is generic and contains messages with 
> other kinds of errors, API definitely needs an ability to distinguish 
> between messages with different types of errors.
> This would be definitely a significant change. Taking into account that 
> this is my first experience with kafka-internals, I tried to keep 
> suggested change as small as possible.
> 
>  > I am also wondering, if piping late records into a DLQ is the only 
> way to handle them
> Definitely not, but in my opinion stream-api fits better for any custom 
> handling that user can define.
> E.g. I don't see any problems defining another processing pipe, which 
> consumes from DLQ, does any necessary side effects, and then gets merged 
> anywhere.
> 
>  >Can you elaborate on the use-case how a user would use the preserved 
> late records?
> As just explained, I see this as another processing pipe, or just a 
> consumer, reading data from this topic and doing any necessary handling.
> This might happen even in another service, if required by the logic. 
> Docs probably should be updated with respective examples.
> 
> E.g. in the system I'm working on, such handling would involve complex 
> data-correction in the database and would be executed on a separate 
> instance.
> My arguments and answers might be quite biased, because I mostly 
> consider this use-case for the application currently being developed.
> Please share your opinion and feedback.
> 
> Regards, Igor.
> 
> On 11.08.20 09:25, Bruno Cadonna wrote:
>> Hi Igor,
>>
>> Thanks for the KIP!
>>
>> Similar to Matthias, I am also wondering why you rejected the more 
>> general solution involving a callback. I also think that writing to a 
>> topic is just one of multiple ways to handle late records. For 
>> example, one could compute statistics over the late records before or 
>> instead writing the records to a topic. Or it could write the records 
>> to a database to analyse.
>>
>> Best,
>> Bruno
>>
>> On 28.07.20 05:14, Matthias J. Sax wrote:
>>> Thanks for the KIP Igor.
>>>
>>> What you propose sounds a little bit like a "dead-letter-queue" pattern.
>>> Thus, I am wondering if we should try to do a built-in
>>> "dead-letter-queue" feature that would be general purpose? For example,
>>> uses can drop message in the source node if they don't have a valid
>>> timestamp or if a deserialization error occurs and face a similar issue
>>> for those cases (even if it might be a little simpler to handle those
>>> cases, as custom user code is executed).
>>>
>>> For a general purpose DLQ, the feature should be expose at the Processor
>>> API level though, and the DSL would just use this feature (instead of
>>> introducing it as a DSL feature).
>>>
>>> Late records are of course only defined at the DSL level as for the PAPI
>>> users need to define custom semantics. Also, late records are not really
>>> corrupted. However, the pattern seems similar enough, ie, piping late
>>> data into a topic is just a special case for a DLQ?
>>>
>>> I am also wondering, if piping late records into a DLQ is the only way
>>> to handle them? For example, I could imagine that a user just wants to
>>> trigger a side-effect (similar to what you mention in rejected
>>> alternatives)? Or maybe a user might even want to somehow process those
>>> record and feed them back into the actually processing pipeline.
>>>
>>> Last, a DLQ is only useful if somebody consumes from the topic and does
>>> something with the data. Can you elaborate on the use-case how a user
>>> would use the preserved late records?
>>>
>>>
>>>
>>> -Matthias
>>>
>>> On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
>>>> Hi everybody,
>>>> I would like to start off the discussion for KIP-647:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation 
>>>>
>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-594%3A+Expose+output+topic+names+from+TopologyTestDriver> 
>>>>
>>>>
>>>>
>>>> This KIP proposes a minor adjustment in the kafka-streams
>>>> aggregation-api, adding an ability for processing late messages.
>>>> [WIP] PR here:https://github.com/apache/kafka/pull/9017
>>>>
>>>> Please check.
>>>> Regards, Igor.
>>>>
>>>>
>>>>
>>>>
>>>

Re: [DISCUSS] KIP-647: Add ability to handle late messages in streams-aggregation

Posted by Igor Piddubnyi <ig...@gmail.com>.
Hi Matthias, Bruno,
Let me elaborate on my suggestions regarding late-tick handling.

 > why you rejected the more general solution involving a callback
The main reasons why I tend to the topic approach is API-consistency and 
cleanness of the user code.
The problem with a callback, in my opinion, is that users of the API 
will be forced to define handling for each late-item using procedural-code.
The same custom handling (statistics, db-insert, etc.) could be achieved 
by consuming the topic.
I acknowledge that topic-approach introduces an overhead, compared to 
the callback, however by paying this price users are getting all 
stream-processing features.
Taking a look from the opposite side, assuming there is a callback, and 
there is a need to persist data in the topic, one would have to 
fall-back to producer-API to implement such handling. This would be not 
so clean, from my point of view.

 >I am wondering if we should try to do a built-in "dead-letter-queue" 
feature that would be general purpose?
Generic DLQ might be not so bad idea, however there could be more than 
one aggregation. Assuming DLQ is generic and contains messages with 
other kinds of errors, API definitely needs an ability to distinguish 
between messages with different types of errors.
This would be definitely a significant change. Taking into account that 
this is my first experience with kafka-internals, I tried to keep 
suggested change as small as possible.

 > I am also wondering, if piping late records into a DLQ is the only 
way to handle them
Definitely not, but in my opinion stream-api fits better for any custom 
handling that user can define.
E.g. I don't see any problems defining another processing pipe, which 
consumes from DLQ, does any necessary side effects, and then gets merged 
anywhere.

 >Can you elaborate on the use-case how a user would use the preserved 
late records?
As just explained, I see this as another processing pipe, or just a 
consumer, reading data from this topic and doing any necessary handling.
This might happen even in another service, if required by the logic. 
Docs probably should be updated with respective examples.

E.g. in the system I'm working on, such handling would involve complex 
data-correction in the database and would be executed on a separate 
instance.
My arguments and answers might be quite biased, because I mostly 
consider this use-case for the application currently being developed.
Please share your opinion and feedback.

Regards, Igor.

On 11.08.20 09:25, Bruno Cadonna wrote:
> Hi Igor,
>
> Thanks for the KIP!
>
> Similar to Matthias, I am also wondering why you rejected the more 
> general solution involving a callback. I also think that writing to a 
> topic is just one of multiple ways to handle late records. For 
> example, one could compute statistics over the late records before or 
> instead writing the records to a topic. Or it could write the records 
> to a database to analyse.
>
> Best,
> Bruno
>
> On 28.07.20 05:14, Matthias J. Sax wrote:
>> Thanks for the KIP Igor.
>>
>> What you propose sounds a little bit like a "dead-letter-queue" pattern.
>> Thus, I am wondering if we should try to do a built-in
>> "dead-letter-queue" feature that would be general purpose? For example,
>> uses can drop message in the source node if they don't have a valid
>> timestamp or if a deserialization error occurs and face a similar issue
>> for those cases (even if it might be a little simpler to handle those
>> cases, as custom user code is executed).
>>
>> For a general purpose DLQ, the feature should be expose at the Processor
>> API level though, and the DSL would just use this feature (instead of
>> introducing it as a DSL feature).
>>
>> Late records are of course only defined at the DSL level as for the PAPI
>> users need to define custom semantics. Also, late records are not really
>> corrupted. However, the pattern seems similar enough, ie, piping late
>> data into a topic is just a special case for a DLQ?
>>
>> I am also wondering, if piping late records into a DLQ is the only way
>> to handle them? For example, I could imagine that a user just wants to
>> trigger a side-effect (similar to what you mention in rejected
>> alternatives)? Or maybe a user might even want to somehow process those
>> record and feed them back into the actually processing pipeline.
>>
>> Last, a DLQ is only useful if somebody consumes from the topic and does
>> something with the data. Can you elaborate on the use-case how a user
>> would use the preserved late records?
>>
>>
>>
>> -Matthias
>>
>> On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
>>> Hi everybody,
>>> I would like to start off the discussion for KIP-647:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation 
>>>
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-594%3A+Expose+output+topic+names+from+TopologyTestDriver> 
>>>
>>>
>>>
>>> This KIP proposes a minor adjustment in the kafka-streams
>>> aggregation-api, adding an ability for processing late messages.
>>> [WIP] PR here:https://github.com/apache/kafka/pull/9017
>>>
>>> Please check.
>>> Regards, Igor.
>>>
>>>
>>>
>>>
>>