Posted to issues@spark.apache.org by "Simon Kitching (JIRA)" <ji...@apache.org> on 2017/07/06 09:14:00 UTC

[jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions

    [ https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076214#comment-16076214 ] 

Simon Kitching commented on SPARK-19680:
----------------------------------------

I seem to be getting this problem too. Environment: Spark 2.0.1, Kafka 0.10.0, YARN.

However, unlike the original reporter, when the OffsetOutOfRangeException occurs the job keeps running - but it now consumes from the head of the partitions rather than from "earliest". The result is that we process the first few batches of records from each partition as expected (earliest), then Spark skips almost silently over all the records in the middle and starts processing from the head (latest) of each partition.

A sensible solution for this is really needed - it can happen to any application; changing the retention time and batch size will only reduce the probability of it happening.

Cody: thanks for your helpful feedback above. You recommended using "earliest plus some margin" - how can this be done when using DirectKafkaInputDStream?
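
In case it is useful to others, here is a minimal sketch of how I understand "earliest plus some margin" could be wired into the 0-10 direct stream: probe the brokers for the current beginning of each partition with a plain KafkaConsumer, add a safety margin, and hand the resulting offsets to ConsumerStrategies.Subscribe. The topic name and margin are placeholders, kafkaParams and ssc are the ones from the job quoted below, and I am assuming spark-streaming-kafka-0-10 with a 0.10.x client - not tested, just to show where the explicit offsets plug in:

{code:title=SeedOffsetsSketch.scala|borderStyle=solid}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val topic = "SearchEvents"   // placeholder: topic name from the report below
val margin = 10000L          // placeholder: how far past "earliest" to start

// Plain consumer used only to discover partitions and their earliest offsets.
// assign() (instead of subscribe()) keeps it out of the consumer group.
val probe = new KafkaConsumer[String, Array[Byte]](kafkaParams.asJava)
val partitions = probe.partitionsFor(topic).asScala
  .map(pi => new TopicPartition(pi.topic, pi.partition))
probe.assign(partitions.asJava)
probe.seekToBeginning(partitions.asJava)
// position() after seekToBeginning() returns the current earliest offset
val fromOffsets = partitions.map(tp => tp -> (probe.position(tp) + margin)).toMap
probe.close()

val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Array[Byte]](Seq(topic), kafkaParams, fromOffsets))
{code}

Obviously the margin would have to be capped at the end offset for partitions holding fewer messages than the margin; I left that out for brevity.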

> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>
> I'm using Spark Streaming with Kafka to actually create a toplist. I want to read all the messages in Kafka, so I set
>    "auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it does not work; I get:
> Error:
> {code:title=error.log|borderStyle=solid}
> 	Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {SearchEvents-2=161803385}
> {code}
> This seems wrong, because I did set the auto.offset.reset property.
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, Object] = {
>     Map(
>       "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
>       "group.id" -> properties.getProperty("kafka.consumer.group"),
>       "auto.offset.reset" -> "earliest",
>       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>       "enable.auto.commit" -> "false",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
>     getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
>       val topList = new TopList
>       topList.setCreated(new Date())
>       topList.setTopListEntryList(rdd.take(TopListLength).toList)
>       CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
>       kafkaSink.value.send(SendToTopicName, topList)
>       CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>     })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, slideDuration: Int): DStream[TopListEntry] = {
>     val Mapper = MapperObject.readerFor[SearchEventDTO]
>     result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>       .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null && s.getVertical == Vertical.BAP && s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>       .map(row => {
>         val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>         (name, new TopListEntry(name, 1, row.getResultCount))
>       })
>       .reduceByKeyAndWindow(
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
>         Minutes(windowDuration),
>         Seconds(slideDuration))
>       .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>       .map(row => (row._2.getSearchCount, row._2))
>       .transform(rdd => rdd.sortByKey(ascending = false))
>       .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
>     val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>     val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>     val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
>     val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>     ssc.checkpoint("/home/spark/checkpoints")
>     val adEventStream =
>       KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
>     processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties), SparkUtil.getSlideDuration(properties), kafkaSink)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> {code}
> As I saw in the code of KafkaUtils:
> {code:title=KafkaUtils.scala|borderStyle=solid}
>     logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
>     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means that as soon as one worker has a Kafka partition that can no longer be processed, because its offset is not valid anymore due to the retention policy, the streaming job will stop working.
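
For the restart case this describes, one possible guard (again just a sketch, not tested) is to clamp whatever offsets the application has stored to the broker's current beginning of each partition before creating the direct stream, so that a retention-expired offset never reaches the executors. Here storedOffsets is a placeholder for however the offsets were persisted:

{code:title=ClampOffsetsSketch.scala|borderStyle=solid}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def clampToValidRange(
    storedOffsets: Map[TopicPartition, Long],   // placeholder: offsets persisted by the app
    kafkaParams: Map[String, Object]): Map[TopicPartition, Long] = {
  val probe = new KafkaConsumer[String, Array[Byte]](kafkaParams.asJava)
  try {
    val partitions = storedOffsets.keys.toSeq
    probe.assign(partitions.asJava)
    probe.seekToBeginning(partitions.asJava)
    storedOffsets.map { case (tp, stored) =>
      val earliest = probe.position(tp)  // current earliest offset on the broker
      tp -> math.max(stored, earliest)   // never start behind what still exists
    }
  } finally {
    probe.close()
  }
}
{code}

The clamped map can then be passed to ConsumerStrategies.Subscribe in the same way as in the margin sketch in my comment above.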



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org