You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/07 02:44:46 UTC

spark git commit: [SPARK-7396] [STREAMING] [EXAMPLE] Update KafkaWordCountProducer to use new Producer API

Repository: spark
Updated Branches:
  refs/heads/master 4e930420c -> 316a5c042


[SPARK-7396] [STREAMING] [EXAMPLE] Update KafkaWordCountProducer to use new Producer API

Without this change, KafkaWordCountProducer (which still uses the old Kafka producer API) throws the following exception:

```
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:77)
	at org.apache.spark.examples.streaming.KafkaWordCountProducer$.main(KafkaWordCount.scala:96)
	at org.apache.spark.examples.streaming.KafkaWordCountProducer.main(KafkaWordCount.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:623)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

Author: jerryshao <sa...@intel.com>

Closes #5936 from jerryshao/SPARK-7396 and squashes the following commits:

270bbe2 [jerryshao] Fix Kafka Produce throw Exception issue


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/316a5c04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/316a5c04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/316a5c04

Branch: refs/heads/master
Commit: 316a5c0423ba3688cacd3acc3c5b5571e8a71d1d
Parents: 4e93042
Author: jerryshao <sa...@intel.com>
Authored: Wed May 6 17:44:43 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed May 6 17:44:43 2015 -0700

----------------------------------------------------------------------
 .../examples/streaming/KafkaWordCount.scala     | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/316a5c04/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index 387c0e4..f407367 100644
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.examples.streaming
 
-import java.util.Properties
+import java.util.HashMap
 
-import kafka.producer._
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
 
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka._
@@ -77,23 +77,25 @@ object KafkaWordCountProducer {
     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
 
     // Zookeeper connection properties
-    val props = new Properties()
-    props.put("metadata.broker.list", brokers)
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    val props = new HashMap[String, Object]()
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+      "org.apache.kafka.common.serialization.StringSerializer")
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+      "org.apache.kafka.common.serialization.StringSerializer")
 
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
+    val producer = new KafkaProducer[String, String](props)
 
     // Send some messages
     while(true) {
-      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+      (1 to messagesPerSec.toInt).foreach { messageNum =>
         val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
           .mkString(" ")
 
-        new KeyedMessage[String, String](topic, str)
-      }.toArray
+        val message = new ProducerRecord[String, String](topic, null, str)
+        producer.send(message)
+      }
 
-      producer.send(messages: _*)
       Thread.sleep(100)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org