Posted to user@storm.apache.org by Alex Sobrino <al...@v5tech.es> on 2015/07/13 09:18:07 UTC

Re: kafka offset WARN stops tuples incoming

Hi,

@Benjamin have you been able to solve this issue? We're having exactly the
same behavior.

I understand what Nikhil is saying, but when this problem happens, no tuple
is being processed at all. The topology seems to be frozen.

In fact, looking at the Storm logs:

2015-07-13T08:34:18.797+0200 s.k.KafkaUtils [WARN] Got fetch request with
offset out of range: [5004855]; retrying with default start offset time
from configuration. configured start offset time: [-2]
2015-07-13T08:34:18.797+0200 s.k.PartitionManager [WARN] Using new offset:
6045274
2015-07-13T08:34:18.799+0200 s.k.KafkaUtils [WARN] Got fetch request with
offset out of range: [5004855]; retrying with default start offset time
from configuration. configured start offset time: [-2]
2015-07-13T08:34:18.799+0200 s.k.PartitionManager [WARN] Using new offset:
6045274
2015-07-13T08:34:18.801+0200 s.k.KafkaUtils [WARN] Got fetch request with
offset out of range: [5004855]; retrying with default start offset time
from configuration. configured start offset time: [-2]
2015-07-13T08:34:18.801+0200 s.k.PartitionManager [WARN] Using new offset:
6045274

And looking at Kafka's logs:

server.log.2015-07-13-07:[2015-07-13 07:38:52,586] ERROR [Replica Manager
on Broker 1]: Error when processing fetch request for partition [AGH,1]
offset 5004855 from consumer with correlation id 0. Possible cause: Request
for offset 5004855 but we only have log segments in the range 6045274 to
6062679. (kafka.server.ReplicaManager)
server.log.2015-07-13-07:[2015-07-13 07:38:52,588] ERROR [Replica Manager
on Broker 1]: Error when processing fetch request for partition [AGH,1]
offset 5004855 from consumer with correlation id 0. Possible cause: Request
for offset 5004855 but we only have log segments in the range 6045274 to
6062679. (kafka.server.ReplicaManager)
server.log.2015-07-13-07:[2015-07-13 07:38:52,589] ERROR [Replica Manager
on Broker 1]: Error when processing fetch request for partition [AGH,1]
offset 5004855 from consumer with correlation id 0. Possible cause: Request
for offset 5004855 but we only have log segments in the range 6045274 to
6062679. (kafka.server.ReplicaManager)


KafkaSpout continues trying to fetch offset 5004855 although it knows it's
no longer available. Storm and Kafka logs grow like hell, repeating the
same message over and over, without processing a single message.

Here's the --describe of our topic with a 2h retention:

Topic:TPC    PartitionCount:3    ReplicationFactor:2    Configs:retention.ms=7200000
    Topic: TPC    Partition: 0    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: TPC    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 2,1
    Topic: TPC    Partition: 2    Leader: 2    Replicas: 2,0    Isr: 2,0

We've tried with kafkaConfig.forceFromStart = true / false, with
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime() and
with kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true. We've also
tried with Storm 0.9.4 and 0.9.5.
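
For reference, a minimal sketch of the spout setup we've been testing
(imports are storm.kafka.*; the ZooKeeper connect string, zkRoot and
spout id below are placeholders, not our real values):

BrokerHosts hosts = new ZkHosts("zk1:2181");  // placeholder ZK string
SpoutConfig kafkaConfig =
    new SpoutConfig(hosts, "TPC", "/kafkastorm", "tpc-spout");
kafkaConfig.forceFromStart = true;  // also tried false
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // -2
kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;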

Anything we're missing here? :-\

Thanks!

On Thu, May 21, 2015 at 8:17 AM, Benjamin Cuthbert <cu...@gmail.com>
wrote:

> Hi,
>
> Thanks for that. So is the storm-kafka spout not fast enough to process
> the information? I do not think I am pushing that many messages into the
> Kafka queue, as the data has already been split up.
>
>
> On 21 May 2015, at 05:34, Nikhil Singh <ns...@yahoo.com> wrote:
>
> Kafka has a fixed retention window for topics. If the retention is full, it
> will drop messages from the tail.
>
> So here is what 'might' be happening: the rate at which you are pushing the
> data to Kafka is faster than the rate at which the consumers can consume
> the messages.
>
> Assume you have a full Kafka queue, with an incoming message rate twice the
> consumption rate, i.e. two messages come in per time unit and one message
> is consumed per time unit. Let us say that the consumer is at offset 'x',
> which is at the tail of the Kafka queue. After consuming it, the consumer
> next asks for offset 'x+1'. However, since the rate of incoming data is
> double, Kafka has already pushed 'x' and 'x+1' out of the tail and is now
> pointing to 'x+2'.
>
> Kafka will reset the offset to 'x+2' for the consumer and issue a warning
> that the offset 'x+1' is not there. This cycle will continue.
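>
> A toy sketch of that cycle (hypothetical rates and retention, just to
> illustrate why the consumer never catches up once it falls off the
> tail):
>
> public class OffsetLagDemo {
>     public static void main(String[] args) {
>         long head = 0;                // newest offset in the log
>         long tail = 0;                // oldest offset still retained
>         long next = 0;                // consumer's next fetch position
>         final long RETENTION = 1000;  // offsets kept before tail drops
>         for (int tick = 0; tick < 10000; tick++) {
>             head += 2;                // two messages produced per tick
>             tail = Math.max(0, head - RETENTION);
>             if (next < tail) {        // OffsetOutOfRange: reset, like
>                                       // startOffsetTime = -2
>                 System.out.println("offset " + next
>                         + " out of range, reset to " + tail);
>                 next = tail;
>             }
>             next++;                   // one message consumed per tick
>         }
>     }
> }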
>
> -Nikhil
>
>
>
>   On Wednesday, May 20, 2015 3:24 PM, Benjamin Cuthbert <
> cuthbert.ben@gmail.com> wrote:
>
>
> I am seeing the following:
>
> 41828 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils -
> Task [1/1] assigned [Partition{host=node:9092, partition=0}]
> 41829 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator -
> Task [1/1] Deleted partition managers: []
> 41829 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator -
> Task [1/1] New partition managers: [Partition{host=node:9092, partition=0}]
> 41930 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Read partition information from:
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
> 42399 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - No partition information found, using
> configuration to determine offset
> 42399 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Starting Kafka
> price-engine-demo-server.c.celertech-01.internal:0 from offset 3927313
> 42400 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator -
> Task [1/1] Finished refreshing
> 45097 [ProcessThread(sid:0 cport:-1):] INFO
> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x14d72fc889d000c type:create
> cxid:0x3 zxid:0x2c txntype:-1 reqpath:n/a Error
> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout
> Error:KeeperErrorCode = NoNode for
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>
> When viewing the zookeeper information there is no path there. Does it get
> created automatically?
>
> On 19 May 2015, at 18:19, Benjamin Cuthbert <cu...@gmail.com>
> wrote:
>
> The kafka logs have this
>
> [2015-05-19 17:13:39,772] ERROR [KafkaApi-0] Error when processing fetch
> request for partition [warehouse_prices,0] offset 73792051 from consumer
> with correlation id 0 (kafka.server.KafkaApis)
> kafka.common.OffsetOutOfRangeException: Request for offset 73792051 but we
> only have log segments in the range 74630863 to 75835831.
> at kafka.log.Log.read(Log.scala:380)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>
> How can that be? How can the requestor be behind kafka?
>
>
> On 19 May 2015, at 14:54, Benjamin Cuthbert <cu...@gmail.com>
> wrote:
>
> Hi Jeff,
>
> So I looked at the docs and I reset the following properties:
>
> SpoutConfig spoutConfig = new SpoutConfig(
>     hosts,
>     topic,           // topic to read from
>     KAFKA_STORM_DIR, // root path in Zookeeper where the spout stores
>                      // the consumer offsets
>     newSpoutId);     // an id for this consumer, used when storing the
>                      // consumer offsets in Zookeeper
>
> // Check if we should be consuming messages from the beginning
> spoutConfig.forceFromStart = consumeFromBeginning;
> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = false;
>
> But after an hour of processing details I see
>
> 2015-05-19T13:13:03.242+0000 s.k.KafkaUtils [ERROR] Error fetching data
> from [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
> 2015-05-19T13:13:03.242+0000 s.k.KafkaSpout [WARN] Fetch failed
> storm.kafka.FailedFetchException: Error fetching data from
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:190)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at storm.kafka.PartitionManager.fill(PartitionManager.java:162)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at storm.kafka.PartitionManager.next(PartitionManager.java:124)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at
> backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565)
> [storm-core-0.9.4.jar:0.9.4]
> at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
> [storm-core-0.9.4.jar:0.9.4]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
>
>
> On 18 May 2015, at 22:13, Benjamin Cuthbert <cu...@gmail.com>
> wrote:
>
> Thanks Jeff,
>
> So I looked over the docs, but what I don’t understand is that it runs for
> 2+ hours and then just starts going:
>
> 2015-05-18T22:12:53.673+0100 s.k.PartitionManager [WARN] Using new offset:
> 64429892
> 2015-05-18T22:12:53.705+0100 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [63610973]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-05-18T22:12:53.743+0100 s.k.PartitionManager [WARN] Using new offset:
> 64429934
> 2015-05-18T22:12:53.773+0100 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [63610973]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
>
> So why does the offset get reset?
>
>
> On 18 May 2015, at 20:37, Jeffery Maass <ma...@gmail.com> wrote:
>
> The answer will be in how you configured the Kafka spout. If, after
> reading the below, you still need help, please grab the values for all of
> the settings mentioned in the doc and send them on.
>
> See this document about the Kafka Spout:
>
> http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/Storm_UG_v22.pdf
>
> See this archive message:
>
> http://mail-archives.apache.org/mod_mbox/storm-user/201503.mbox/%3CCAHzwdygrA33uiv+PO01mvTHvrJSxvqLVx6BabyMZYE8zu_d3hw@mail.gmail.com%3E
>
> "
> Not exactly.. forceFromStart=true will tell the spout to start reading from
> whatever is set in startOffsetTime (available options are the earliest
> offset or the latest offset). If forceFromStart=false then startOffsetTime
> is not used at all and the offset is just retrieved from zookeeper, if it's
> available.
>
> The "Start" in "forceFromStart" has nothing to do with consuming from the
> beginning of the topic. I interpret it as referring to whether you are
> going to force starting consumption from a different offset.
> "
>
> Thank you for your time!
>
> +++++++++++++++++++++
> Jeff Maass <ma...@gmail.com>
> linkedin.com/in/jeffmaass
> stackoverflow.com/users/373418/maassql
> +++++++++++++++++++++
>
>
> On Mon, May 18, 2015 at 1:53 PM, Benjamin Cuthbert <cuthbert.ben@gmail.com
> > wrote:
>
> All,
>
> We are getting loads of these errors
>
> 2015-05-18T19:52:44.038+0100 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [62379213]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-05-18T19:52:44.066+0100 s.k.PartitionManager [WARN] Using new offset:
> 63495047
>
> And it is stopping messages being pulled from Kafka into the spout and
> onto the bolts. Any ideas on how to fix this?
>


-- 
Alex Sobrino Beltrán
Registered Linux User #273657

http://v5tech.es