Posted to issues@spark.apache.org by "amit kumar (JIRA)" <ji...@apache.org> on 2017/05/09 07:01:04 UTC
[jira] [Closed] (SPARK-20671) Processing multiple kafka topics with
single spark streaming context hangs on batchSubmitted.
[ https://issues.apache.org/jira/browse/SPARK-20671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
amit kumar closed SPARK-20671.
------------------------------
Resolution: Not A Problem
My bad, I configured it wrong: setMaster("local[*]") in place of setMaster("local[2]") works.
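Why this fixes it: each receiver-based Kafka stream pins one core for its long-running receiver, so under local[2] the two topics consume both cores and no batch is ever scheduled. A minimal sketch of the corrected driver, keeping the topics and 5-second batch interval from the report below; the object name FixedSparkMain and the lines.print() output sink are illustrative, not from the original:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object FixedSparkMain extends App {
  // local[*] uses all available cores, so the two Kafka receivers
  // (one per topic) still leave cores free to run the batch jobs.
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("kafkaspark")
    .set("spark.streaming.concurrentJobs", "4")
  val ssc = new StreamingContext(conf, Seconds(5))

  val zkQuorum = "localhost:2181"
  List("test", "fb").foreach { topic =>
    // Each createStream call starts one long-running receiver.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, topic, Map(topic -> 1)).map(_._2)
    lines.print()
  }

  ssc.start()
  ssc.awaitTermination()
}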
> Processing multiple kafka topics with single spark streaming context hangs on batchSubmitted.
> ---------------------------------------------------------------------------------------------
>
> Key: SPARK-20671
> URL: https://issues.apache.org/jira/browse/SPARK-20671
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.0
> Environment: Ubuntu
> Reporter: amit kumar
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.apache.spark.streaming.scheduler._
> import com.datastax.spark.connector._
>
> object SparkMain extends App {
>   System.setProperty("spark.cassandra.connection.host", "127.0.0.1")
>   val conf = new SparkConf()
>     .setMaster("local[2]")
>     .setAppName("kafkaspark")
>     .set("spark.streaming.concurrentJobs", "4")
>   val sc = new SparkContext(conf)
>   val ssc = new StreamingContext(sc, Seconds(5))
>   val sqlContext = new SQLContext(sc)
>   val host = "localhost:2181"
>   val topicList = List("test", "fb")
>   // One receiver-based stream per topic; each receiver occupies one core.
>   topicList.foreach { topic =>
>     val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
>     // configureStream(topic, lines)
>     lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test", "rawdata", SomeColumns("key")))
>   }
>   ssc.addStreamingListener(new StreamingListener {
>     override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>       println("Batch completed, Total delay :" + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
>     }
>     override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
>       println("inside onReceiverStarted")
>     }
>     override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
>       println("inside onReceiverError")
>     }
>     override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
>       println("inside onReceiverStopped")
>     }
>     override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
>       println("inside onBatchSubmitted")
>     }
>     override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
>       println("inside onBatchStarted")
>     }
>   })
>   ssc.start()
>   println("===========================")
>   ssc.awaitTermination()
> }
>
> case class test(key: String)
> ========
> If I put any one of the topics at a time, each topic works. But when the topic list has more than one topic, then after getting the DStream from the Kafka topics it keeps printing "inside onBatchSubmitted". Thanks in advance.
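Since the hang above comes from the two receivers pinning both cores under local[2] (see the resolution at the top), another way out is the receiver-less direct stream API from the same spark-streaming-kafka-0-8 artifact, which avoids the per-receiver core cost entirely. A sketch, assuming a Kafka broker at localhost:9092; the object name DirectMain and the broker address are illustrative, not from the report:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectMain extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaspark")
  val ssc = new StreamingContext(conf, Seconds(5))
  // Direct stream: data is read from Kafka by the batch tasks themselves,
  // so there are no long-running receivers and no permanently pinned cores;
  // local[2] is enough even with several topics.
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val lines = KafkaUtils
    .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("test", "fb"))
    .map(_._2)
  lines.print()
  ssc.start()
  ssc.awaitTermination()
}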
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org