Posted to user@spark.apache.org by Justin Miller <ju...@protectwise.com> on 2018/01/17 05:10:12 UTC

"Got wrong record after seeking to offset" issue

Greetings all,

I’ve recently started hitting the following error in Spark Streaming with Kafka. Adjusting maxRetries, and raising spark.streaming.kafka.consumer.poll.ms to as much as five minutes, doesn’t seem to help. The problem only appeared in the last few days; restarting with a new consumer group seems to remedy it for a few hours (less than our retention period, which is 12 hours).

Error:
Caused by: java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-<consumergrouphere> <topichere> 76 even after seeking to offset 1759148155
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

I guess my questions are: why is that assertion a job killer rather than a warning, and is there any setting I can tweak that might keep it at bay?

I wouldn’t be surprised if this issue were exacerbated by the volume we push through our Kafka topics (~150k/sec on the persister that’s crashing).

Thank you!
Justin


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: "Got wrong record after seeking to offset" issue

Posted by Justin Miller <ju...@protectwise.com>.
Yeah, I saw that after I sent that e-mail out. I actually remembered another ticket I had commented on, which turned out to be unrelated to the issue I was seeing at the time. It may be related to the current issue, though:

https://issues.apache.org/jira/browse/SPARK-17147

We are compacting topics, but only the offset topics. We also updated our message version to 0.10 today, as our last non-Spark project (Storm-based) was brought up to 0.11.

Justin

> On Jan 18, 2018, at 1:39 PM, Cody Koeninger <co...@koeninger.org> wrote:
> 
> https://kafka.apache.org/documentation/#compaction


Re: "Got wrong record after seeking to offset" issue

Posted by Cody Koeninger <co...@koeninger.org>.
https://kafka.apache.org/documentation/#compaction
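A rough sketch of what compaction does to offsets, using a hypothetical in-memory model (the class and method names are illustrative, and the model ignores log segments and the uncompacted head of the log, so it is not Kafka's actual log cleaner): only the latest record for each key survives, and survivors keep their original offsets, so the partition ends up with gaps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one partition; not Kafka's log cleaner.
public class CompactionSketch {

    // A record keeps the offset it was assigned when it was appended.
    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Keep only the latest record per key. Survivors keep their original
    // offsets; nothing is renumbered, so the result can contain gaps.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Long> latestOffsetByKey = new HashMap<>();
        for (Rec r : log) {
            latestOffsetByKey.put(r.key, r.offset);
        }
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (latestOffsetByKey.get(r.key) == r.offset) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> log = List.of(
                new Rec(0, "a", "v1"),
                new Rec(1, "b", "v1"),
                new Rec(2, "a", "v2"),
                new Rec(3, "c", "v1"),
                new Rec(4, "b", "v2"),
                new Rec(5, "c", "v2"));
        // Prints offsets 2, 4 and 5: offsets 0, 1 and 3 are gone.
        for (Rec r : compact(log)) {
            System.out.println(r.offset + " " + r.key + "=" + r.value);
        }
    }
}
```

In this example offsets 0, 1 and 3 disappear, so a consumer that seeks to offset 3 is handed the record at offset 4 instead.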

On Thu, Jan 18, 2018 at 1:17 AM, Justin Miller
<ju...@protectwise.com> wrote:
> By compacted do you mean compression? If so, then we did recently turn on lz4
> compression. If there’s another meaning, or if there’s a command I can run to
> check compaction, I’m happy to give that a shot too.
>
> I’ll try consuming from the failed offset if/when the problem manifests
> itself again.
>
> Thanks!
> Justin


Re: "Got wrong record after seeking to offset" issue

Posted by Justin Miller <ju...@protectwise.com>.
By compacted do you mean compression? If so, then we did recently turn on
lz4 compression. If there’s another meaning, or if there’s a command I can run
to check compaction, I’m happy to give that a shot too.

I’ll try consuming from the failed offset if/when the problem manifests
itself again.

Thanks!
Justin

On Wednesday, January 17, 2018, Cody Koeninger <co...@koeninger.org> wrote:

> That means the consumer on the executor tried to seek to the specified
> offset, but the message that was returned did not have a matching
> offset.  If the executor can't get the messages the driver told it to
> get, something's generally wrong.
>
> What happens when you try to consume the particular failing offset
> from another (e.g. command-line) consumer?
>
> Is the topic in question compacted?

Re: "Got wrong record after seeking to offset" issue

Posted by Cody Koeninger <co...@koeninger.org>.
That means the consumer on the executor tried to seek to the specified
offset, but the message that was returned did not have a matching
offset.  If the executor can't get the messages the driver told it to
get, something's generally wrong.
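A minimal sketch of that check, under stated assumptions: the partition is a hypothetical in-memory map from offset to value rather than a real KafkaConsumer, and seekAndPoll models seek() followed by poll() by returning the first record at or after the requested offset. When the requested offset falls in a gap (for example one left by compaction), the returned record has a larger offset and the strict check fails, much like the assertion raised from CachedKafkaConsumer.get in the stack trace above. All names here are illustrative.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for one Kafka partition: offset -> value.
// Offsets may be non-contiguous, as on a compacted topic.
public class SeekCheckSketch {

    // Model of seek(offset) followed by poll(): return the first record
    // at or after the requested offset, or null if there is none.
    static Map.Entry<Long, String> seekAndPoll(NavigableMap<Long, String> partition, long offset) {
        return partition.ceilingEntry(offset);
    }

    // Strict variant: fail if the returned record is not exactly the one
    // that was requested, mirroring the "Got wrong record" assertion.
    static String getExact(NavigableMap<Long, String> partition, long offset) {
        Map.Entry<Long, String> rec = seekAndPoll(partition, offset);
        if (rec == null || rec.getKey() != offset) {
            throw new AssertionError("Got wrong record even after seeking to offset " + offset);
        }
        return rec.getValue();
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> partition = new TreeMap<>();
        partition.put(2L, "a=v2");
        partition.put(4L, "b=v2");
        partition.put(5L, "c=v2"); // offset 3 is missing, leaving a gap

        System.out.println(getExact(partition, 4)); // prints "b=v2"
        try {
            getExact(partition, 3); // seek lands on offset 4, so the check fails
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, waiting longer cannot help: the record at the missing offset no longer exists, so a larger poll timeout (such as spark.streaming.kafka.consumer.poll.ms) would not change the result.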

What happens when you try to consume the particular failing offset
from another (e.g. command-line) consumer?

Is the topic in question compacted?
