You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/01 13:57:06 UTC

spark git commit: [SPARK-16776][STREAMING] Replace deprecated API in KafkaTestUtils for 0.10.0.

Repository: spark
Updated Branches:
  refs/heads/master 1e9b59b73 -> f93ad4fe7


[SPARK-16776][STREAMING] Replace deprecated API in KafkaTestUtils for 0.10.0.

## What changes were proposed in this pull request?

This PR replaces the old Kafka API to 0.10.0 ones in `KafkaTestUtils`.

The change include:

 - `Producer` to `KafkaProducer`
 - Change configurations to equalvant ones. (I referred [here](http://kafka.apache.org/documentation.html#producerconfigs) for 0.10.0 and [here](http://kafka.apache.org/082/documentation.html#producerconfigs
) for old, 0.8.2).

This PR will remove the build warning as below:

```scala
[WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:71: class Producer in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.KafkaProducer instead.
[WARNING]   private var producer: Producer[String, String] = _
[WARNING]                         ^
[WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:181: class Producer in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.KafkaProducer instead.
[WARNING]     producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
[WARNING]                    ^
[WARNING] .../spark/streaming/kafka010/KafkaTestUtils.scala:181: class ProducerConfig in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.ProducerConfig instead.
[WARNING]     producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
[WARNING]                                                 ^
[WARNING] .../spark/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala:182: class KeyedMessage in package producer is deprecated: This class has been deprecated and will be removed in a future release. Please use org.apache.kafka.clients.producer.ProducerRecord instead.
[WARNING]     producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
[WARNING]                                      ^
[WARNING] four warnings found
[WARNING] warning: [options] bootstrap class path not set in conjunction with -source 1.7
[WARNING] 1 warning
```

## How was this patch tested?

Existing tests that use `KafkaTestUtils` should cover this.

Author: hyukjinkwon <gu...@gmail.com>

Closes #14416 from HyukjinKwon/SPARK-16776.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f93ad4fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f93ad4fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f93ad4fe

Branch: refs/heads/master
Commit: f93ad4fe7c9728c8dd67a8095de3d39fad21d03f
Parents: 1e9b59b
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Aug 1 06:56:52 2016 -0700
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Aug 1 06:56:52 2016 -0700

----------------------------------------------------------------------
 .../streaming/kafka010/KafkaTestUtils.scala     | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f93ad4fe/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 19192e4b..ecabe1c 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -30,10 +30,10 @@ import scala.util.control.NonFatal
 
 import kafka.admin.AdminUtils
 import kafka.api.Request
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.ZkUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
 import org.apache.spark.SparkConf
@@ -68,7 +68,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   private var server: KafkaServer = _
 
   // Kafka producer
-  private var producer: Producer[String, String] = _
+  private var producer: KafkaProducer[String, String] = _
 
   // Flag to test whether the system is correctly started
   private var zkReady = false
@@ -178,8 +178,10 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   /** Send the array of messages to the Kafka broker */
   def sendMessages(topic: String, messages: Array[String]): Unit = {
-    producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
-    producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
+    producer = new KafkaProducer[String, String](producerConfiguration)
+    messages.foreach { message =>
+      producer.send(new ProducerRecord[String, String](topic, message))
+    }
     producer.close()
     producer = null
   }
@@ -198,10 +200,12 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   private def producerConfiguration: Properties = {
     val props = new Properties()
-    props.put("metadata.broker.list", brokerAddress)
-    props.put("serializer.class", classOf[StringEncoder].getName)
+    props.put("bootstrap.servers", brokerAddress)
+    props.put("value.serializer", classOf[StringSerializer].getName)
+    // Key serializer is required.
+    props.put("key.serializer", classOf[StringSerializer].getName)
     // wait for all in-sync replicas to ack sends
-    props.put("request.required.acks", "-1")
+    props.put("acks", "all")
     props
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org