Posted to dev@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2020/12/09 01:38:00 UTC

[jira] [Created] (KAFKA-10829) Kafka Streams handle produce exception improvement

Guozhang Wang created KAFKA-10829:
-------------------------------------

             Summary: Kafka Streams handle produce exception improvement
                 Key: KAFKA-10829
                 URL: https://issues.apache.org/jira/browse/KAFKA-10829
             Project: Kafka
          Issue Type: Improvement
          Components: producer , streams
            Reporter: Guozhang Wang


A summary of recent discussions on how to improve exception handling for the embedded producer in Kafka Streams.

Note that in each case below, the baseline logic guarantees that our correctness semantics are not violated; the optimizations are layered on top of the baseline to reduce the user's burden by letting the library handle certain types of exceptions automatically.

1) ``Producer.send()`` throws an exception directly:

1.a) The baseline logic (to ensure correctness) is to always wrap such exceptions as a StreamsException. This causes the thread to shut down and triggers the exception handler, which can inspect the wrapped exception and decide whether the shut-down thread can be restarted.

1.b) The optimization is to inspect the exception and decide whether it can be wrapped as a TaskMigratedException instead (e.g., ProducerFencedException). That case is then handled automatically: all tasks are treated as lost and the thread re-joins the group.
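
A minimal Java sketch of 1.a and 1.b. So that it compiles without kafka-clients, the exception classes are simplified stand-ins defined in the block that mirror, but are not, the real ``org.apache.kafka`` types. The ``send`` wrapper and ``translateSendException`` are hypothetical names, and ProducerFencedException is the only exception mapped to TaskMigrated here.

```java
class SendErrorTriage {
    // Simplified stand-ins for the Kafka types named in the ticket; they mirror,
    // but are not, the real org.apache.kafka classes.
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String msg) { super(msg); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    static class TaskMigratedException extends StreamsException {
        TaskMigratedException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Decide how an exception thrown directly by send() is surfaced.
    static StreamsException translateSendException(RuntimeException e) {
        if (e instanceof ProducerFencedException) {
            // 1.b: the task is owned elsewhere now; let the library treat all
            // tasks as lost and re-join instead of shutting the thread down.
            return new TaskMigratedException("Producer fenced during send", e);
        }
        // 1.a: baseline, always wrap; the thread shuts down and the exception
        // handler can inspect getCause() to decide on a restart.
        return new StreamsException("Error sending record", e);
    }

    // Hypothetical wrapper around the actual producer send call.
    static void send(Runnable producerSend) {
        try {
            producerSend.run();
        } catch (RuntimeException e) {
            throw translateSendException(e);
        }
    }
}
```

The stand-in TaskMigratedException extends StreamsException, as the real Streams class does, so a caller that catches TaskMigratedException first still sees every other error as a plain StreamsException.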

2) The ``Producer.send()`` callback receives an exception:

2.a) The baseline is to first check whether the exception is an instance of RetriableException.

If it is not retriable, pass it to the production exception handler to decide whether to throw or to continue with the record dropped. If the handler decides to throw, always wrap the exception as a StreamsException and keep it locally; meanwhile, do not send any more records from the caller. On the next send call, check the remembered exception and throw it. This causes the thread to shut down and triggers the exception handler.

If the exception is retriable, always throw it as a fatal StreamsException.
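
A minimal sketch of the 2.a path for non-retriable errors: the callback asks a production exception handler for FAIL or CONTINUE, remembers the first FAIL as a StreamsException, and the next ``send`` rethrows it instead of sending. The class name, the ``Response`` enum (mirroring the handler's FAIL/CONTINUE choice), and the ``Function``-based handler are hypothetical simplifications, not the Kafka Streams API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

class DeferredSendError {
    // Simplified stand-in for the Streams StreamsException (not the real class).
    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Hypothetical mirror of the handler's two possible decisions.
    enum Response { FAIL, CONTINUE }

    private final Function<Exception, Response> handler;
    // Written by the callback thread, read by the stream thread.
    private final AtomicReference<StreamsException> sendError = new AtomicReference<>();
    final AtomicInteger droppedRecords = new AtomicInteger();

    DeferredSendError(Function<Exception, Response> handler) {
        this.handler = handler;
    }

    // Callback body for a non-retriable error; null means the send succeeded.
    void onCompletion(Exception nonRetriable) {
        if (nonRetriable == null) {
            return;
        }
        if (handler.apply(nonRetriable) == Response.FAIL) {
            // Remember only the first failure; it is thrown on the next send.
            sendError.compareAndSet(null, new StreamsException("Error sending record", nonRetriable));
        } else {
            droppedRecords.incrementAndGet(); // CONTINUE: the record is dropped
        }
    }

    // Called by the stream thread before handing a record to the producer.
    void send(Runnable producerSend) {
        StreamsException remembered = sendError.get();
        if (remembered != null) {
            throw remembered; // no more records are sent; the thread shuts down
        }
        producerSend.run();
    }
}
```

An ``AtomicReference`` is used because producer callbacks run on the producer's I/O thread while ``send`` runs on the stream thread; ``compareAndSet(null, ...)`` keeps the first failure rather than the latest one.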

2.b) Optimization one: if a non-retriable exception can be translated into a TaskMigratedException, do not wrap it as a StreamsException, so that the library can handle it internally.

2.c) Optimization two: if a retriable exception is a TimeoutException, do not pass it to the production exception handler; treat it as TaskMigrated instead.
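
The decision tree of 2.a to 2.c can be sketched as a pure classification function. The exception classes are stand-ins mirroring kafka-clients (where TimeoutException is a RetriableException). ProducerFencedException is assumed as the example of a non-retriable error that translates to TaskMigrated, and the last rule of 2.a is read as covering the remaining retriable errors.

```java
class CallbackErrorTriage {
    // Simplified stand-ins mirroring the kafka-clients hierarchy (not the real classes).
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    static class TimeoutException extends RetriableException {
        TimeoutException(String msg) { super(msg); }
    }

    static class ProducerFencedException extends RuntimeException {
        ProducerFencedException(String msg) { super(msg); }
    }

    enum Outcome {
        TASK_MIGRATED, // handled internally: tasks treated as lost, thread re-joins
        ASK_HANDLER,   // production exception handler decides FAIL or CONTINUE
        FATAL          // rethrown as a fatal StreamsException
    }

    static Outcome classify(Exception e) {
        if (e instanceof RetriableException) {
            // 2.c: timeouts bypass the handler and are treated as TaskMigrated.
            if (e instanceof TimeoutException) {
                return Outcome.TASK_MIGRATED;
            }
            // Assumption: the remaining retriable errors are fatal (2.a).
            return Outcome.FATAL;
        }
        // 2.b: a non-retriable error implying lost ownership skips the handler.
        if (e instanceof ProducerFencedException) {
            return Outcome.TASK_MIGRATED;
        }
        // 2.a: everything else goes to the production exception handler.
        return Outcome.ASK_HANDLER;
    }
}
```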

3) The ``Producer.XXXTxn`` APIs other than ``abortTxn`` throw an exception directly:

3.a) The baseline logic is to catch every KafkaException except TimeoutException and handle it as *TaskCorrupted* (which includes aborting the transaction, resetting the state, and re-joining the group). A TimeoutException is rethrown.

3.b) Optimization: some exceptions can instead be handled as TaskMigrated, which is a lighter-weight recovery path.
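
A minimal sketch of 3.a and 3.b that wraps a transactional call such as a commit. Stand-in exception classes replace the real Kafka types, and treating ProducerFencedException as the TaskMigrated case is an assumption, since the ticket does not name which exceptions qualify.

```java
class TxnErrorTriage {
    // Simplified stand-ins for kafka-clients types (not the real classes).
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    static class TimeoutException extends KafkaException {
        TimeoutException(String msg) { super(msg); }
    }

    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String msg) { super(msg); }
    }

    // Simplified stand-ins for the Streams-side recovery signals.
    static class TaskCorruptedException extends RuntimeException {
        TaskCorruptedException(Throwable cause) { super(cause); }
    }

    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(Throwable cause) { super(cause); }
    }

    // Wraps any transactional call except the abort.
    static void runTxnCall(Runnable txnCall) {
        try {
            txnCall.run();
        } catch (TimeoutException e) {
            throw e; // 3.a: timeouts are rethrown unchanged
        } catch (ProducerFencedException e) {
            // 3.b (assumed example): the lighter TaskMigrated path
            throw new TaskMigratedException(e);
        } catch (KafkaException e) {
            // 3.a: abort the transaction, reset the state, re-join the group
            throw new TaskCorruptedException(e);
        }
    }
}
```

The catch clauses go from most to least specific, so the generic KafkaException branch only sees errors that neither earlier rule claimed.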

4) ``Producer.abortTxn`` throws an exception:

4.a) The baseline logic is to catch every KafkaException except TimeoutException and treat it as a fatal StreamsException. A TimeoutException is rethrown.

4.b) Optimization: some exceptions can be ignored (e.g., invalidTxnTransition, which means the abort did not succeed).
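
A matching sketch for ``abortTxn`` under item 4. ``InvalidTxnTransitionException`` is a hypothetical class standing in for the ticket's invalidTxnTransition case (the real producer may report it differently), and the other exception classes are simplified stand-ins.

```java
class AbortTxnErrorTriage {
    // Simplified stand-ins (not the real org.apache.kafka classes).
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }

    static class TimeoutException extends KafkaException {
        TimeoutException(String msg) { super(msg); }
    }

    // Hypothetical type for the ticket's "invalidTxnTransition" case.
    static class InvalidTxnTransitionException extends KafkaException {
        InvalidTxnTransitionException(String msg) { super(msg); }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Wraps the producer's abort call. Returns true if the abort completed,
    // false if an ignorable error was swallowed.
    static boolean abortTxn(Runnable abortCall) {
        try {
            abortCall.run();
            return true;
        } catch (TimeoutException e) {
            throw e; // timeouts are rethrown unchanged
        } catch (InvalidTxnTransitionException e) {
            return false; // optimization: this case is ignored
        } catch (KafkaException e) {
            // baseline: any other failure while aborting is fatal
            throw new StreamsException("Fatal error aborting transaction", e);
        }
    }
}
```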



--
This message was sent by Atlassian Jira
(v8.3.4#803005)