You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/14 14:20:50 UTC

[spark] branch master updated: [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation)

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fecc4a  [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation)
2fecc4a is described below

commit 2fecc4a3fe910163b51b6017406b3f1f05658787
Author: DylanGuedes <dj...@gmail.com>
AuthorDate: Thu Mar 14 09:20:30 2019 -0500

    [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation)
    
    ## What changes were proposed in this pull request?
    
    To change calls to AdminUtils, currently used to create and delete topics in Kafka tests. With this change, it will rely on adminClient, the recommended way from now on.
    
    ## How was this patch tested?
    I ran all unit tests and they are fine. Since it is already good tested, I thought that changes in the API wouldn't require new tests, as long as the current tests are working fine.
    
    Closes #24071 from DylanGuedes/spark-27138.
    
    Authored-by: DylanGuedes <dj...@gmail.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index dacfffa..9fa88b4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -20,20 +20,19 @@ package org.apache.spark.sql.kafka010
 import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties, UUID}
+import java.util.{Collections, Map => JMap, Properties, UUID}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.language.postfixOps
 import scala.util.Random
 
-import kafka.admin.AdminUtils
 import kafka.api.Request
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -195,7 +194,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     var created = false
     while (!created) {
       try {
-        AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+        val newTopic = new NewTopic(topic, partitions, 1)
+        adminClient.createTopics(Collections.singleton(newTopic))
         created = true
       } catch {
         // Workaround fact that TopicExistsException is in kafka.common in 0.10.0 and
@@ -222,7 +222,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   /** Delete a Kafka topic and wait until it is propagated to the whole cluster */
   def deleteTopic(topic: String): Unit = {
     val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminClient.deleteTopics(Collections.singleton(topic))
     verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
   }
 
@@ -422,7 +422,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
           // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
           // chance that a topic will be recreated after deletion due to the asynchronous update.
           // Hence, delete the topic and retry.
-          AdminUtils.deleteTopic(zkUtils, topic)
+          adminClient.deleteTopics(Collections.singleton(topic))
           throw e
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org