Posted to user@spark.apache.org by Roman Garcia <ro...@gmail.com> on 2015/10/08 21:51:44 UTC

Streaming DirectKafka assertion errors

I'm running Spark Streaming with the Kafka direct stream, expecting
exactly-once semantics via checkpoints (which are stored on HDFS).
My job is really simple: it opens 6 Kafka streams (6 topics with 4 partitions
each) and stores every row to Elasticsearch using the ES-Spark integration.

This job was working without issues on a different environment, but on this
new environment, I've started to see these assertion errors:

"Job aborted due to stage failure: Task 7 in stage 79.0 failed 4 times,
most recent failure: Lost task 7.3 in stage 79.0 (TID 762, 192.168.18.219):
java.lang.AssertionError: assertion failed: Beginning offset 20 is after
the ending offset 14 for topic some-topic partition 1. You either provided
an invalid fromOffset, or the Kafka topic has been damaged"

and also

"Job aborted due to stage failure: Task 12 in stage 83.0 failed 4 times,
most recent failure: Lost task 12.3 in stage 83.0 (TID 815,
192.168.18.219): java.lang.AssertionError: assertion failed: Ran out of
messages before reaching ending offset 20 for topic some-topic partition 1
start 14. This should not happen, and indicates that messages may have been
lost"

When querying my Kafka cluster (3 brokers, 7 topics, 4 partitions per topic,
3 ZooKeeper nodes, no other consumers), I see what appears to differ from
Spark's offset info:

*running kafka.tools.GetOffsetShell --time -1*
some-topic:0:20
some-topic:1:20
some-topic:2:19
some-topic:3:20
*running kafka.tools.GetOffsetShell --time -2*
some-topic:0:0
some-topic:1:0
some-topic:2:0
some-topic:3:0

*running kafka-simple-consumer-shell* I can see all stored messages until
offset 20, with a final output: "Terminating. Reached the end of partition
(some-topic, 1) at offset 20"
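
A minimal sketch of the comparison described above, assuming the
GetOffsetShell output has the topic:partition:offset shape shown. The
helper names (parse_offsets, check_range) are hypothetical, not part of
Kafka or Spark. It only checks the reported numbers against each other;
it does not explain why the job saw an ending offset of 14.

```python
def parse_offsets(text):
    """Parse 'topic:partition:offset' lines into {(topic, partition): offset}."""
    offsets = {}
    for line in text.strip().splitlines():
        topic, partition, offset = line.strip().rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets


# Values copied from the GetOffsetShell output above (--time -1 and --time -2).
LATEST = parse_offsets("""
some-topic:0:20
some-topic:1:20
some-topic:2:19
some-topic:3:20
""")
EARLIEST = parse_offsets("""
some-topic:0:0
some-topic:1:0
some-topic:2:0
some-topic:3:0
""")


def check_range(topic, partition, from_offset, until_offset):
    """Return a list of problems with an offset range (empty if it looks sane)."""
    key = (topic, partition)
    problems = []
    if from_offset > until_offset:
        problems.append("beginning offset %d is after ending offset %d"
                        % (from_offset, until_offset))
    if from_offset < EARLIEST[key]:
        problems.append("beginning offset %d is before earliest offset %d"
                        % (from_offset, EARLIEST[key]))
    if until_offset > LATEST[key]:
        problems.append("ending offset %d is past latest offset %d"
                        % (until_offset, LATEST[key]))
    return problems


# Range from the first assertion error: begins at 20, ends at 14.
print(check_range("some-topic", 1, 20, 14))
# Range from the second assertion error: starts at 14, ends at 20.
print(check_range("some-topic", 1, 14, 20))
```

By this check, the range 14 to 20 is consistent with what the brokers
report, while 20 to 14 is not, which suggests the job saw two different
values for the same partition at different times.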

I tried removing the whole checkpoint dir and starting over, but it keeps
failing.

It looks like these tasks get retried without end. On the spark-ui
streaming tab I see the "Active batches" increase with a confusing "Input
size" value of "-19" (negative size?)
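
One possible reading of the negative value, as a sketch only: assuming
the UI's input size is the sum of (ending offset - beginning offset) over
the batch's partitions, a range whose ending offset is below its beginning
offset contributes a negative count. That assumption is not confirmed in
this thread, batch_input_size is a hypothetical helper, and the example
does not try to reproduce the exact value -19.

```python
def batch_input_size(ranges):
    """Sum (until - start) over (start, until) offset ranges.

    Assumption: this mirrors how a batch's record count would be derived
    from its offset ranges; it is not taken from Spark's source.
    """
    return sum(until - start for (start, until) in ranges)


# The range from the first assertion error: beginning offset 20, ending 14.
size = batch_input_size([(20, 14)])
print(size)
```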

Any pointers will help
Thanks

Roman

Re: Streaming DirectKafka assertion errors

Posted by Roman Garcia <ro...@gmail.com>.
Thanks Cody for your help. Actually, I found out it was an issue on our
network. After pinging the Kafka node from the Spark node, I found there
were duplicate packets. After rebooting the Kafka node everything went back
to normal!
Thanks for your help!
Roman
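
A small sketch of the ping check described above, assuming Linux
(iputils) ping, which marks duplicate replies with a trailing "(DUP!)".
The sample output and the address in it are made up for illustration, and
count_duplicates is a hypothetical helper.

```python
def count_duplicates(ping_output):
    """Count reply lines that ping flagged as duplicates with '(DUP!)'."""
    return sum(
        1 for line in ping_output.splitlines()
        if line.rstrip().endswith("(DUP!)")
    )


# Hypothetical ping output; the address is an assumption, not from the thread.
SAMPLE = """\
64 bytes from 192.168.18.10: icmp_seq=1 ttl=64 time=0.31 ms
64 bytes from 192.168.18.10: icmp_seq=1 ttl=64 time=0.35 ms (DUP!)
64 bytes from 192.168.18.10: icmp_seq=2 ttl=64 time=0.29 ms
64 bytes from 192.168.18.10: icmp_seq=2 ttl=64 time=0.33 ms (DUP!)
"""

duplicates = count_duplicates(SAMPLE)
print("duplicate replies:", duplicates)
```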

On Thu, Oct 8, 2015 at 17:13, Cody Koeninger <co...@koeninger.org>
wrote:

> It sounds like you moved the job from one environment to another?
>
> This may sound silly, but make sure (eg using lsof) the brokers the job is
> connecting to are actually the ones you expect.
>
> As far as the checkpoint goes, the log output should indicate whether the
> job is restoring from checkpoint.  Make sure that output no longer shows up
> after you stopped the job, deleted the checkpoint directory, and restarted
> it.

Re: Streaming DirectKafka assertion errors

Posted by Cody Koeninger <co...@koeninger.org>.
It sounds like you moved the job from one environment to another?

This may sound silly, but make sure (eg using lsof) the brokers the job is
connecting to are actually the ones you expect.

As far as the checkpoint goes, the log output should indicate whether the
job is restoring from checkpoint.  Make sure that output no longer shows up
after you stopped the job, deleted the checkpoint directory, and restarted
it.
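
The broker check suggested above could be sketched roughly as follows,
assuming the text comes from something like `lsof -i -n -P` run against
the executor process. The broker addresses, the port 9092 (Kafka's
default), the sample lines, and the helper unexpected_brokers are all
assumptions for illustration.

```python
import re

# Hypothetical list of the brokers the job is supposed to talk to.
EXPECTED_BROKERS = {"192.168.18.10", "192.168.18.11", "192.168.18.12"}
KAFKA_PORT = 9092  # Kafka's default port; adjust if the brokers use another.

# Matches the remote end of an established IPv4 TCP connection in lsof output.
REMOTE = re.compile(r"->([\d.]+):(\d+)\s+\(ESTABLISHED\)")


def unexpected_brokers(lsof_text, expected=EXPECTED_BROKERS, port=KAFKA_PORT):
    """Return remote hosts on the Kafka port that are not in the expected set."""
    seen = set()
    for line in lsof_text.splitlines():
        match = REMOTE.search(line)
        if match and int(match.group(2)) == port:
            seen.add(match.group(1))
    return sorted(seen - set(expected))


# Hypothetical lsof lines; only 192.168.18.219 appears in the thread itself.
SAMPLE = """\
java 4242 spark 210u IPv4 123456 0t0 TCP 192.168.18.219:51234->192.168.18.10:9092 (ESTABLISHED)
java 4242 spark 211u IPv4 123457 0t0 TCP 192.168.18.219:51240->10.0.0.5:9092 (ESTABLISHED)
"""

strangers = unexpected_brokers(SAMPLE)
print("unexpected brokers:", strangers)
```

An empty result would mean every connection on the Kafka port goes to a
broker in the expected set.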


