You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/14 14:20:50 UTC
[spark] branch master updated: [SPARK-27138][TESTS][KAFKA] Remove
AdminUtils calls (fixes deprecation)
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2fecc4a [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation)
2fecc4a is described below
commit 2fecc4a3fe910163b51b6017406b3f1f05658787
Author: DylanGuedes <dj...@gmail.com>
AuthorDate: Thu Mar 14 09:20:30 2019 -0500
[SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation)
## What changes were proposed in this pull request?
This change replaces the calls to the deprecated AdminUtils, currently used to create and delete topics in Kafka tests, with calls to adminClient, which is the recommended way from now on.
## How was this patch tested?
I ran all unit tests and they pass. Since this code is already well tested, I thought that the API change wouldn't require new tests, as long as the current tests keep working.
Closes #24071 from DylanGuedes/spark-27138.
Authored-by: DylanGuedes <dj...@gmail.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index dacfffa..9fa88b4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -20,20 +20,19 @@ package org.apache.spark.sql.kafka010
import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties, UUID}
+import java.util.{Collections, Map => JMap, Properties, UUID}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.language.postfixOps
import scala.util.Random
-import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
@@ -195,7 +194,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
var created = false
while (!created) {
try {
- AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+ val newTopic = new NewTopic(topic, partitions, 1)
+ adminClient.createTopics(Collections.singleton(newTopic))
created = true
} catch {
// Workaround fact that TopicExistsException is in kafka.common in 0.10.0 and
@@ -222,7 +222,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
/** Delete a Kafka topic and wait until it is propagated to the whole cluster */
def deleteTopic(topic: String): Unit = {
val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminClient.deleteTopics(Collections.singleton(topic))
verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
}
@@ -422,7 +422,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
// As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
// chance that a topic will be recreated after deletion due to the asynchronous update.
// Hence, delete the topic and retry.
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminClient.deleteTopics(Collections.singleton(topic))
throw e
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org