Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2017/07/12 21:50:00 UTC

[jira] [Comment Edited] (SPARK-21378) Spark Poll timeout when specific offsets are passed

    [ https://issues.apache.org/jira/browse/SPARK-21378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084772#comment-16084772 ] 

Shixiong Zhu edited comment on SPARK-21378 at 7/12/17 9:49 PM:
---------------------------------------------------------------

bq. Digging deeper shows that there's an assert statement such that if no records are returned (which is a valid case) then a failure will happen.

That's actually not a valid case. CachedKafkaConsumer.scala uses the offset range computed on the driver, so the records are supposed to already be in Kafka. If they are not returned, it means either the poll timed out or the data is missing from Kafka. If it's just a timeout, you can increase "spark.streaming.kafka.consumer.poll.ms" (available since Spark 2.1.0).
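
For example, a minimal sketch of raising that timeout in a Java Spark Streaming app (the 10000 ms value and the app name are illustrative, not recommendations):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("kafka-direct-stream")
    // raise the executor-side Kafka poll timeout before giving up on a fetch
    .set("spark.streaming.kafka.consumer.poll.ms", "10000");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3));
{code}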




was (Author: zsxwing):
bq. Digging deeper shows that there's an assert statement such that if no records are returned (which is a valid case) then a failure will happen.

That's actually not a valid case. CachedKafkaConsumer.scala uses the offset range generated in the driver, so the records are supposed to be in Kafka. If not, then it means timeout, or the data is missing. If it's just because of timeout, you can increase "spark.streaming.kafka.consumer.poll.ms".



> Spark Poll timeout when specific offsets are passed
> ---------------------------------------------------
>
>                 Key: SPARK-21378
>                 URL: https://issues.apache.org/jira/browse/SPARK-21378
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0, 2.0.2
>            Reporter: Ambud Sharma
>
> Kafka direct stream fails with poll timeout:
> {code:java}
> JavaInputDStream<ConsumerRecord<String, String>> stream =
>     KafkaUtils.createDirectStream(ssc,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.<String, String>Subscribe(topicsCollection, kafkaParams, fromOffsets));
> {code}
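> For reference, the {{fromOffsets}} map passed above can be built along these lines (a sketch; the topic name, partitions, and starting offsets are placeholders):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.common.TopicPartition;
>
> // Placeholder topic/partition/offset values, for illustration only
> Map<TopicPartition, Long> fromOffsets = new HashMap<>();
> fromOffsets.put(new TopicPartition("my-topic", 0), 100L);
> fromOffsets.put(new TopicPartition("my-topic", 1), 100L);
> {code}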
> Digging deeper shows that there's an assert statement such that if no records are returned (which is a valid case) then a failure will happen.
> https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala#L75
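> Conceptually, that executor-side check behaves like this Java sketch (the actual code is Scala; the method and variable names here are illustrative, not Spark internals):
> {code:java}
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> // Seek to the offset the driver planned, poll, and fail if nothing came back
> static ConsumerRecords<String, String> fetchOrFail(
>     KafkaConsumer<String, String> consumer,
>     TopicPartition tp, long offset, long pollTimeoutMs) {
>   consumer.seek(tp, offset);
>   ConsumerRecords<String, String> records = consumer.poll(pollTimeoutMs);
>   if (records.isEmpty()) {
>     // roughly where the linked assert fires when the poll times out
>     throw new AssertionError("Failed to get records after polling for " + pollTimeoutMs);
>   }
>   return records;
> }
> {code}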
> Applying the workaround from https://issues.apache.org/jira/browse/SPARK-19275 just keeps logging "Added jobs for time" and eventually fails with "Failed to get records for spark-xxxxx after polling for 3000"; in this case the batch interval is 3 seconds.
> We can increase the poll timeout to an even bigger number, but that leads to an OOM.


