Posted to user@spark.apache.org by Hafiz Mujadid <ha...@gmail.com> on 2014/12/03 08:35:08 UTC
Getting first N messages from a Kafka topic using Spark Streaming
Hi Experts!
Is there a way to read the first N messages from a Kafka stream, put them in
a collection, return that collection to the caller for visualization, and
then stop Spark Streaming?
I would be grateful for any suggestions.
Currently I have the following code:
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
  if (params.contains("zookeeperQourum"))
    zkQuorum = params.get("zookeeperQourum").get
  if (params.contains("userGroup"))
    group = params.get("userGroup").get
  if (params.contains("topics"))
    topics = params.get("topics").get
  if (params.contains("numberOfThreads"))
    numThreads = params.get("numberOfThreads").get
  if (params.contains("sink"))
    sink = params.get("sink").get
  if (params.contains("batchInterval"))
    interval = params.get("batchInterval").get.toInt
  val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
  val ssc = new StreamingContext(sparkConf, Seconds(interval))
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  var consumerConfig = scala.collection.immutable.Map.empty[String, String]
  consumerConfig += ("auto.offset.reset" -> "smallest")
  consumerConfig += ("zookeeper.connect" -> zkQuorum)
  consumerConfig += ("group.id" -> group)
  var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
  val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
  streams.foreach(rdd => rdd.foreachPartition(itr => {
    while (itr.hasNext && size >= 0) {
      var msg = itr.next
      println(msg)
      sample.append(msg)
      sample.append("\n")
      size -= 1
    }
  }))
  ssc.start()
  ssc.awaitTermination(5000)
  ssc.stop(true)
}
Here sample is a StringBuilder. When I print the contents of this
StringBuilder after the getsample method has returned, it is empty.
Any help will be appreciated.
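A possible explanation, offered as a hedged note and not a confirmed diagnosis: with a cluster master such as spark://cloud2-server:7077, the function passed to foreachPartition runs on the executors, so it appends to a separate copy of sample in the executor JVM and the driver's instance is never touched. The sketch below imitates that copy with plain Java serialization and no Spark at all; ClosureCopyDemo and shipToExecutor are hypothetical names introduced only for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureCopyDemo {
  // Round-trips a value through Java serialization. This only approximates
  // what happens to an object captured by a closure when a task is sent to
  // an executor (assumption: the real mechanism is more involved).
  def shipToExecutor[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val sample = new java.lang.StringBuilder()   // stands in for the driver's builder
    val executorCopy = shipToExecutor(sample)    // stands in for the executor's copy
    executorCopy.append("message-1\n")           // the append happens on the copy only
    println(s"driver copy length:   ${sample.length()}")
    println(s"executor copy length: ${executorCopy.length()}")
  }
}
```

Running main reports a length of 0 for the driver copy and 10 for the executor copy, which matches the empty StringBuilder described above.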
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-firs-N-messages-froma-Kafka-topic-using-Spark-Streaming-tp20227.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Getting first N messages from a Kafka topic using Spark Streaming
Posted by Hafiz Mujadid <ha...@gmail.com>.
Hi Akhil!
Thanks for your response. Can you please suggest how to return this
sample from a function to the caller and then stop Spark Streaming?
Thanks
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-firs-N-messages-froma-Kafka-topic-using-Spark-Streaming-tp20227p20249.html
Re: Getting first N messages from a Kafka topic using Spark Streaming
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
You could do something like:
val stream = kafkaStream.getStream().repartition(1).mapPartitions(x => x.take(10))
Here each batch of stream will contain at most 10 elements from kafkaStream.
Thanks
Best Regards
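
On the follow-up question of returning the sample to the caller and stopping streaming, here is a minimal sketch. It assumes the messages are gathered on the driver (for example with rdd.take inside foreachRDD, which returns its result to the driver) and not inside foreachPartition. SampleCollector is a hypothetical helper, not a Spark class. The Spark wiring appears only in comments, is untested here, and assumes the ssc and streams values from the original code.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (not part of Spark): gathers at most `n` messages
// in the driver JVM and lets the caller wait for them with a timeout.
final class SampleCollector(n: Int) {
  private val buffer = new ArrayBuffer[String]()
  private val done = new CountDownLatch(1)
  if (n <= 0) done.countDown()   // nothing to wait for

  // Number of messages still wanted.
  def remaining: Int = synchronized { math.max(0, n - buffer.size) }

  // Keeps only as many messages as are still wanted, then releases
  // any waiting caller once the limit is reached.
  def offer(batch: Iterable[String]): Unit = synchronized {
    buffer ++= batch.take(math.max(0, n - buffer.size))
    if (buffer.size >= n) done.countDown()
  }

  // Blocks until `n` messages have arrived or the timeout has elapsed,
  // then returns whatever was collected so far.
  def await(timeoutMs: Long): List[String] = {
    done.await(timeoutMs, TimeUnit.MILLISECONDS)
    synchronized { buffer.toList }
  }
}

// Intended wiring inside getsample (assumption, not run here):
//   val collector = new SampleCollector(n)
//   streams.foreachRDD(rdd =>
//     if (collector.remaining > 0) collector.offer(rdd.take(collector.remaining)))
//   ssc.start()
//   val sample = collector.await(timeoutMs)   // value to return to the caller
//   ssc.stop(true)

object SampleCollectorDemo {
  def main(args: Array[String]): Unit = {
    val collector = new SampleCollector(3)
    // Stand-ins for the per-batch arrays that rdd.take(...) would return.
    collector.offer(Seq("m1", "m2"))
    collector.offer(Seq("m3", "m4", "m5"))
    println(collector.await(1000).mkString("\n"))
  }
}
```

The context is stopped from the calling thread after await returns, not from inside the output operation, so the function can return the collected list as its result instead of Unit.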