Posted to user@spark.apache.org by lu...@sina.com on 2015/03/30 07:34:36 UTC

Fwd: How SparkStreaming output messages to Kafka?

Hi guys,
          I am using Spark Streaming to receive messages from Kafka, process them, and send the results back to Kafka. However, the Kafka consumer does not receive any messages. Does anyone have any ideas?
 
Here is my code:
 
object SparkStreamingSampleDirectApproach {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)  
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)  
    
    val Array(brokers, topics) = Array("localhost:9092", "topic1")
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingSampleDirectApproach").set("log4j.rootCategory", "WARN, console")
    val ssc = new StreamingContext(conf, Seconds(1))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
//    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    
    val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    
    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)
//    val messages2 = messages.map{line =>
//      new KeyedMessage[String, String](topic2,wordCounts.toString())
//    }.toArray
    
    val messages2 = new KeyedMessage[String, String](topic2,messages.toString())
    println(messages2)
    
    producer.send(messages2)
    
    ssc.start()
    ssc.awaitTermination() 
  }
}


--------------------------------


 
Thanks&Best regards!
罗辉 San.Luo

Re: How SparkStreaming output messages to Kafka?

Posted by Saisai Shao <sa...@gmail.com>.
Hi Hui,

Did you try the direct Kafka stream example under Spark Streaming's
examples? Does it also fail to receive messages? Also, would you please
check the whole setup, including Kafka, and test with the Kafka console
consumer to see whether Kafka itself is OK?

Besides, looking at your code, there is a problem here:

    val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
    println(messages2)

    producer.send(messages2)

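This snippet is not lazily evaluated: it is executed ONLY ONCE, on the
driver, when execution reaches this point, so the stream data is never
actually written to Kafka. (messages.toString() is just the string form of
the DStream object, not its contents.) You need to write something like this: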

messages.foreachRDD { r =>
     r.foreachPartition{ iter =>
         // create Producer
         // convert this partition of data (iter) into KeyedMessages and write them to Kafka
        }
}

This is the basic pattern; sorry for any missing parts and typos. Also pay
attention to serialization issues, since this closure runs on remote
executors and anything it references must be shipped to them. Please give
it another try.
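
A minimal sketch of that pattern, assuming the same Kafka 0.8 Scala producer
API (kafka.producer.Producer, ProducerConfig, KeyedMessage) and the word-count
DStream from the original post. The object KafkaSink and the method
writeToKafka are hypothetical names used only for illustration, and the
"word:count" message format is an arbitrary choice.

```scala
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.streaming.dstream.DStream

object KafkaSink {
  // Hypothetical helper (not part of Spark or Kafka): registers an output
  // operation that writes every record of every batch in `counts` to `topic`.
  def writeToKafka(counts: DStream[(String, Long)], brokers: String, topic: String): Unit = {
    counts.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // This closure runs on an executor, once per partition per batch.
        // Only the two strings are captured; the producer is built here,
        // so it never has to be serialized from the driver.
        val props = new Properties()
        props.put("metadata.broker.list", brokers)
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        try {
          iter.foreach { case (word, count) =>
            producer.send(new KeyedMessage[String, String](topic, s"$word:$count"))
          }
        } finally {
          producer.close() // release connections even if a send fails
        }
      }
    }
  }
}
```

In the original program this would be called as
KafkaSink.writeToKafka(wordCounts, brokers, topic2) before ssc.start().
Creating a producer per partition per batch is simple but not cheap; reusing
one producer per executor is a common refinement that is left out here.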

Thanks
Jerry




Re: How SparkStreaming output messages to Kafka?

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Are there enough messages in Kafka to consume? Can you make sure your
Kafka setup is working by testing it with the console consumer? Also try this example:
<https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala>

Thanks
Best Regards
