Posted to user@flume.apache.org by Zhishan Li <zh...@gmail.com> on 2015/12/07 05:37:47 UTC

Kafka Source Error

When I use KafkaSource, the following error is raised:

    07 Dec 2015 04:13:11,571 ERROR [ConsumerFetcherThread-] (kafka.utils.Logging$class.error:97) - Current offset 482243452 for partition [5] out of range; reset offset to 483146676
    Current offset 482243452 for partition [log,5] out of range; reset offset to 483146676
    consumed offset: 482243452 doesn't match fetch offset: 483146676 for log:5: fetched offset = 483147611: consumed offset = 482243452;
    Consumer may lose data

But I am using the default configuration of the KafkaSource.

What is happening while the agent is running?

Thanks


Re: Kafka Source Error

Posted by Zhishan Li <zh...@gmail.com>.
Hi Gonzalo,


Thank you very much.  And I have another question:

Here are my metrics. It seems no data was lost, right?

{
  "SOURCE.KafkaSource": {
    "KafkaEventGetTimer": "11916336",
    "Type": "SOURCE",
    "EventAcceptedCount": "28938986",
    "AppendReceivedCount": "0",
    "EventReceivedCount": "28938986",
    "KafkaCommitTimer": "0",
    "KafkaEmptyCount": "0",
    "OpenConnectionCount": "0",
    "AppendBatchReceivedCount": "0",
    "AppendBatchAcceptedCount": "0",
    "StopTime": "0",
    "StartTime": "1449463617288",
    "AppendAcceptedCount": "0"
  },
  "SINK.k4": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "0",
    "EventDrainAttemptCount": "9",
    "ConnectionCreatedCount": "1",
    "Type": "SINK",
    "BatchEmptyCount": "823",
    "ConnectionClosedCount": "0",
    "EventDrainSuccessCount": "9",
    "StopTime": "0",
    "StartTime": "1449463610232",
    "BatchUnderflowCount": "9"
  },
  "CHANNEL.c1": {
    "EventPutSuccessCount": "28938977",
    "ChannelFillPercentage": "4.885",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "28938977",
    "ChannelSize": "977",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "28938000",
    "ChannelCapacity": "20000",
    "EventTakeAttemptCount": "28938979"
  },
  "SINK.k1": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "28938",
    "EventDrainAttemptCount": "28938977",
    "ConnectionCreatedCount": "167",
    "Type": "SINK",
    "BatchEmptyCount": "1",
    "ConnectionClosedCount": "159",
    "EventDrainSuccessCount": "28938000",
    "StopTime": "0",
    "StartTime": "1449463610233",
    "BatchUnderflowCount": "0"
  },
  "CHANNEL.c2": {
    "EventPutSuccessCount": "9",
    "ChannelFillPercentage": "0.0",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "9",
    "ChannelSize": "0",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "9",
    "ChannelCapacity": "3000",
    "EventTakeAttemptCount": "841"
  }
}


The pipelines:

    KafkaSource -> c1 -> k1
    KafkaSource -> c2 -> k4
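If I add the numbers up, the counters look consistent to me: the source's EventReceivedCount (28938986) equals c1's EventPutSuccessCount (28938977) plus c2's (9), and c1's EventTakeSuccessCount (28938000) plus its current ChannelSize (977) again gives 28938977. So nothing seems to be dropped inside the agent itself, though I suppose messages skipped by the offset reset would never be counted here at all.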

But I don't understand why the counter names don't match the source code. For example:
"EventReceivedCount": "28938986",
And in KafkaSource.java, line 127:

    counter.addToEventReceivedCount(Long.valueOf(eventList.size()));


This calls into SourceCounter.java:

    public long addToEventReceivedCount(long delta) {
      return addAndGet(COUNTER_EVENTS_RECEIVED, delta);
    }

where COUNTER_EVENTS_RECEIVED is defined in SourceCounter.java as:

    private static final String COUNTER_EVENTS_RECEIVED =
        "src.events.received";


So how does the counter name get from the source code to the monitoring metrics?
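Digging around, my guess is that the JSON output never uses that internal key at all: the HTTP metrics report is built from the JMX MBean attributes, and a JMX attribute name comes from its getter, so getEventReceivedCount() surfaces as "EventReceivedCount" under standard JavaBeans naming. A condensed sketch of how I read it (class and method names are from my reading of flume-ng-core around 1.6, so treat the details as assumptions):

    // SourceCounter is registered as a JMX MBean via its parent class,
    // MonitoredCounterGroup; the JSON monitoring service just polls JMX.
    public class SourceCounter extends MonitoredCounterGroup
        implements SourceCounterMBean {

      // Exposed over JMX as the attribute "EventReceivedCount"
      // (JavaBeans naming strips the "get" prefix), which is the
      // name that shows up in the monitoring output.
      @Override
      public long getEventReceivedCount() {
        return get(COUNTER_EVENTS_RECEIVED);
      }
    }

Is that right?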

Thanks.






Re: Kafka Source Error

Posted by Gonzalo Herreros <gh...@gmail.com>.
Kafka consumers keep track of their progress (the offset) in Zookeeper (there is an option in newer versions to change that, but Flume still uses the old way).
What happens there is that the consumer tries to fetch messages from the offset it knows, but Kafka comes back saying that offset is not present and asks the client to "reset" the offset (I'm not sure if it resets to the oldest or the newest).
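For what it's worth, the direction of that reset is governed by the consumer's auto.offset.reset property ("smallest" or "largest" in the old consumer). If I remember the Flume 1.6 docs correctly, any property you prefix with "kafka." in the source configuration is passed straight through to the consumer, so something like the line below should let you choose (an untested sketch; the agent and source names are made up):

    agent.sources.kafkaSource.kafka.auto.offset.reset = smallest

Note that this only controls where the consumer lands after the error; it doesn't explain why the stored offset fell out of range in the first place.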

That might indicate either a conflict because some other Kafka cluster is using the same Zookeeper and groupId, or that the Kafka retention is so low that messages in the queue get deleted before they can be processed. (A third option is that the Kafka cluster is completely messed up.)

In my view this is a Kafka issue and not Flume's; try to troubleshoot Kafka first.
When you say "messages lost in the flume pipeline", do you mean the error you are getting, or do you have some other issue?



Re: Kafka Source Error

Posted by Zhishan Li <zh...@gmail.com>.
Hi Gonzalo,

Thanks for your reply. But I still cannot figure out why the offset is frequently reset. I am sure the groupId is unique within my Kafka cluster.

I find that some messages are lost in the Flume pipeline, but I don't know the reason. Please do me a favour.

Thanks,





Re: Kafka Source Error

Posted by Gonzalo Herreros <gh...@gmail.com>.
What that means is that the KafkaSource is trying to read messages from where it left off the last time it was running (or at least the last time some client used Kafka with the same groupId), but they have already been deleted by Kafka, so it is warning you that some messages have been missed.
Even if this is the first time you use the KafkaSource, maybe somebody used a Kafka consumer with the same groupId long ago. It's better if you make up your own groupId so you don't have strange conflicts.
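For example, something like this in the agent configuration (groupId is the property name in the Flume 1.6 KafkaSource docs; the agent and source names are made up):

    agent.sources.kafkaSource.groupId = my-unique-flume-group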

Regards,
Gonzalo

