Posted to issues@spark.apache.org by "Tao Tian (JIRA)" <ji...@apache.org> on 2017/08/28 13:21:00 UTC

[jira] [Commented] (SPARK-19547) KafkaUtil throw 'No current assignment for partition' Exception

    [ https://issues.apache.org/jira/browse/SPARK-19547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143760#comment-16143760 ] 

Tao Tian commented on SPARK-19547:
----------------------------------

I have encountered this problem too, using Kafka 0.10.0 and spark-streaming_2.11. The error log is:
"Caused by: org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {db4.CLAC.result-0=61638808} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size on the client (using max.partition.fetch.bytes), or decrease the maximum message size the broker will allow (using message.max.bytes)."
Setting "max.partition.fetch.bytes = 10485720" for the Kafka consumer fixed the problem.
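
For reference, a minimal sketch of passing that setting through the consumer parameters given to the stream (the broker address and group id below are placeholders, not values from this ticket):

    import org.apache.kafka.common.serialization.StringDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",              // placeholder broker address
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",                            // placeholder group id
      // Raise the per-partition fetch limit above the largest expected record;
      // otherwise oversized records can never be returned and the stream stalls.
      "max.partition.fetch.bytes" -> (10485720: java.lang.Integer)
    )

The same value can also be passed as a plain string; the typed form above just mirrors the style of the code quoted below.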

> KafkaUtil throw 'No current assignment for partition' Exception
> ---------------------------------------------------------------
>
>                 Key: SPARK-19547
>                 URL: https://issues.apache.org/jira/browse/SPARK-19547
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 1.6.1
>            Reporter: wuchang
>
> Below is my Scala code to create the Spark Kafka stream:
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "server110:2181,server110:9092",
>   "zookeeper" -> "server110:2181",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "example",
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean)
> )
> val topics = Array("ABTest")
> val stream = KafkaUtils.createDirectStream[String, String](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams)
> )
> But after running for 10 hours, it throws the following exception:
> 2017-02-10 10:56:20,000 INFO  [JobGenerator] internals.ConsumerCoordinator: Revoking previously assigned partitions [ABTest-0, ABTest-1] for group example
> 2017-02-10 10:56:20,000 INFO  [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
> 2017-02-10 10:56:20,011 INFO  [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example
> 2017-02-10 10:56:40,057 INFO  [JobGenerator] internals.AbstractCoordinator: Successfully joined group example with generation 5
> 2017-02-10 10:56:40,058 INFO  [JobGenerator] internals.ConsumerCoordinator: Setting newly assigned partitions [ABTest-1] for group example
> 2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error generating jobs for time 1486695380000 ms
> java.lang.IllegalStateException: No current assignment for partition ABTest-0
>         at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
>         at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169)
>         at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>         at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>         at scala.Option.orElse(Option.scala:289)
>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>         at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>         at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Obviously, partition ABTest-0 has already been revoked from this consumer, but it seems that the Spark Streaming consumer is not aware of that and continues to consume data from the revoked topic-partition, so the exception occurs and the whole Spark job aborts.
> I think a Kafka rebalance event is perfectly normal; how can I modify my code to make Spark Streaming handle the partition-revoke event correctly?
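
A note on the rebalance question above: with the 0.10 direct stream, the consumer that Spark runs on the driver manages partition assignments and offsets itself, so each streaming application should use a group.id shared with no other consumer; another member joining the same group triggers exactly the kind of partition revocation seen in the log. A minimal sketch under that assumption (the UUID suffix is purely illustrative):

    import org.apache.kafka.common.serialization.StringDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "server110:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // A single-member group: rebalances happen only when group membership
      // changes, so a group id no other consumer uses avoids revocations.
      "group.id" -> s"example-${java.util.UUID.randomUUID()}",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

Note that a fresh random group id means offsets are not carried across restarts, so the application must track its own progress, for example via checkpointing or by committing offsets to external storage.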


