Posted to dev@kafka.apache.org by "Joel Lundell (JIRA)" <ji...@apache.org> on 2016/12/05 16:49:59 UTC

[jira] [Comment Edited] (KAFKA-4486) Kafka Streams - exception in process still commits offsets

    [ https://issues.apache.org/jira/browse/KAFKA-4486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15722715#comment-15722715 ] 

Joel Lundell edited comment on KAFKA-4486 at 12/5/16 4:48 PM:
--------------------------------------------------------------

I debugged a little more: the exception escapes the run loop in StreamThread, but regardless of the type of exception it ends up in the same finally clause and the thread shuts down. Can you point to where it tries to distinguish retriable from fatal exceptions?

{code:java}
    /**
     * Execute the stream processors
     * @throws KafkaException for any Kafka-related exceptions
     * @throws Exception for any other non-Kafka exceptions
     */
    @Override
    public void run() {
        log.info("{} Starting", logPrefix);

        try {
            runLoop();
        } catch (KafkaException e) {
            // just re-throw the exception as it should be logged already
            throw e;
        } catch (Exception e) {
            // we have caught all Kafka related exceptions, and other runtime exceptions
            // should be due to user application errors
            log.error("{} Streams application error during processing: ", logPrefix, e);
            throw e;
        } finally {
            shutdown();
        }
    }
{code}
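
Given the code above, the only workaround I can see is to handle failures inside Processor#process so that no exception ever escapes to run(). Below is a minimal sketch of that idea; everything except the Processor interface itself (the deliver() call, the fixed backoff) is a placeholder I made up, not Kafka API:

{code:java}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical workaround sketch: retry inside process() so no exception ever
// escapes to StreamThread.run() above, where it would trigger shutdown() and,
// per this issue, a commit of the failed record's offset.
public class RetryingProcessor implements Processor<String, String> {

    @Override
    public void init(ProcessorContext context) { }

    @Override
    public void process(String key, String value) {
        while (true) {
            try {
                deliver(key, value); // placeholder for the real downstream call
                return;              // success: let Streams commit as usual
            } catch (RuntimeException e) {
                try {
                    Thread.sleep(1000L); // crude fixed backoff, assumption
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return; // caveat: returning here lets the offset commit
                }
            }
        }
    }

    private void deliver(String key, String value) {
        // placeholder for the actual delivery logic, e.g. an SQS put
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}
{code}

This blocks the poll loop while retrying, which has its own problems, but at least no exception reaches the finally clause shown above for the record that failed.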


> Kafka Streams - exception in process still commits offsets
> ----------------------------------------------------------
>
>                 Key: KAFKA-4486
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4486
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>         Environment: Java 8
>            Reporter: Joel Lundell
>
> I'm building a streams application and would like to be able to control the commits manually using ProcessorContext#commit() from an instance of org.apache.kafka.streams.processor.Processor.
> My use case is that I want to read messages from a topic and push them to AWS SQS, and I need to guarantee that every message reaches the queue at least once. I also want to use SQS batching support, so my current approach is: in Processor#process I save X records in a data structure and, once I have a full batch, I send it off and commit if the send succeeds (a sketch of this pattern follows below). If for any reason I can't deliver the records, I don't want the offsets committed, so that when processing works again I can resume from the last successful record.
> When I was trying out the error handling I noticed that if I create a Processor whose process method always throws an exception, that triggers StreamThread#shutdownTaskAndState, which calls StreamThread#commitOffsets; the next time I run the application it starts as if the previous "record" had been successfully processed.
> Is there a way to achieve what I'm looking for?
> I found a similar discussion in https://issues.apache.org/jira/browse/KAFKA-3491 but that issue is still open.
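
To make the pattern described in the issue concrete, here is a minimal sketch of the batching processor against the 0.10.1.0 Processor API; the batch size and sendBatch() are placeholders standing in for the SQS client code, not part of any Kafka API:

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch of the batching pattern from the issue description: buffer records,
// push a full batch to SQS, and only request a commit once the send succeeds.
public class SqsBatchingProcessor implements Processor<String, String> {
    private static final int BATCH_SIZE = 10; // SQS batch limit, assumption
    private final List<String> batch = new ArrayList<>();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        batch.add(value);
        if (batch.size() >= BATCH_SIZE) {
            sendBatch(batch);  // placeholder; throws on delivery failure
            batch.clear();
            context.commit();  // request a commit after a successful send
        }
    }

    private void sendBatch(List<String> records) {
        // placeholder for the SQS batch-send call plus error handling
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}
{code}

The intent is that an exception from sendBatch() skips context.commit(); the point of this issue is that the exception then still reaches StreamThread#shutdownTaskAndState, which commits the offsets anyway.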



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)