Posted to issues@spark.apache.org by "Biplob Biswas (JIRA)" <ji...@apache.org> on 2018/07/11 11:14:00 UTC
[jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions
[ https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16539906#comment-16539906 ]
Biplob Biswas commented on SPARK-19680:
---------------------------------------
This essentially poisons the consumer group, and the only way we found to resume reading messages was to switch to a new consumer group, which starts from the latest offset (the spark-executor-* group ids are also recreated for the new consumer group). Is there a way to salvage the consumer group without removing it from Kafka?
Also, the executor-assigned consumer group is not visible in Kafka, so where exactly are its offsets stored?
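For reference, the direction I'm currently experimenting with is to manually commit valid offsets for the poisoned group using the plain Kafka consumer API, so the group can be reused instead of replaced. This is only a rough sketch that I haven't validated end to end; the broker list, group id, and the choice of seeking to the latest offsets are placeholders/assumptions for our actual setup:
{code:title=ResetGroupOffsets.scala|borderStyle=solid}
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object ResetGroupOffsets {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker-1:9092")  // placeholder broker list
    props.put("group.id", "my-consumer-group")       // placeholder: the poisoned group id
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    props.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // Partitions whose committed offsets have fallen out of retention
      // ("SearchEvents" is the topic from the issue description).
      val partitions = consumer.partitionsFor("SearchEvents").asScala
        .map(pi => new TopicPartition(pi.topic(), pi.partition()))

      consumer.assign(partitions.asJava)

      // Jump to the latest valid offsets and commit them for this group, so the
      // next run starts from a position that still exists on the brokers.
      consumer.seekToEnd(partitions.asJava)
      val toCommit = partitions.map(tp => tp -> new OffsetAndMetadata(consumer.position(tp))).toMap
      consumer.commitSync(toCommit.asJava)
    } finally {
      consumer.close()
    }
  }
}
{code}
If the streaming job takes its starting offsets from the checkpoint rather than from the offsets committed for the group, this alone probably won't help and the checkpoint would have to be cleared as well.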
> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.0
> Reporter: Schakmann Rene
> Priority: Major
>
> I'm using Spark Streaming with Kafka to build a top list. I want to read all the messages in Kafka, so I set
> "auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it does not work and I get:
> Error:
> {code:title=error.log|borderStyle=solid}
> Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {SearchEvents-2=161803385}
> {code}
> This seems wrong, because I did set the auto.offset.reset property.
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
> def getDefaultKafkaReceiverParameter(properties: Properties): Map[String, Object] = {
>   Map(
>     "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
>     "group.id" -> properties.getProperty("kafka.consumer.group"),
>     "auto.offset.reset" -> "earliest",
>     "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>     "enable.auto.commit" -> "false",
>     "key.deserializer" -> classOf[StringDeserializer],
>     "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
> }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
> def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
>   getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
>     val topList = new TopList
>     topList.setCreated(new Date())
>     topList.setTopListEntryList(rdd.take(TopListLength).toList)
>     CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
>     kafkaSink.value.send(SendToTopicName, topList)
>     CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>   })
> }
>
> def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, slideDuration: Int): DStream[TopListEntry] = {
>   val Mapper = MapperObject.readerFor[SearchEventDTO]
>   result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>     .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null && s.getVertical == Vertical.BAP && s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>     .map(row => {
>       val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>       (name, new TopListEntry(name, 1, row.getResultCount))
>     })
>     .reduceByKeyAndWindow(
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
>       Minutes(windowDuration),
>       Seconds(slideDuration))
>     .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>     .map(row => (row._2.getSearchCount, row._2))
>     .transform(rdd => rdd.sortByKey(ascending = false))
>     .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, row._2.getMeanSearchHits / row._2.getSearchCount))
> }
>
> def main(properties: Properties): Unit = {
>   val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>   val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>   val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
>   val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>   ssc.checkpoint("/home/spark/checkpoints")
>   val adEventStream =
>     KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
>   processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties), SparkUtil.getSlideDuration(properties), kafkaSink)
>   ssc.start()
>   ssc.awaitTermination()
> }
> {code}
> As I saw in the KafkaUtils code:
> {code:title=KafkaUtils.scala|borderStyle=solid}
> logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
> kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means that as soon as one worker gets a Kafka partition that can no longer be processed, because its offset is no longer valid due to the retention policy, the streaming job will stop working.
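> A workaround I'm considering (only an untested sketch, not a confirmed fix) is to pass explicit, known-valid starting offsets into the consumer strategy, so the job never asks the executors for an offset range that retention has already removed. The partition count and the starting offset below are placeholders:
> {code:title=WorkaroundSketch.scala|borderStyle=solid}
> // ssc, kafkaParams and ReadFromTopicName are the same values as in main() above.
> // The partition count (3) and the starting offset (161900000L) are placeholders.
> import org.apache.kafka.common.TopicPartition
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> val startingOffsets: Map[TopicPartition, Long] =
>   (0 until 3).map(p => new TopicPartition(ReadFromTopicName, p) -> 161900000L).toMap
>
> val adEventStream = KafkaUtils.createDirectStream[String, Array[Byte]](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams, startingOffsets))
> {code}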