Posted to issues@spark.apache.org by "Pranav Nakhe (JIRA)" <ji...@apache.org> on 2016/12/09 19:33:58 UTC

[jira] [Closed] (SPARK-18779) Messages being received only from one partition when using Spark Streaming integration for Kafka 0.10 with kafka client library at 0.10.1

     [ https://issues.apache.org/jira/browse/SPARK-18779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pranav Nakhe closed SPARK-18779.
--------------------------------
    Resolution: Not A Problem

I looked at the code and was able to recreate this issue with a plain Kafka client outside Spark, so the behavior comes from the Kafka client itself. Hence closing this.

> Messages being received only from one partition when using Spark Streaming integration for Kafka 0.10 with kafka client library at 0.10.1
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18779
>                 URL: https://issues.apache.org/jira/browse/SPARK-18779
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.2
>            Reporter: Pranav Nakhe
>
> I apologize for the earlier description, which wasn't very clear about the issue. I will give a detailed description and my use case now -
> I have a Spark application running which consumes Kafka messages using the Spark Kafka 0.10 integration. I now need to stop my Spark application, and the user then tells it what timestamp in the past it should start reading messages from (replaying messages). The timestamp is mapped to a Kafka offset using the 'offsetsForTimes' API on KafkaConsumer, introduced in the 0.10.1.0 Kafka client. That offset is then used to create the DStream.
> Because the Kafka 0.10.0.1 client does not have the 'offsetsForTimes' API, I need to use the 0.10.1.0 client.
> So to achieve that behavior I replaced the 0.10.0.1 jar in the Spark environment with the 0.10.1.0 jar. Things started working for me, but the application could read messages only from the first partition.
> To recreate the issue I wrote a local program with the 0.10.1.0 jar on the classpath:
> ********************************
> import java.util.HashMap
> import org.apache.kafka.common.TopicPartition
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
> import scala.collection.JavaConversions._
>
> // ssc (StreamingContext) and kafkaParams are defined elsewhere
> val topics = Set("Z1Topic")
> // Offsets hardcoded to 10 for both partitions instead of coming from 'offsetsForTimes'
> val topicPartitionOffsetMap = new HashMap[TopicPartition, Long]()
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 0), 10L)
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 1), 10L)
> val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferBrokers, Subscribe[String, String](topics, kafkaParams, topicPartitionOffsetMap))
> val values = stream.map(record => record.value())
> values.print()
> ********************************
> This printed only the messages from the first partition, starting at offset 10. (This is with the 0.10.1.0 client.)
> If I use the Kafka 0.10.0.1 client for the above program, things work fine and I receive messages from all partitions, but then I can't use the 'offsetsForTimes' API (because it doesn't exist in the 0.10.0.1 client).
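For reference, the timestamp-to-offset lookup the report describes can be sketched against the 0.10.1.0 consumer API. This is a minimal sketch, not code from the report: the topic name, bootstrap servers, group id, and replay timestamp below are placeholder assumptions.

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Map a replay timestamp to per-partition starting offsets with
// offsetsForTimes (available from kafka-clients 0.10.1.0 onwards).
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")          // placeholder
props.put("group.id", "replay-offset-lookup")             // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

// Query every partition of the topic for the first offset whose
// timestamp is >= the chosen replay point (here: one hour ago).
val replayFrom: java.lang.Long = System.currentTimeMillis() - 3600 * 1000L
val partitions = consumer.partitionsFor("Z1Topic").asScala
  .map(info => new TopicPartition(info.topic, info.partition))
val query = partitions.map(tp => tp -> replayFrom).toMap.asJava

val fromOffsets = consumer.offsetsForTimes(query).asScala.collect {
  // The entry is null when no message at/after the timestamp exists.
  case (tp, oat) if oat != null => tp -> oat.offset()
}
consumer.close()

// fromOffsets can then seed the offsets map passed to Subscribe /
// KafkaUtils.createDirectStream, as in the snippet above.
```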



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org