Posted to issues@spark.apache.org by "Vlad Badelita (JIRA)" <ji...@apache.org> on 2017/07/31 13:40:00 UTC
[jira] [Commented] (SPARK-21561) spark-streaming-kafka-010 DSteam
is not pulling anything from Kafka
[ https://issues.apache.org/jira/browse/SPARK-21561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107324#comment-16107324 ]
Vlad Badelita commented on SPARK-21561:
---------------------------------------
Sorry, I didn't know where to ask about this issue. I have since found out that it was an actual bug that someone else had also experienced:
https://issues.apache.org/jira/browse/SPARK-18779
However, it was a Kafka bug, not a spark-streaming one, and it was fixed here:
https://issues.apache.org/jira/browse/KAFKA-4547
Changing the kafka-clients version to 0.10.2.1 fixed it for me.
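For anyone hitting the same problem, here is a minimal sketch of how the client version could be pinned. It assumes an sbt build, which this report does not actually state, and it takes the Spark version 2.1.1 from the "Affects Versions" field of the issue; adjust both to your own setup.

```scala
// build.sbt (sketch; the use of sbt and the "provided" scope are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.1"
)

// Override the kafka-clients version that spark-streaming-kafka-0-10
// pulls in transitively with the release that contains the KAFKA-4547 fix.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
```

With Maven, the equivalent would presumably be an explicit kafka-clients dependency (or a dependencyManagement entry) at version 0.10.2.1.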
> spark-streaming-kafka-010 DSteam is not pulling anything from Kafka
> -------------------------------------------------------------------
>
> Key: SPARK-21561
> URL: https://issues.apache.org/jira/browse/SPARK-21561
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.1
> Reporter: Vlad Badelita
> Labels: kafka-0.10, spark-streaming
>
> I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka topic (broker version 0.10). I have checked that messages are being produced, and I used a KafkaConsumer to pull them successfully. However, when I try to use the Spark Streaming API, I get nothing. If I just use KafkaUtils.createRDD and specify some offset ranges manually, it works. But when I try to use createDirectStream, all the RDDs are empty, and when I check the partition offsets, it reports that every partition is at offset 0. Here is what I tried:
> {code:scala}
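> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.{SparkConf, TaskContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> val sparkConf = new SparkConf().setAppName("kafkastream")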
> val ssc = new StreamingContext(sparkConf, Seconds(3))
> val topics = Array("my_topic")
> val kafkaParams = Map[String, Object](
> "bootstrap.servers" -> "hostname:6667",
> "key.deserializer" -> classOf[StringDeserializer],
> "value.deserializer" -> classOf[StringDeserializer],
> "group.id" -> "my_group",
> "auto.offset.reset" -> "earliest",
> "enable.auto.commit" -> (true: java.lang.Boolean)
> )
> val stream = KafkaUtils.createDirectStream[String, String](
> ssc,
> PreferConsistent,
> Subscribe[String, String](topics, kafkaParams)
> )
> stream.foreachRDD { rdd =>
> val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> rdd.foreachPartition { iter =>
> val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
> println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
> }
> val rddCount = rdd.count()
> println(s"rdd count: $rddCount")
> // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> ssc.start()
> ssc.awaitTermination()
> {code}
> All partitions show offset ranges from 0 to 0, and all RDDs are empty. I would like it to start from the beginning of each partition but also pick up everything that is being produced to it.
> I have also tried spark-streaming-kafka-0.8, and it does work. I think it is a 0.10 issue because everything else works fine. Thank you!