Posted to user@spark.apache.org by Srikanth <sr...@gmail.com> on 2016/10/19 17:22:15 UTC

Re: Spark 2.0 with Kafka 0.10 exception

Bringing this thread back as I'm seeing this exception on a production
kafka cluster.

I have two Spark Streaming apps reading the same topic. App1 has a batch
interval of 2 secs and App2 has 60 secs.
Both apps are running on the same cluster on similar hardware. I see this
exception only in App2, and fairly consistently.

The differences I see between the apps are:
App1
      spark.streaming.kafka.maxRatePerPartition, 6000
      batch interval 2 secs
App2
      spark.streaming.kafka.maxRatePerPartition, 10000
      batch interval 60 secs

All other Kafka/Spark related configs are the same for both apps.
      spark.streaming.kafka.consumer.poll.ms = 4096
      spark.streaming.backpressure.enabled = true

Not sure if pre-fetching or caching is messing things up.
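
For reference, a minimal sketch (not the actual app code; the broker address,
app name and offset-reset policy are placeholders) of how App2's settings
above would typically be wired into a direct stream:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // App2-style settings: 60s batches, 10000 records/sec/partition cap,
    // 4096ms poll timeout, backpressure on
    val conf = new SparkConf()
      .setAppName("App2")
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      .set("spark.streaming.kafka.consumer.poll.ms", "4096")
      .set("spark.streaming.backpressure.enabled", "true")
    val ssc = new StreamingContext(conf, Seconds(60))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",               // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "StreamingEventSplitProd",             // as seen in the trace below
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("mt_event"), kafkaParams))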

16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingEventSplitProd mt_event 6 49091480 after polling for 4096
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)


On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <co...@koeninger.org> wrote:

> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently?  From your description, it sounds
> like retry is working correctly.
>
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <sr...@gmail.com> wrote:
> > Setting those two results in the below exception.
> > No. of executors < no. of partitions. Could that be triggering this?
> >
> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> > at java.util.HashMap.putVal(Unknown Source)
> > at java.util.HashMap.put(Unknown Source)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> >
> >
> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <co...@koeninger.org> wrote:
> >>
> >> you could try setting
> >>
> >> spark.streaming.kafka.consumer.cache.initialCapacity
> >>
> >> spark.streaming.kafka.consumer.cache.maxCapacity
> >>
> >> to 1
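
(For illustration only, a sketch of how those two cache settings would be
applied on the SparkConf, or equivalently via spark-submit --conf; the values
are exactly the ones suggested above:)

    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")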
> >>
> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <sr...@gmail.com> wrote:
> >> > I had a look at the executor logs and noticed that this exception happens
> >> > only when using the cached consumer.
> >> > Every retry is successful. This is consistent.
> >> > One possibility is that the cached consumer is causing the failure as
> >> > retry clears it.
> >> > Is there a way to disable cache and test this?
> >> > Again, kafkacat is running fine on the same node.
> >> >
> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
> >> >
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
> >> > java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
> >> >       at scala.Predef$.assert(Predef.scala:170)
> >> >       at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >> >       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> >       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> >
> >> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
> >> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
> >> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
> >> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for spark-executor-StreamingPixelCount1 mt_event 0 57098866
> >> >
> >> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver
> >> >
> >> >
> >> >
> >> >> On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <sr...@gmail.com> wrote:
> >> >>
> >> >> Thanks Cody. Setting poll timeout helped.
> >> >> Our network is fine but brokers are not fully provisioned in test
> >> >> cluster.
> >> >> But there isn't enough load to max out on broker capacity.
> >> >> Curious that kafkacat running on the same node doesn't have any
> issues.
> >> >>
> >> >> Srikanth
> >> >>
> >> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <co...@koeninger.org> wrote:
> >> >>>
> >> >>> You can set that poll timeout higher with
> >> >>>
> >> >>> spark.streaming.kafka.consumer.poll.ms
> >> >>>
> >> >>> but half a second is fairly generous.  I'd try to take a look at
> >> >>> what's going on with your network or kafka broker during that time.
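
(For illustration only, raising that timeout is just another SparkConf entry;
the value here is an arbitrary example, up from the ~512ms seen in the trace
below:)

    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.poll.ms", "10000")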
> >> >>>
> >> >>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <sr...@gmail.com> wrote:
> >> >>> > Hello,
> >> >>> >
> >> >>> > I'm getting the below exception when testing Spark 2.0 with Kafka
> >> >>> > 0.10.
> >> >>> >
> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> >> >>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for spark-executor-example mt_event 0 15782114
> >> >>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator 10.150.254.161:9233 (id: 2147483646 rack: null) for group spark-executor-example.
> >> >>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
> >> >>> >> java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-example mt_event 0 15782114 after polling for 512
> >> >>> >> at scala.Predef$.assert(Predef.scala:170)
> >> >>> >> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >> >>> >> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> >>> >> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> >>> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >> >>> >
> >> >>> >
> >> >>> > I get this error intermittently. Sometimes a few batches are scheduled
> >> >>> > and run fine. Then I get this error.
> >> >>> > kafkacat is able to fetch from this topic continuously.
> >> >>> >
> >> >>> > Full exception is here --
> >> >>> >
> >> >>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
> >> >>> >
> >> >>> > Srikanth
> >> >>
> >> >>
> >> >
> >
> >
>

Re: Spark 2.0 with Kafka 0.10 exception

Posted by Cody Koeninger <co...@koeninger.org>.
That's a good point... the dstreams package is still on 10.0.1 though.
I'll make a ticket to update it.
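
(In dependency terms, an untested build.sbt sketch of what trying the newer
client would look like; whether the 0.10.1 client actually works against this
version of the integration would need checking:)

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1",
      // override the transitive 0.10.0.1 kafka-clients dependency
      "org.apache.kafka" % "kafka-clients" % "0.10.1.0"
    )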

On Fri, Oct 21, 2016 at 1:02 PM, Srikanth <sr...@gmail.com> wrote:
> Kafka 0.10.1 release separates poll() from heartbeat. So session.timeout.ms
> & max.poll.interval.ms can be set differently.
> I'll leave it to you on how to add this to docs!


Re: Spark 2.0 with Kafka 0.10 exception

Posted by Srikanth <sr...@gmail.com>.
The Kafka 0.10.1 release separates poll() from heartbeat, so session.timeout.ms
& max.poll.interval.ms can be set differently.
I'll leave it to you on how to add this to docs!
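
(A sketch of what that allows once on the 0.10.1 client, with illustrative
values; these go into the kafkaParams map passed to createDirectStream, as in
the sketch near the top of the page:)

    val kafkaParams = Map[String, Object](
      // 0.10.1: heartbeats come from a background thread, so session.timeout.ms
      // no longer has to cover the gap between poll() calls
      "session.timeout.ms" -> "10000",
      "heartbeat.interval.ms" -> "3000",
      // new upper bound on the time between poll() calls; can exceed a long batch interval
      "max.poll.interval.ms" -> "300000"
      // ...plus bootstrap.servers, deserializers, group.id, etc.
    )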


On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Right on, I put in a PR to make a note of that in the docs.

Re: Spark 2.0 with Kafka 0.10 exception

Posted by Cody Koeninger <co...@koeninger.org>.
Right on, I put in a PR to make a note of that in the docs.

On Thu, Oct 20, 2016 at 12:13 PM, Srikanth <sr...@gmail.com> wrote:
> Yeah, setting those params helped.


Re: Spark 2.0 with Kafka 0.10 exception

Posted by Srikanth <sr...@gmail.com>.
Yeah, setting those params helped.

On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <co...@koeninger.org> wrote:

> 60 seconds for a batch is above the default settings in kafka related
> to heartbeat timeouts, so that might be related.  Have you tried
> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> configs?
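
For anyone hitting the same thing with long batches on the 0.10.0 client, a
sketch of the kind of kafkaParams change being suggested above (values are
illustrative, and the broker's group.max.session.timeout.ms caps how far
session.timeout.ms can be raised):

    val kafkaParams = Map[String, Object](
      "session.timeout.ms" -> "120000",    // keep group membership alive across a 60s batch
      "heartbeat.interval.ms" -> "30000",
      "request.timeout.ms" -> "150000"     // the client requires this to exceed session.timeout.ms
      // ...plus bootstrap.servers, deserializers, group.id, etc.
    )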
> 409)
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > I get this error intermittently. Sometimes a few batches are
> >> >> >>> > scheduled
> >> >> >>> > and
> >> >> >>> > run fine. Then I get this error.
> >> >> >>> > kafkacat is able to fetch from this topic continuously.
> >> >> >>> >
> >> >> >>> > Full exception is here --
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > https://gist.github.com/SrikanthTati/
> c2e95c4ac689cd49aab817e24ec42767
> >> >> >>> >
> >> >> >>> > Srikanth
> >> >> >>
> >> >> >>
> >> >> >
> >> >
> >> >
> >
> >
>

Re: Spark 2.0 with Kafka 0.10 exception

Posted by Cody Koeninger <co...@koeninger.org>.
A 60 second batch interval is longer than Kafka's default heartbeat and
session timeouts, so that might be the cause. Have you tried tweaking
session.timeout.ms, heartbeat.interval.ms, or related configs?
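
For reference, here is a minimal sketch of that suggestion, assuming Scala and
the spark-streaming-kafka-0-10 direct stream. The broker address, group id and
timeout values are illustrative placeholders rather than values from this
thread, and session.timeout.ms is capped by the broker's
group.max.session.timeout.ms:

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

  // Poll timeout already used in this thread; the cache sizing configs
  // mentioned earlier (spark.streaming.kafka.consumer.cache.*) would be set
  // on the same SparkConf if needed.
  val conf = new SparkConf()
    .setAppName("StreamingEventSplitProd")
    .set("spark.streaming.kafka.consumer.poll.ms", "4096")
  val ssc = new StreamingContext(conf, Seconds(60))

  // Raise the consumer timeouts above the 60 second batch interval so the
  // group coordinator does not evict the executor-side consumers between
  // polls (on Kafka 0.10.0.x heartbeats are only sent from poll()).
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "StreamingEventSplitProd",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "session.timeout.ms" -> "120000",   // consumer default is 30000
    "heartbeat.interval.ms" -> "30000", // keep well below session.timeout.ms
    "request.timeout.ms" -> "130000"    // must stay above session.timeout.ms
  )

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Seq("mt_event"), kafkaParams))

Whether this helps depends on where the consumer is actually stalling, so
treat the values above as a starting point rather than a fix.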

On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <sr...@gmail.com> wrote:
> Bringing this thread back as I'm seeing this exception on a production kafka
> cluster.
>
> I have two Spark streaming apps reading the same topic. App1 has batch
> interval 2secs and app2 has 60secs.
> Both apps are running on the same cluster on similar hardware. I see this
> exception only in app2 and fairly consistently.
>
> Difference I see between the apps is
> App1
>       spark.streaming.kafka.maxRatePerPartition, 6000
>       batch interval 2 secs
> App2
>       spark.streaming.kafka.maxRatePerPartition, 10000
>       batch interval 60 secs
>
> All other kafka/spark related configs are same for both apps.
>       spark.streaming.kafka.consumer.poll.ms = 4096
>       spark.streaming.backpressure.enabled = true
>
> Not sure if pre-fetching or caching is messing things up.
>
> 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID
> 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion
> failed: Failed to get records for spark-executor-StreamingEventSplitProd
> mt_event 6 49091480 after polling for 4096
>         at scala.Predef$.assert(Predef.scala:170)
>         at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>         at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>         at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>         at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
>
>
> On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> That's not what I would have expected to happen with a lower cache
>> setting, but in general disabling the cache isn't something you want
>> to do with the new kafka consumer.
>>
>>
>> As far as the original issue, are you seeing those polling errors
>> intermittently, or consistently?  From your description, it sounds
>> like retry is working correctly.
>>
>>
>> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <sr...@gmail.com> wrote:
>> > Setting those two results in below exception.
>> > No.of executors < no.of partitions. Could that be triggering this?
>> >
>> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
>> > (TID 9)
>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> > multi-threaded access
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> > at java.util.HashMap.putVal(Unknown Source)
>> > at java.util.HashMap.put(Unknown Source)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> >
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> > at
>> >
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> > at java.lang.Thread.run(Unknown Source)
>> >
>> >
>> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <co...@koeninger.org>
>> > wrote:
>> >>
>> >> you could try setting
>> >>
>> >> spark.streaming.kafka.consumer.cache.initialCapacity
>> >>
>> >> spark.streaming.kafka.consumer.cache.maxCapacity
>> >>
>> >> to 1
>> >>
>> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <sr...@gmail.com> wrote:
>> >> > I had a look at the executor logs and noticed that this exception
>> >> > happens
>> >> > only when using the cached consumer.
>> >> > Every retry is successful. This is consistent.
>> >> > One possibility is that the cached consumer is causing the failure as
>> >> > retry
>> >> > clears it.
>> >> > Is there a way to disable cache and test this?
>> >> > Again, kafkacat is running fine on the same node.
>> >> >
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID
>> >> > 7849)
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID
>> >> > 7851
>> >> >
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition
>> >> > 2
>> >> > offsets 57079162 -> 57090330
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition
>> >> > 0
>> >> > offsets 57098866 -> 57109957
>> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0
>> >> > (TID
>> >> > 7851). 1030 bytes result sent to driver
>> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage
>> >> > 138.0
>> >> > (TID
>> >> > 7849)
>> >> > java.lang.AssertionError: assertion failed: Failed to get records for
>> >> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
>> >> > for
>> >> > 2048
>> >> >       at scala.Predef$.assert(Predef.scala:170)
>> >> >       at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> >       at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> >       at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> >
>> >> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned
>> >> > task
>> >> > 7854
>> >> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID
>> >> > 7854)
>> >> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition
>> >> > 0
>> >> > offsets 57098866 -> 57109957
>> >> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
>> >> > spark-executor-StreamingPixelCount1 mt_event 0 57098866
>> >> >
>> >> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0
>> >> > (TID
>> >> > 7854). 1103 bytes result sent to driver
>> >> >
>> >> >
>> >> >
>> >> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <sr...@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> Thanks Cody. Setting poll timeout helped.
>> >> >> Our network is fine but brokers are not fully provisioned in test
>> >> >> cluster.
>> >> >> But there isn't enough load to max out on broker capacity.
>> >> >> Curious that kafkacat running on the same node doesn't have any
>> >> >> issues.
>> >> >>
>> >> >> Srikanth
>> >> >>
>> >> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <co...@koeninger.org>
>> >> >> wrote:
>> >> >>>
>> >> >>> You can set that poll timeout higher with
>> >> >>>
>> >> >>> spark.streaming.kafka.consumer.poll.ms
>> >> >>>
>> >> >>> but half a second is fairly generous.  I'd try to take a look at
>> >> >>> what's going on with your network or kafka broker during that time.
>> >> >>>
>> >> >>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <sr...@gmail.com>
>> >> >>> wrote:
>> >> >>> > Hello,
>> >> >>> >
>> >> >>> > I'm getting the below exception when testing Spark 2.0 with Kafka
>> >> >>> > 0.10.
>> >> >>> >
>> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>> >> >>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId :
>> >> >>> >> b8642491e78c5a13
>> >> >>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
>> >> >>> >> spark-executor-example mt_event 0 15782114
>> >> >>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered
>> >> >>> >> coordinator
>> >> >>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
>> >> >>> >> spark-executor-example.
>> >> >>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage
>> >> >>> >> 1.0
>> >> >>> >> (TID
>> >> >>> >> 6)
>> >> >>> >> java.lang.AssertionError: assertion failed: Failed to get
>> >> >>> >> records
>> >> >>> >> for
>> >> >>> >> spark-executor-example mt_event 0 15782114 after polling for 512
>> >> >>> >> at scala.Predef$.assert(Predef.scala:170)
>> >> >>> >> at
>> >> >>> >>
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >> >>> >> at
>> >> >>> >>
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >> >>> >> at
>> >> >>> >>
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >> >>> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >> >>> >
>> >> >>> >
>> >> >>> > I get this error intermittently. Sometimes a few batches are
>> >> >>> > scheduled
>> >> >>> > and
>> >> >>> > run fine. Then I get this error.
>> >> >>> > kafkacat is able to fetch from this topic continuously.
>> >> >>> >
>> >> >>> > Full exception is here --
>> >> >>> >
>> >> >>> >
>> >> >>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>> >> >>> >
>> >> >>> > Srikanth
>> >> >>
>> >> >>
>> >> >
>> >
>> >
>
>
