Posted to issues@spark.apache.org by "amit kumar (JIRA)" <ji...@apache.org> on 2017/05/09 07:01:04 UTC

[jira] [Closed] (SPARK-20671) Processing multiple Kafka topics with a single Spark Streaming context hangs on batchSubmitted.

     [ https://issues.apache.org/jira/browse/SPARK-20671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

amit kumar closed SPARK-20671.
------------------------------
    Resolution: Not A Problem

My bad, I configured it wrong: setMaster("local[*]") in place of setMaster("local[2]") works.
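For reference, a minimal sketch of the working configuration (a reconstruction using the names from the snippet below, not the exact code that was run). With the receiver-based KafkaUtils.createStream, each topic gets a dedicated receiver occupying one core, so the local master needs more cores than receivers or submitted batches are never processed:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // local[2] was fully occupied by the two Kafka receivers, leaving no core
    // for batch processing; local[*] (or local[n] with n greater than the
    // number of receivers) lets the submitted batches actually run.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("kafkaspark")
      .set("spark.streaming.concurrentJobs", "4")
    val ssc = new StreamingContext(new SparkContext(conf), Seconds(5))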

> Processing multiple Kafka topics with a single Spark Streaming context hangs on batchSubmitted.
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20671
>                 URL: https://issues.apache.org/jira/browse/SPARK-20671
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>         Environment: Ubuntu
>            Reporter: amit kumar
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.apache.spark.streaming.scheduler._
> import com.datastax.spark.connector._
> object SparkMain extends App {
>  System.setProperty("spark.cassandra.connection.host", "127.0.0.1")
>  val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaspark").set("spark.streaming.concurrentJobs","4")
>  val sc = new SparkContext(conf)
>  val ssc = new StreamingContext(sc, Seconds(5))
>  val sqlContext = new SQLContext(sc)
>  val host = "localhost:2181"
>  val topicList = List("test","fb")
>  topicList.foreach{
>    topic => val lines = KafkaUtils.createStream(ssc, host, topic, Map(topic -> 1)).map(_._2)
>      //configureStream(topic, lines)
>      lines.foreachRDD(rdd => rdd.map(test(_)).saveToCassandra("test","rawdata",SomeColumns("key")))
>  }
>   ssc.addStreamingListener(new StreamingListener {
>    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>      System.out.println("Batch completed, Total delay :" + batchCompleted.batchInfo.totalDelay.get.toString + " ms")
>    }
>     override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
>      println("inside onReceiverStarted")
>    }
>     override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
>      println("inside onReceiverError")
>    }
>     override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
>      println("inside onReceiverStopped")
>    }
>     override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
>      println("inside onBatchSubmitted")
>    }
>     override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
>      println("inside onBatchStarted")
>    }
>  })
>   ssc.start()
>  println("===========================")
>  ssc.awaitTermination()
> }
> case class test(key: String)
> ========
> If I put any one of the topics at a time, each topic works. But when the topic list has more than one topic, after getting the DStream from the Kafka topics it keeps printing "inside onBatchSubmitted". Thanks in advance.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org