Posted to issues@spark.apache.org by "Karan Singh (JIRA)" <ji...@apache.org> on 2017/09/15 13:03:02 UTC
[jira] [Comment Edited] (SPARK-19275) Spark Streaming, Kafka receiver, "Failed to get records for ... after polling for 512"
[ https://issues.apache.org/jira/browse/SPARK-19275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166112#comment-16166112 ]
Karan Singh edited comment on SPARK-19275 at 9/15/17 1:02 PM:
--------------------------------------------------------------
Hi Team,
My Spark Streaming batch duration is 5 seconds (5000 ms) and all Kafka properties are at their defaults, but I am still facing this issue. Can anyone tell me how to resolve it, or what I am doing wrong?
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(5000));
Exception in Spark Streaming:
Job aborted due to stage failure: Task 6 in stage 289.0 failed 4 times, most recent failure: Lost task 6.3 in stage 289.0 (xxxxxx, executor 2): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-xxx pulse 1 163684030 after polling for 512
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
at com.xxx.analytics.engine.EngineManager$1$1.call(EngineManager.java:165)
at com.xxx.analytics.engine.EngineManager$1$1.call(EngineManager.java:132)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
> Spark Streaming, Kafka receiver, "Failed to get records for ... after polling for 512"
> --------------------------------------------------------------------------------------
>
> Key: SPARK-19275
> URL: https://issues.apache.org/jira/browse/SPARK-19275
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.0
> Environment: Apache Spark 2.0.0, Kafka 0.10 for Scala 2.11
> Reporter: Dmitry Ochnev
>
> We have a Spark Streaming application reading records from Kafka 0.10.
> Some tasks fail with the following error:
> "java.lang.AssertionError: assertion failed: Failed to get records for (...) after polling for 512"
> The first attempt fails and the second attempt (retry) completes successfully; this is the pattern we see for many tasks in our logs. These failures and retries consume resources.
> A similar case with a stack trace is described here:
> https://www.mail-archive.com/user@spark.apache.org/msg56564.html
> https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
> Here is the line from the stack trace where the error is raised:
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> We tried several values for "spark.streaming.kafka.consumer.poll.ms" (2, 5, 10, 30 and 60 seconds), but the error appeared in every case except the last one. Moreover, increasing the threshold increased the total Spark stage duration.
> In other words, increasing "spark.streaming.kafka.consumer.poll.ms" led to fewer task failures, but at the cost of a longer total stage duration, which hurts performance when processing data streams.
> We suspect that there is a bug in CachedKafkaConsumer (and/or other related classes) that inhibits the reading process.
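
The trade-off described in the report (a short poll timeout causes failed attempts, a long one avoids them but can stretch the stage) can be illustrated with a small stand-alone model. This is only a sketch and is not Spark's CachedKafkaConsumer code: the class PollTimeoutSketch, the methods getOrFail and attempt, and the in-memory queue standing in for the broker are all hypothetical, and pollMs merely plays the role of "spark.streaming.kafka.consumer.poll.ms". It assumes that the assertion in the stack trace fires when a poll returns nothing within the timeout, which is what the error text suggests.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollTimeoutSketch {

    // Hypothetical stand-in for a consumer fetch: waits up to pollMs for a
    // record and fails in the same spirit as the reported assertion if
    // nothing arrives in time.
    public static String getOrFail(BlockingQueue<String> broker, long pollMs)
            throws InterruptedException {
        String record = broker.poll(pollMs, TimeUnit.MILLISECONDS);
        if (record == null) {
            throw new AssertionError(
                "Failed to get records after polling for " + pollMs);
        }
        return record;
    }

    // Simulates a broker that makes one record available after delayMs,
    // then tries to read it with the given poll timeout.
    // Returns true if the read succeeded, false if it timed out.
    public static boolean attempt(long delayMs, long pollMs)
            throws InterruptedException {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        scheduler.schedule(() -> broker.offer("record"),
                           delayMs, TimeUnit.MILLISECONDS);
        try {
            getOrFail(broker, pollMs);
            return true;
        } catch (AssertionError e) {
            return false;
        } finally {
            // Cancel the pending delivery (if any) and stop the helper thread.
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The record arrives after 300 ms in both cases; only the timeout differs.
        System.out.println("poll   50 ms: success = " + attempt(300, 50));
        System.out.println("poll 1000 ms: success = " + attempt(300, 1000));
    }
}
```

In this model the short timeout gives up before the record arrives, while the long timeout waits it out. Whenever data is slow to arrive, a larger timeout also means the task can block longer before either succeeding or failing, which is consistent with the longer stage durations reported above.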