You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/07/20 17:19:18 UTC
spark git commit: [SPARK-21142][SS] spark-streaming-kafka-0-10 should
depend on kafka-clients instead of the full-blown kafka library
Repository: spark
Updated Branches:
refs/heads/master da9f067a1 -> 03367d7aa
[SPARK-21142][SS] spark-streaming-kafka-0-10 should depend on kafka-clients instead of the full-blown kafka library
## What changes were proposed in this pull request?
Currently spark-streaming-kafka-0-10 has a dependency on the full kafka distribution (but only uses and requires the kafka-clients library).
The PR fixes that (the library now depends only on kafka-clients), while the tests still depend on the full kafka distribution.
## How was this patch tested?
All existing tests still pass.
Author: Tim Van Wassenhove <gi...@timvw.be>
Closes #18353 from timvw/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03367d7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03367d7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03367d7a
Branch: refs/heads/master
Commit: 03367d7aa3acd3abbcacba57ea75c8efa2a9a794
Parents: da9f067
Author: Tim Van Wassenhove <gi...@timvw.be>
Authored: Thu Jul 20 18:19:14 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jul 20 18:19:14 2017 +0100
----------------------------------------------------------------------
external/kafka-0-10-sql/pom.xml | 5 +-
external/kafka-0-10/pom.xml | 31 +-
.../streaming/kafka010/KafkaTestUtils.scala | 304 -------------------
.../streaming/kafka010/KafkaTestUtils.scala | 304 +++++++++++++++++++
4 files changed, 315 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10-sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 557d272..0f61a10 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -29,6 +29,7 @@
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
+ <kafka.version>0.10.0.1</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10 Source for Structured Streaming</name>
@@ -65,12 +66,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.10.0.1</version>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.10.0.1</version>
+ <version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 6c98cb0..4d9861a 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -28,6 +28,7 @@
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>streaming-kafka-0-10</sbt.project.name>
+ <kafka.version>0.10.0.1</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Spark Integration for Kafka 0.10</name>
@@ -49,30 +50,14 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.10.0.1</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
deleted file mode 100644
index 6c7024e..0000000
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka010
-
-import java.io.{File, IOException}
-import java.lang.{Integer => JInt}
-import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties}
-import java.util.concurrent.TimeoutException
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import kafka.admin.AdminUtils
-import kafka.api.Request
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZkUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Time
-import org.apache.spark.util.Utils
-
-/**
- * This is a helper class for Kafka test suites. This has the functionality to set up
- * and tear down local Kafka servers, and to push data using Kafka producers.
- *
- * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
- */
-private[kafka010] class KafkaTestUtils extends Logging {
-
- // Zookeeper related configurations
- private val zkHost = "localhost"
- private var zkPort: Int = 0
- private val zkConnectionTimeout = 60000
- private val zkSessionTimeout = 6000
-
- private var zookeeper: EmbeddedZookeeper = _
-
- private var zkUtils: ZkUtils = _
-
- // Kafka broker related configurations
- private val brokerHost = "localhost"
- private var brokerPort = 0
- private var brokerConf: KafkaConfig = _
-
- // Kafka broker server
- private var server: KafkaServer = _
-
- // Kafka producer
- private var producer: KafkaProducer[String, String] = _
-
- // Flag to test whether the system is correctly started
- private var zkReady = false
- private var brokerReady = false
-
- def zkAddress: String = {
- assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
- s"$zkHost:$zkPort"
- }
-
- def brokerAddress: String = {
- assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
- s"$brokerHost:$brokerPort"
- }
-
- def zookeeperClient: ZkUtils = {
- assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
- Option(zkUtils).getOrElse(
- throw new IllegalStateException("Zookeeper client is not yet initialized"))
- }
-
- // Set up the Embedded Zookeeper server and get the proper Zookeeper port
- private def setupEmbeddedZookeeper(): Unit = {
- // Zookeeper server startup
- zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
- // Get the actual zookeeper binding port
- zkPort = zookeeper.actualPort
- zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
- zkReady = true
- }
-
- // Set up the Embedded Kafka server
- private def setupEmbeddedKafkaServer(): Unit = {
- assert(zkReady, "Zookeeper should be set up beforehand")
-
- // Kafka broker startup
- Utils.startServiceOnPort(brokerPort, port => {
- brokerPort = port
- brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
- server = new KafkaServer(brokerConf)
- server.startup()
- brokerPort = server.boundPort()
- (server, brokerPort)
- }, new SparkConf(), "KafkaBroker")
-
- brokerReady = true
- }
-
- /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
- def setup(): Unit = {
- setupEmbeddedZookeeper()
- setupEmbeddedKafkaServer()
- }
-
- /** Teardown the whole servers, including Kafka broker and Zookeeper */
- def teardown(): Unit = {
- brokerReady = false
- zkReady = false
-
- if (producer != null) {
- producer.close()
- producer = null
- }
-
- if (server != null) {
- server.shutdown()
- server.awaitShutdown()
- server = null
- }
-
- // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
- // in some cases. It leads to test failures on Windows if the directory deletion failure
- // throws an exception.
- brokerConf.logDirs.foreach { f =>
- try {
- Utils.deleteRecursively(new File(f))
- } catch {
- case e: IOException if Utils.isWindows =>
- logWarning(e.getMessage)
- }
- }
-
- if (zkUtils != null) {
- zkUtils.close()
- zkUtils = null
- }
-
- if (zookeeper != null) {
- zookeeper.shutdown()
- zookeeper = null
- }
- }
-
- /** Create a Kafka topic and wait until it is propagated to the whole cluster */
- def createTopic(topic: String, partitions: Int): Unit = {
- AdminUtils.createTopic(zkUtils, topic, partitions, 1)
- // wait until metadata is propagated
- (0 until partitions).foreach { p =>
- waitUntilMetadataIsPropagated(topic, p)
- }
- }
-
- /** Create a Kafka topic and wait until it is propagated to the whole cluster */
- def createTopic(topic: String): Unit = {
- createTopic(topic, 1)
- }
-
- /** Java-friendly function for sending messages to the Kafka broker */
- def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
- sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
- }
-
- /** Send the messages to the Kafka broker */
- def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
- val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
- sendMessages(topic, messages)
- }
-
- /** Send the array of messages to the Kafka broker */
- def sendMessages(topic: String, messages: Array[String]): Unit = {
- producer = new KafkaProducer[String, String](producerConfiguration)
- messages.foreach { message =>
- producer.send(new ProducerRecord[String, String](topic, message))
- }
- producer.close()
- producer = null
- }
-
- private def brokerConfiguration: Properties = {
- val props = new Properties()
- props.put("broker.id", "0")
- props.put("host.name", "localhost")
- props.put("port", brokerPort.toString)
- props.put("log.dir", Utils.createTempDir().getAbsolutePath)
- props.put("zookeeper.connect", zkAddress)
- props.put("log.flush.interval.messages", "1")
- props.put("replica.socket.timeout.ms", "1500")
- props
- }
-
- private def producerConfiguration: Properties = {
- val props = new Properties()
- props.put("bootstrap.servers", brokerAddress)
- props.put("value.serializer", classOf[StringSerializer].getName)
- // Key serializer is required.
- props.put("key.serializer", classOf[StringSerializer].getName)
- // wait for all in-sync replicas to ack sends
- props.put("acks", "all")
- props
- }
-
- // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
- // dependency
- def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
- def makeAttempt(): Either[Throwable, T] = {
- try {
- Right(func)
- } catch {
- case e if NonFatal(e) => Left(e)
- }
- }
-
- val startTime = System.currentTimeMillis()
- @tailrec
- def tryAgain(attempt: Int): T = {
- makeAttempt() match {
- case Right(result) => result
- case Left(e) =>
- val duration = System.currentTimeMillis() - startTime
- if (duration < timeout.milliseconds) {
- Thread.sleep(interval.milliseconds)
- } else {
- throw new TimeoutException(e.getMessage)
- }
-
- tryAgain(attempt + 1)
- }
- }
-
- tryAgain(1)
- }
-
- private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
- def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
- case Some(partitionState) =>
- val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
- zkUtils.getLeaderForPartition(topic, partition).isDefined &&
- Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
- leaderAndInSyncReplicas.isr.nonEmpty
-
- case _ =>
- false
- }
- eventually(Time(10000), Time(100)) {
- assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
- }
- }
-
- private class EmbeddedZookeeper(val zkConnect: String) {
- val snapshotDir = Utils.createTempDir()
- val logDir = Utils.createTempDir()
-
- val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
- val (ip, port) = {
- val splits = zkConnect.split(":")
- (splits(0), splits(1).toInt)
- }
- val factory = new NIOServerCnxnFactory()
- factory.configure(new InetSocketAddress(ip, port), 16)
- factory.startup(zookeeper)
-
- val actualPort = factory.getLocalPort
-
- def shutdown() {
- factory.shutdown()
- // The directories are not closed even if the ZooKeeper server is shut down.
- // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
- // on Windows if the directory deletion failure throws an exception.
- try {
- Utils.deleteRecursively(snapshotDir)
- } catch {
- case e: IOException if Utils.isWindows =>
- logWarning(e.getMessage)
- }
- try {
- Utils.deleteRecursively(logDir)
- } catch {
- case e: IOException if Utils.isWindows =>
- logWarning(e.getMessage)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000..6c7024e
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.io.{File, IOException}
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.ZkUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
+ */
+private[kafka010] class KafkaTestUtils extends Logging {
+
+ // Zookeeper related configurations
+ private val zkHost = "localhost"
+ private var zkPort: Int = 0
+ private val zkConnectionTimeout = 60000
+ private val zkSessionTimeout = 6000
+
+ private var zookeeper: EmbeddedZookeeper = _
+
+ private var zkUtils: ZkUtils = _
+
+ // Kafka broker related configurations
+ private val brokerHost = "localhost"
+ private var brokerPort = 0
+ private var brokerConf: KafkaConfig = _
+
+ // Kafka broker server
+ private var server: KafkaServer = _
+
+ // Kafka producer
+ private var producer: KafkaProducer[String, String] = _
+
+ // Flag to test whether the system is correctly started
+ private var zkReady = false
+ private var brokerReady = false
+
+ def zkAddress: String = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+ s"$zkHost:$zkPort"
+ }
+
+ def brokerAddress: String = {
+ assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+ s"$brokerHost:$brokerPort"
+ }
+
+ def zookeeperClient: ZkUtils = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+ Option(zkUtils).getOrElse(
+ throw new IllegalStateException("Zookeeper client is not yet initialized"))
+ }
+
+ // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+ private def setupEmbeddedZookeeper(): Unit = {
+ // Zookeeper server startup
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
+ zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+ zkReady = true
+ }
+
+ // Set up the Embedded Kafka server
+ private def setupEmbeddedKafkaServer(): Unit = {
+ assert(zkReady, "Zookeeper should be set up beforehand")
+
+ // Kafka broker startup
+ Utils.startServiceOnPort(brokerPort, port => {
+ brokerPort = port
+ brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+ server = new KafkaServer(brokerConf)
+ server.startup()
+ brokerPort = server.boundPort()
+ (server, brokerPort)
+ }, new SparkConf(), "KafkaBroker")
+
+ brokerReady = true
+ }
+
+ /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
+ def setup(): Unit = {
+ setupEmbeddedZookeeper()
+ setupEmbeddedKafkaServer()
+ }
+
+ /** Teardown the whole servers, including Kafka broker and Zookeeper */
+ def teardown(): Unit = {
+ brokerReady = false
+ zkReady = false
+
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+
+ if (server != null) {
+ server.shutdown()
+ server.awaitShutdown()
+ server = null
+ }
+
+ // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+ // in some cases. It leads to test failures on Windows if the directory deletion failure
+ // throws an exception.
+ brokerConf.logDirs.foreach { f =>
+ try {
+ Utils.deleteRecursively(new File(f))
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ }
+
+ if (zkUtils != null) {
+ zkUtils.close()
+ zkUtils = null
+ }
+
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String, partitions: Int): Unit = {
+ AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+ // wait until metadata is propagated
+ (0 until partitions).foreach { p =>
+ waitUntilMetadataIsPropagated(topic, p)
+ }
+ }
+
+ /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+ def createTopic(topic: String): Unit = {
+ createTopic(topic, 1)
+ }
+
+ /** Java-friendly function for sending messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+ sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /** Send the messages to the Kafka broker */
+ def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+ val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+ sendMessages(topic, messages)
+ }
+
+ /** Send the array of messages to the Kafka broker */
+ def sendMessages(topic: String, messages: Array[String]): Unit = {
+ producer = new KafkaProducer[String, String](producerConfiguration)
+ messages.foreach { message =>
+ producer.send(new ProducerRecord[String, String](topic, message))
+ }
+ producer.close()
+ producer = null
+ }
+
+ private def brokerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("broker.id", "0")
+ props.put("host.name", "localhost")
+ props.put("port", brokerPort.toString)
+ props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+ props.put("zookeeper.connect", zkAddress)
+ props.put("log.flush.interval.messages", "1")
+ props.put("replica.socket.timeout.ms", "1500")
+ props
+ }
+
+ private def producerConfiguration: Properties = {
+ val props = new Properties()
+ props.put("bootstrap.servers", brokerAddress)
+ props.put("value.serializer", classOf[StringSerializer].getName)
+ // Key serializer is required.
+ props.put("key.serializer", classOf[StringSerializer].getName)
+ // wait for all in-sync replicas to ack sends
+ props.put("acks", "all")
+ props
+ }
+
+ // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
+ // dependency
+ def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+ def makeAttempt(): Either[Throwable, T] = {
+ try {
+ Right(func)
+ } catch {
+ case e if NonFatal(e) => Left(e)
+ }
+ }
+
+ val startTime = System.currentTimeMillis()
+ @tailrec
+ def tryAgain(attempt: Int): T = {
+ makeAttempt() match {
+ case Right(result) => result
+ case Left(e) =>
+ val duration = System.currentTimeMillis() - startTime
+ if (duration < timeout.milliseconds) {
+ Thread.sleep(interval.milliseconds)
+ } else {
+ throw new TimeoutException(e.getMessage)
+ }
+
+ tryAgain(attempt + 1)
+ }
+ }
+
+ tryAgain(1)
+ }
+
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+ def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+ case Some(partitionState) =>
+ val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+ zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+ Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+ leaderAndInSyncReplicas.isr.nonEmpty
+
+ case _ =>
+ false
+ }
+ eventually(Time(10000), Time(100)) {
+ assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+ }
+ }
+
+ private class EmbeddedZookeeper(val zkConnect: String) {
+ val snapshotDir = Utils.createTempDir()
+ val logDir = Utils.createTempDir()
+
+ val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+ val (ip, port) = {
+ val splits = zkConnect.split(":")
+ (splits(0), splits(1).toInt)
+ }
+ val factory = new NIOServerCnxnFactory()
+ factory.configure(new InetSocketAddress(ip, port), 16)
+ factory.startup(zookeeper)
+
+ val actualPort = factory.getLocalPort
+
+ def shutdown() {
+ factory.shutdown()
+ // The directories are not closed even if the ZooKeeper server is shut down.
+ // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+ // on Windows if the directory deletion failure throws an exception.
+ try {
+ Utils.deleteRecursively(snapshotDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ try {
+ Utils.deleteRecursively(logDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org