Posted to users@kafka.apache.org by Chris Curtin <cu...@gmail.com> on 2013/07/09 17:16:28 UTC

High Level Consumer error handling and clean exit

Hi,

I'm working through a production-level High Level Consumer app and have a
couple of error/shutdown questions to understand how the offset storage is
handled.

Test case - simulate an error writing to destination application, for
example a database, offset is 'lost'

Scenario
- write 500 messages for each topic/partition
- use the example High Level Consumer code I wrote for the Wiki
- Change the code so that every 10th read from the 'hasNext()'
ConsumerIterator breaks out of the loop and returns from the thread,
simulating a hard error. I write the offset to System.out to see what was
provided
- startup again and look to see what offset was first emitted for a
partition

Issue: Kafka treats the offset for the message read that caused me to break
out of the loop as processed (as expected), but I really failed. How do I
tell Kafka that I didn't really consume that offset?

Here is the example code in the 'business logic':

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        int counter = 0;
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            if (counter == 10) {
                System.out.println("Stopping Thread " + m_threadNumber
                        + ": Partition: " + msg.partition()
                        + ": Offset: " + msg.offset()
                        + " :" + new String(msg.message()));
                break;
            }
            System.out.println("Thread " + m_threadNumber
                    + ": Partition: " + msg.partition()
                    + ": Offset: " + msg.offset()
                    + " :" + new String(msg.message()));
            counter++;
        }

        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may
leave the offsets in ZooKeeper incorrect, but I'm trying to understand what
happens in a clean shutdown where Kafka and the Consumer are behaving
correctly but I can't process what I read.

This also feels like I'm blurring SimpleConsumer theory into this, but
except for the exception/shutdown case High Level Consumer does everything
I want.


Thanks,

Chris

Re: High Level Consumer error handling and clean exit

Posted by Chris Curtin <cu...@gmail.com>.
Thanks Ian.

Is your consumer multi-threaded? If so can you share how you coordinated
each of the threads so you knew it was 'okay' to commit across all the
threads? I'm stuck on how to do this without really complicating the
consumer.
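One possible way to coordinate the threads, sketched under assumptions: run the work in rounds, let every thread process one batch per round, and commit once per round only if every batch succeeded. The class name CoordinatedCommit and the failingWorker/failingRound parameters are hypothetical and only simulate a bad batch; the `commits++` line marks where a single commitOffsets() call would go in a real consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CoordinatedCommit {
    /**
     * Runs up to `rounds` rounds with `workers` parallel tasks per round.
     * Each task stands in for "process the next batch from my stream" and
     * reports success (true) or failure (false). Returns how many rounds
     * were committed. Pass -1 for failingWorker/failingRound to simulate
     * a run without failures.
     */
    static int run(int workers, int rounds, int failingWorker, int failingRound) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        int commits = 0;
        try {
            for (int round = 0; round < rounds; round++) {
                List<Callable<Boolean>> batchTasks = new ArrayList<>();
                for (int w = 0; w < workers; w++) {
                    final boolean fails = (w == failingWorker && round == failingRound);
                    batchTasks.add(() -> !fails); // simulated batch result
                }
                // invokeAll blocks until every task in this round has finished.
                boolean allOk = true;
                for (Future<Boolean> result : pool.invokeAll(batchTasks)) {
                    allOk &= result.get();
                }
                if (!allOk) {
                    break; // a batch failed: stop without committing this round
                }
                commits++; // assumption: the one commitOffsets() call goes here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println("no failure:      " + run(3, 4, -1, -1));
        System.out.println("worker 1 fails:  " + run(3, 4, 1, 2));
    }
}
```

The trade-off is that a failure in one thread causes the whole round to be replayed by every thread after a restart, so the downstream writes need to tolerate duplicates.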

Thanks,

Chris



Re: High Level Consumer error handling and clean exit

Posted by Ian Friedman <ia...@flurry.com>.
Hey Chris, 

The way I handled this in my application using the High Level Consumer was to turn off auto-commit and commit manually after finishing a batch of messages (obviously you could do it after every message, but for my purposes it was better to have batches) 
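A minimal sketch of the batch-then-commit loop described above, assuming auto-commit is switched off (in the 0.8 consumer I believe this is the auto.commit.enable=false property). OffsetCommitter and MessageHandler are hypothetical interfaces introduced for illustration; in a real consumer the committer would wrap ConsumerConnector.commitOffsets().

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for whatever commits offsets, e.g. a wrapper around commitOffsets(). */
interface OffsetCommitter {
    void commit();
}

/** Hypothetical stand-in for the business logic; throws an unchecked exception on failure. */
interface MessageHandler<T> {
    void handle(T message);
}

class BatchCommitLoop {
    /**
     * Handles messages in batches and commits only after a whole batch has
     * been handled. If the handler throws, the exception propagates and the
     * partial batch is never committed. Returns the number of commits made.
     */
    static <T> int run(Iterator<T> messages, int batchSize,
                       MessageHandler<T> handler, OffsetCommitter committer) {
        int commits = 0;
        int inBatch = 0;
        while (messages.hasNext()) {
            handler.handle(messages.next()); // may throw: nothing committed for this batch
            inBatch++;
            if (inBatch == batchSize) {
                committer.commit();
                commits++;
                inBatch = 0;
            }
        }
        if (inBatch > 0) { // commit the final, partially filled batch
            committer.commit();
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c", "d", "e");
        int commits = run(messages.iterator(), 2,
                m -> System.out.println("handled " + m),
                () -> System.out.println("commit"));
        System.out.println("commits: " + commits);
    }
}
```

After a failure the consumer restarts from the last committed batch boundary, so messages handled since that commit are delivered again.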

-- 
Ian Friedman




Re: High Level Consumer error handling and clean exit

Posted by Chris Curtin <cu...@gmail.com>.
Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966




Re: High Level Consumer error handling and clean exit

Posted by Chris Curtin <cu...@gmail.com>.
Thanks. I know I can write a SimpleConsumer to do this, but it feels like
the High Level consumer is _so_ close to being robust enough to handle
what I'd think people want to do in most applications. I'm going to submit
an enhancement request.

I'm trying to understand the level of data loss in this situation, so I
looked deeper into the KafkaStream logic: it looks like a KafkaStream
includes a BlockingQueue for transferring the messages to my code from
Kafka. If I call shutdown() when I detect the problem, are the messages
already in the BlockingQueue considered 'read' by Kafka, or does the
shutdown peek into the Queue to see what is still there before updating
ZooKeeper?

My concern is if that queue is not empty I'll be losing more than the one
message that led to the failure.
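To make the question concrete, here is a toy model (not a statement about Kafka's internals) of a stream that keeps two positions: how far the fetcher has buffered and how far the application has actually iterated. ToyStream and all of its members are hypothetical. In this design only the iterated position would be written on shutdown, so messages still sitting in the queue are not lost; whether the real KafkaStream behaves this way would need to be checked against its source.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model: a buffered stream with separate fetched and consumed positions. */
class ToyStream {
    private final BlockingQueue<Long> buffer = new LinkedBlockingQueue<>();
    private long fetchedUpTo = 0;  // next offset the fetcher would request
    private long consumedUpTo = 0; // next offset the application has not yet seen

    /** Fetcher side: buffers `count` more offsets ahead of the application. */
    void prefetch(int count) {
        for (int i = 0; i < count; i++) {
            buffer.add(fetchedUpTo++);
        }
    }

    /** Application side: takes one buffered offset and advances the consumed position. */
    long next() {
        Long offset = buffer.poll();
        if (offset == null) {
            throw new NoSuchElementException("buffer is empty");
        }
        consumedUpTo = offset + 1;
        return offset;
    }

    /** The position this model would store on a clean shutdown. */
    long offsetToCommitOnShutdown() {
        return consumedUpTo;
    }

    /** Number of fetched messages the application has not iterated yet. */
    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream();
        stream.prefetch(10);
        stream.next();
        stream.next();
        stream.next();
        System.out.println("commit on shutdown: " + stream.offsetToCommitOnShutdown());
        System.out.println("still buffered:     " + stream.buffered());
    }
}
```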

I'm also curious how others are handling this situation. Do you assume the
message that is causing problems is lost or somehow know to go get it
later? I'd think others would have this problem too.

Thanks,

Chris




Re: High Level Consumer error handling and clean exit

Posted by Philip O'Toole <ph...@loggly.com>.
OK.

It sounds like you're requesting functionality that the high-level consumer
simply doesn't have. As I am sure you know, there is no API call that
supports "handing back a message".

I might be missing something, but if you need this kind of control, I think
you need to code your application differently. You could try creating a
ConsumerConnector per partition (your clients will then need to know the
number of partitions out there). That way commitOffsets() will actually
only apply to that partition. Auto-commit would behave the same way. It
might give you the level of control you need.
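The difference between one shared connector and one connector per partition can be illustrated with a toy model. ToyConnector is hypothetical and does not talk to Kafka or ZooKeeper; it only assumes that a commit writes the consumed position of every partition the connector owns.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a connector: tracks consumed and committed offsets per owned partition. */
class ToyConnector {
    private final Map<Integer, Long> consumed = new HashMap<>();
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Records that the message at `offset` was handed to the application. */
    void markConsumed(int partition, long offset) {
        consumed.put(partition, offset + 1); // store the next offset to read
    }

    /** Commits the consumed position of every partition this connector owns. */
    void commitOffsets() {
        committed.putAll(consumed);
    }

    /** Returns the committed next-offset, or 0 if nothing was committed yet. */
    long committed(int partition) {
        return committed.getOrDefault(partition, 0L);
    }
}

class PerPartitionDemo {
    public static void main(String[] args) {
        // Shared connector: partition 1's in-flight message is committed
        // as a side effect of the commit made for partition 0.
        ToyConnector shared = new ToyConnector();
        shared.markConsumed(0, 9); // partition 0 finished offset 9
        shared.markConsumed(1, 4); // partition 1 pulled offset 4, still processing
        shared.commitOffsets();
        System.out.println("shared, partition 1 committed:   " + shared.committed(1)); // 5

        // One connector per partition: committing partition 0 leaves partition 1 alone.
        ToyConnector p0 = new ToyConnector();
        ToyConnector p1 = new ToyConnector();
        p0.markConsumed(0, 9);
        p1.markConsumed(1, 4);
        p0.commitOffsets();
        System.out.println("separate, partition 1 committed: " + p1.committed(1)); // 0
    }
}
```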

Philip

On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <cu...@gmail.com> wrote:

> Hi Philip,
>
> Correct, I don't want to explicitly control the offset committing. The
> ConsumerConnector handles that well enough except for when I want to
> shutdown and NOT have Kafka think I consumed that last message for a
> stream. This isn't the crash case, it is a case where the logic consuming
> the message detects an error and wants to cleanly exit until that issue
> can be resolved, but not lose the message it was trying to process when the
> problem is resolved.
>
> My understanding is that the commitOffsets() call is across all threads,
> not just for the stream my thread is reading from. So knowing it is okay to
> call this requires coordination across all my threads, which makes a High
> Level Consumer a lot harder to write correctly.
>
> Thinking about what I'd like to happen is: my code hands the message back
> to the KafkaStream (or whatever level knows about the consumed offsets) and
> says
> - set the next start offset for this topic/partition to this message in
> ZooKeeper
> - cleanly shutdown the stream from the broker(s)
> - don't force a rebalance on the consumer since something is wrong with
> processing of the data in the message, not the message.
> - If I try to use the stream again I should get an exception
> - I don't think I would want this to cause a complete shutdown of the
> ConsumerConnector, in case other threads are still processing. If all
> threads have the same issue they will all fail soon enough and do the same
> logic. But if only one thread fails, our Operations teams will need to
> resolve the issue then do a clean restart to recover.
>
> I think this logic would only happen when the down stream system was having
> issues since the iterator would be drained correctly when the 'shutdown'
> call to ConsumerConnector is made.
>
> Thanks,
>
> Chris
>
>
>
> On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <ph...@loggly.com> wrote:
>
> > It seems like you're not explicitly controlling the offsets. Is that
> > correct?
> >
> > If so, the moment you pull a message from the stream, the client
> framework
> > considers it processed. So if your app subsequently crashes before the
> > message is fully processed, and "auto-commit" updates the offsets in
> > Zookeeper, you will drop that message.
> >
> > The solution to this is to call commitOffsets() explicitly.
> >
> > Philip
> >
> > On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.chris@gmail.com
> > >wrote:
> >
> > > Hi,
> > >
> > > I'm working through a production-level High Level Consumer app and
> have a
> > > couple of error/shutdown questions to understand how the offset storage
> > is
> > > handled.
> > >
> > > Test case - simulate an error writing to destination application, for
> > > example a database, offset is 'lost'
> > >
> > > Scenario
> > > - write 500 messages for each topic/partition
> > > - use the example High Level Consumer code I wrote for the Wiki
> > > - Change the code so that every 10th read from the 'hasNext()'
> > > ConsumerIterator breaks out of the loop and returns from the thread,
> > > simulating a hard error. I write the offset to System.out to see what
> was
> > > provided
> > > - startup again and look to see what offset was first emitted for a
> > > partition
> > >
> > > Issue: Kafka treats the offset for the message read that caused me to
> > break
> > > out of the loop as processed (as expected), but I really failed. How
> do I
> > > tell Kafka that I didn't really consume that offset?
> > >
> > > Here is the example code in the 'business logic':
> > >
> > > public void run() {
> > >         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > >         int counter = 0;
> > >         while (it.hasNext())   {
> > >             MessageAndMetadata<byte[], byte[]> msg = it.next();
> > >             if (counter == 10) {
> > >                 System.out.println("Stopping Thread " + m_threadNumber
> +
> > ":
> > > Partition: " + msg.partition() +
> > >                         ": Offset: " + msg.offset() + " :" + new
> > > String(msg.message()));
> > >                 break;
> > >             }
> > >             System.out.println("Thread " + m_threadNumber + ":
> > Partition: "
> > > + msg.partition() +
> > >                     ": Offset: " + msg.offset() + " :" + new
> > > String(msg.message()));
> > >             counter++;
> > >         }
> > >
> > >         System.out.println("Shutting down Thread: " + m_threadNumber);
> > >     }
> > >
> > > I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may
> > > leave the offsets in ZooKeeper incorrect, but I'm trying to understand what
> > > happens in a clean shutdown where Kafka and the Consumer are behaving
> > > correctly but I can't process what I read.
> > >
> > > This also feels like I'm blurring SimpleConsumer theory into this, but
> > > except for the exception/shutdown case High Level Consumer does everything
> > > I want.
> > >
> > >
> > > Thanks,
> > >
> > > Chris
> > >
> >
>

Re: High Level Consumer error handling and clean exit

Posted by Chris Curtin <cu...@gmail.com>.
Hi Philip,

Correct, I don't want to explicitly control the offset committing. The
ConsumerConnector handles that well enough, except when I want to shut
down and NOT have Kafka think I consumed the last message for a stream.
This isn't the crash case; it is the case where the logic consuming the
message detects an error and wants to exit cleanly until that issue can be
resolved, so that the message it was trying to process is not lost once
the problem is resolved.

My understanding is that the commitOffsets() call is across all threads,
not just for the stream my thread is reading from. So knowing it is okay to
call this requires coordination across all my threads, which makes a High
Level Consumer a lot harder to write correctly.
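
To make the coordination concrete, here is a minimal sketch, assuming (as
described above) that a commit covers every stream at once. CommitGate and
its method names are hypothetical and not part of the Kafka client. Each
worker would call begin() just before taking a message from its iterator and
end(...) once processing finishes; a commit is allowed only while no message
is in flight and no worker has failed.

```java
// Hypothetical helper, not a Kafka class: guards a connector-wide commit.
final class CommitGate {
    private int inFlight = 0;       // messages taken but not yet finished
    private boolean failed = false; // set once any worker reports a failure

    // Call immediately before pulling the next message from the stream.
    synchronized void begin() {
        inFlight++;
    }

    // Call when processing of that message ends, successfully or not.
    synchronized void end(boolean success) {
        inFlight--;
        if (!success) {
            failed = true; // never commit past an unprocessed message
        }
    }

    // Runs the commit only when it cannot cover an unprocessed message.
    // The commit runs inside the monitor, so begin() waits until it is done.
    synchronized boolean tryCommit(Runnable commit) {
        if (failed || inFlight > 0) {
            return false;
        }
        commit.run(); // assumption: would wrap ConsumerConnector.commitOffsets()
        return true;
    }
}
```

One consequence of this design: after a failure nothing more is committed, so
the healthy threads will see some messages again after a restart. That is
at-least-once delivery rather than exactly-once.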

What I'd like to happen is this: my code hands the message back to the
KafkaStream (or whatever level knows about the consumed offsets) and says:
- set the next start offset for this topic/partition to this message in
ZooKeeper
- cleanly shut down the stream from the broker(s)
- don't force a rebalance on the consumer, since something is wrong with
the processing of the data in the message, not the message
- if I try to use the stream again, I should get an exception
- I don't think I would want this to cause a complete shutdown of the
ConsumerConnector, in case other threads are still processing. If all
threads have the same issue they will all fail soon enough and do the same
logic. But if only one thread fails, our Operations teams will need to
resolve the issue then do a clean restart to recover.
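
The bookkeeping behind that wish list can be sketched as follows. This only
models the behaviour in memory: RestartOffsets is a hypothetical class, not a
Kafka API, and it assumes the stored value for a partition is the offset to
start from on the next run. Persisting that value and closing the real stream
are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of "hand the message back" semantics.
final class RestartOffsets {
    private final Map<Integer, Long> nextStart = new HashMap<>();
    private final Set<Integer> closed = new HashSet<>();

    // Message fully processed: the next run starts after it.
    synchronized void processed(int partition, long offset) {
        requireOpen(partition);
        nextStart.put(partition, offset + 1);
    }

    // Message could not be processed: the next run starts at this message,
    // and the partition refuses further use until a restart.
    synchronized void failed(int partition, long offset) {
        requireOpen(partition);
        nextStart.put(partition, offset);
        closed.add(partition);
    }

    // Offset to resume from, or null if nothing was recorded.
    synchronized Long nextStart(int partition) {
        return nextStart.get(partition);
    }

    private void requireOpen(int partition) {
        if (closed.contains(partition)) {
            throw new IllegalStateException(
                    "partition " + partition + " was closed after a failure");
        }
    }
}
```

Other partitions stay usable after one fails, which matches the last bullet:
a single failing thread does not shut down everything else.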

I think this logic would only be needed when the downstream system is having
issues, since the iterator would be drained correctly when the 'shutdown'
call to ConsumerConnector is made.

Thanks,

Chris



On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <ph...@loggly.com> wrote:

> It seems like you're not explicitly controlling the offsets. Is that
> correct?
>
> If so, the moment you pull a message from the stream, the client framework
> considers it processed. So if your app subsequently crashes before the
> message is fully processed, and "auto-commit" updates the offsets in
> ZooKeeper, you will drop that message.
>
> The solution to this is to call commitOffsets() explicitly.
>
> Philip
>
> On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <curtin.chris@gmail.com> wrote:
>
> > Hi,
> >
> > I'm working through a production-level High Level Consumer app and have a
> > couple of error/shutdown questions to understand how the offset storage is
> > handled.
> >
> > Test case - simulate an error writing to destination application, for
> > example a database, offset is 'lost'
> >
> > Scenario
> > - write 500 messages for each topic/partition
> > - use the example High Level Consumer code I wrote for the Wiki
> > - Change the code so that every 10th read from the 'hasNext()'
> > ConsumerIterator breaks out of the loop and returns from the thread,
> > simulating a hard error. I write the offset to System.out to see what was
> > provided
> > - startup again and look to see what offset was first emitted for a
> > partition
> >
> > Issue: Kafka treats the offset for the message read that caused me to break
> > out of the loop as processed (as expected), but I really failed. How do I
> > tell Kafka that I didn't really consume that offset?
> >
> > Here is the example code in the 'business logic':
> >
> > public void run() {
> >         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> >         int counter = 0;
> >         while (it.hasNext())   {
> >             MessageAndMetadata<byte[], byte[]> msg = it.next();
> >             if (counter == 10) {
> >                 System.out.println("Stopping Thread " + m_threadNumber
> >                         + ": Partition: " + msg.partition()
> >                         + ": Offset: " + msg.offset()
> >                         + " :" + new String(msg.message()));
> >                 break;
> >             }
> >             System.out.println("Thread " + m_threadNumber
> >                     + ": Partition: " + msg.partition()
> >                     + ": Offset: " + msg.offset()
> >                     + " :" + new String(msg.message()));
> >             counter++;
> >         }
> >
> >         System.out.println("Shutting down Thread: " + m_threadNumber);
> >     }
> >
> > I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may
> > leave the offsets in ZooKeeper incorrect, but I'm trying to understand what
> > happens in a clean shutdown where Kafka and the Consumer are behaving
> > correctly but I can't process what I read.
> >
> > This also feels like I'm blurring SimpleConsumer theory into this, but
> > except for the exception/shutdown case High Level Consumer does everything
> > I want.
> >
> >
> > Thanks,
> >
> > Chris
> >
>

Re: High Level Consumer error handling and clean exit

Posted by Philip O'Toole <ph...@loggly.com>.
It seems like you're not explicitly controlling the offsets. Is that
correct?

If so, the moment you pull a message from the stream, the client framework
considers it processed. So if your app subsequently crashes before the
message is fully processed, and "auto-commit" updates the offsets in
ZooKeeper, you will drop that message.

The solution to this is to call commitOffsets() explicitly.
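
A minimal sketch of that pattern, assuming auto-commit has been turned off in
the consumer configuration (auto.commit.enable=false in the 0.8 consumer
properties) and that the commit callback wraps commitOffsets() on the
ConsumerConnector. CommitAfterProcess, drain and the process/commit parameters
are hypothetical names used for illustration; the iterator stands in for the
ConsumerIterator.

```java
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical helper, not a Kafka class: commit only after success.
final class CommitAfterProcess {

    // Processes messages in order and commits after each success.
    // Stops at the first failure without committing, so the failed
    // message is not recorded as consumed by this loop.
    // Returns the number of messages that were committed.
    static <T> int drain(Iterator<T> messages,
                         Predicate<T> process,
                         Runnable commit) {
        int committed = 0;
        while (messages.hasNext()) {
            T msg = messages.next();
            if (!process.test(msg)) {
                break; // leave the last committed position untouched
            }
            commit.run(); // assumption: wraps connector.commitOffsets()
            committed++;
        }
        return committed;
    }
}
```

Committing after every message is the simplest form and probably the slowest,
since each commit is a write to ZooKeeper; committing every N successes is a
common compromise. With more than one stream, the commit also covers whatever
the other threads have pulled so far.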

Philip

On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <cu...@gmail.com> wrote:

> Hi,
>
> I'm working through a production-level High Level Consumer app and have a
> couple of error/shutdown questions to understand how the offset storage is
> handled.
>
> Test case - simulate an error writing to destination application, for
> example a database, offset is 'lost'
>
> Scenario
> - write 500 messages for each topic/partition
> - use the example High Level Consumer code I wrote for the Wiki
> - Change the code so that every 10th read from the 'hasNext()'
> ConsumerIterator breaks out of the loop and returns from the thread,
> simulating a hard error. I write the offset to System.out to see what was
> provided
> - startup again and look to see what offset was first emitted for a
> partition
>
> Issue: Kafka treats the offset for the message read that caused me to break
> out of the loop as processed (as expected), but I really failed. How do I
> tell Kafka that I didn't really consume that offset?
>
> Here is the example code in the 'business logic':
>
> public void run() {
>         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>         int counter = 0;
>         while (it.hasNext())   {
>             MessageAndMetadata<byte[], byte[]> msg = it.next();
>             if (counter == 10) {
>                 System.out.println("Stopping Thread " + m_threadNumber
>                         + ": Partition: " + msg.partition()
>                         + ": Offset: " + msg.offset()
>                         + " :" + new String(msg.message()));
>                 break;
>             }
>             System.out.println("Thread " + m_threadNumber
>                     + ": Partition: " + msg.partition()
>                     + ": Offset: " + msg.offset()
>                     + " :" + new String(msg.message()));
>             counter++;
>         }
>
>         System.out.println("Shutting down Thread: " + m_threadNumber);
>     }
>
> I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may
> leave the offsets in ZooKeeper incorrect, but I'm trying to understand what
> happens in a clean shutdown where Kafka and the Consumer are behaving
> correctly but I can't process what I read.
>
> This also feels like I'm blurring SimpleConsumer theory into this, but
> except for the exception/shutdown case High Level Consumer does everything
> I want.
>
>
> Thanks,
>
> Chris
>