You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/07/20 17:19:18 UTC

spark git commit: [SPARK-21142][SS] spark-streaming-kafka-0-10 should depend on kafka-clients instead of full blown kafka library

Repository: spark
Updated Branches:
  refs/heads/master da9f067a1 -> 03367d7aa


[SPARK-21142][SS] spark-streaming-kafka-0-10 should depend on kafka-clients instead of full blown kafka library

## What changes were proposed in this pull request?

Currently, spark-streaming-kafka-0-10 depends on the full Kafka distribution, even though it only uses and requires the kafka-clients library.

This PR fixes that: the library now depends only on kafka-clients, while the full Kafka artifact becomes a test-scoped dependency. KafkaTestUtils moves from src/main to src/test accordingly.

## How was this patch tested?

All existing tests still pass.

Author: Tim Van Wassenhove <gi...@timvw.be>

Closes #18353 from timvw/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03367d7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03367d7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03367d7a

Branch: refs/heads/master
Commit: 03367d7aa3acd3abbcacba57ea75c8efa2a9a794
Parents: da9f067
Author: Tim Van Wassenhove <gi...@timvw.be>
Authored: Thu Jul 20 18:19:14 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jul 20 18:19:14 2017 +0100

----------------------------------------------------------------------
 external/kafka-0-10-sql/pom.xml                 |   5 +-
 external/kafka-0-10/pom.xml                     |  31 +-
 .../streaming/kafka010/KafkaTestUtils.scala     | 304 -------------------
 .../streaming/kafka010/KafkaTestUtils.scala     | 304 +++++++++++++++++++
 4 files changed, 315 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10-sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 557d272..0f61a10 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -29,6 +29,7 @@
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
+    <kafka.version>0.10.0.1</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Kafka 0.10 Source for Structured Streaming</name>
@@ -65,12 +66,12 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
-      <version>0.10.0.1</version>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.10.0.1</version>
+      <version>${kafka.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 6c98cb0..4d9861a 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -28,6 +28,7 @@
   <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
+    <kafka.version>0.10.0.1</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Integration for Kafka 0.10</name>
@@ -49,30 +50,14 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.10.0.1</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.sun.jmx</groupId>
-          <artifactId>jmxri</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jdmk</groupId>
-          <artifactId>jmxtools</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>net.sf.jopt-simple</groupId>
-          <artifactId>jopt-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
+      <version>${kafka.version}</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
deleted file mode 100644
index 6c7024e..0000000
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka010
-
-import java.io.{File, IOException}
-import java.lang.{Integer => JInt}
-import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties}
-import java.util.concurrent.TimeoutException
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import kafka.admin.AdminUtils
-import kafka.api.Request
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZkUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Time
-import org.apache.spark.util.Utils
-
-/**
- * This is a helper class for Kafka test suites. This has the functionality to set up
- * and tear down local Kafka servers, and to push data using Kafka producers.
- *
- * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
- */
-private[kafka010] class KafkaTestUtils extends Logging {
-
-  // Zookeeper related configurations
-  private val zkHost = "localhost"
-  private var zkPort: Int = 0
-  private val zkConnectionTimeout = 60000
-  private val zkSessionTimeout = 6000
-
-  private var zookeeper: EmbeddedZookeeper = _
-
-  private var zkUtils: ZkUtils = _
-
-  // Kafka broker related configurations
-  private val brokerHost = "localhost"
-  private var brokerPort = 0
-  private var brokerConf: KafkaConfig = _
-
-  // Kafka broker server
-  private var server: KafkaServer = _
-
-  // Kafka producer
-  private var producer: KafkaProducer[String, String] = _
-
-  // Flag to test whether the system is correctly started
-  private var zkReady = false
-  private var brokerReady = false
-
-  def zkAddress: String = {
-    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
-    s"$zkHost:$zkPort"
-  }
-
-  def brokerAddress: String = {
-    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
-    s"$brokerHost:$brokerPort"
-  }
-
-  def zookeeperClient: ZkUtils = {
-    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-    Option(zkUtils).getOrElse(
-      throw new IllegalStateException("Zookeeper client is not yet initialized"))
-  }
-
-  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
-  private def setupEmbeddedZookeeper(): Unit = {
-    // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
-    // Get the actual zookeeper binding port
-    zkPort = zookeeper.actualPort
-    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
-    zkReady = true
-  }
-
-  // Set up the Embedded Kafka server
-  private def setupEmbeddedKafkaServer(): Unit = {
-    assert(zkReady, "Zookeeper should be set up beforehand")
-
-    // Kafka broker startup
-    Utils.startServiceOnPort(brokerPort, port => {
-      brokerPort = port
-      brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
-      server = new KafkaServer(brokerConf)
-      server.startup()
-      brokerPort = server.boundPort()
-      (server, brokerPort)
-    }, new SparkConf(), "KafkaBroker")
-
-    brokerReady = true
-  }
-
-  /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
-  def setup(): Unit = {
-    setupEmbeddedZookeeper()
-    setupEmbeddedKafkaServer()
-  }
-
-  /** Teardown the whole servers, including Kafka broker and Zookeeper */
-  def teardown(): Unit = {
-    brokerReady = false
-    zkReady = false
-
-    if (producer != null) {
-      producer.close()
-      producer = null
-    }
-
-    if (server != null) {
-      server.shutdown()
-      server.awaitShutdown()
-      server = null
-    }
-
-    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
-    // in some cases. It leads to test failures on Windows if the directory deletion failure
-    // throws an exception.
-    brokerConf.logDirs.foreach { f =>
-      try {
-        Utils.deleteRecursively(new File(f))
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
-      }
-    }
-
-    if (zkUtils != null) {
-      zkUtils.close()
-      zkUtils = null
-    }
-
-    if (zookeeper != null) {
-      zookeeper.shutdown()
-      zookeeper = null
-    }
-  }
-
-  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
-  def createTopic(topic: String, partitions: Int): Unit = {
-    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
-    // wait until metadata is propagated
-    (0 until partitions).foreach { p =>
-      waitUntilMetadataIsPropagated(topic, p)
-    }
-  }
-
-  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
-  def createTopic(topic: String): Unit = {
-    createTopic(topic, 1)
-  }
-
-  /** Java-friendly function for sending messages to the Kafka broker */
-  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
-    sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /** Send the messages to the Kafka broker */
-  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
-    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
-    sendMessages(topic, messages)
-  }
-
-  /** Send the array of messages to the Kafka broker */
-  def sendMessages(topic: String, messages: Array[String]): Unit = {
-    producer = new KafkaProducer[String, String](producerConfiguration)
-    messages.foreach { message =>
-      producer.send(new ProducerRecord[String, String](topic, message))
-    }
-    producer.close()
-    producer = null
-  }
-
-  private def brokerConfiguration: Properties = {
-    val props = new Properties()
-    props.put("broker.id", "0")
-    props.put("host.name", "localhost")
-    props.put("port", brokerPort.toString)
-    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
-    props.put("zookeeper.connect", zkAddress)
-    props.put("log.flush.interval.messages", "1")
-    props.put("replica.socket.timeout.ms", "1500")
-    props
-  }
-
-  private def producerConfiguration: Properties = {
-    val props = new Properties()
-    props.put("bootstrap.servers", brokerAddress)
-    props.put("value.serializer", classOf[StringSerializer].getName)
-    // Key serializer is required.
-    props.put("key.serializer", classOf[StringSerializer].getName)
-    // wait for all in-sync replicas to ack sends
-    props.put("acks", "all")
-    props
-  }
-
-  // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
-  // dependency
-  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
-    def makeAttempt(): Either[Throwable, T] = {
-      try {
-        Right(func)
-      } catch {
-        case e if NonFatal(e) => Left(e)
-      }
-    }
-
-    val startTime = System.currentTimeMillis()
-    @tailrec
-    def tryAgain(attempt: Int): T = {
-      makeAttempt() match {
-        case Right(result) => result
-        case Left(e) =>
-          val duration = System.currentTimeMillis() - startTime
-          if (duration < timeout.milliseconds) {
-            Thread.sleep(interval.milliseconds)
-          } else {
-            throw new TimeoutException(e.getMessage)
-          }
-
-          tryAgain(attempt + 1)
-      }
-    }
-
-    tryAgain(1)
-  }
-
-  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
-    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
-      case Some(partitionState) =>
-        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
-        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
-          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
-          leaderAndInSyncReplicas.isr.nonEmpty
-
-      case _ =>
-        false
-    }
-    eventually(Time(10000), Time(100)) {
-      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
-    }
-  }
-
-  private class EmbeddedZookeeper(val zkConnect: String) {
-    val snapshotDir = Utils.createTempDir()
-    val logDir = Utils.createTempDir()
-
-    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
-    val (ip, port) = {
-      val splits = zkConnect.split(":")
-      (splits(0), splits(1).toInt)
-    }
-    val factory = new NIOServerCnxnFactory()
-    factory.configure(new InetSocketAddress(ip, port), 16)
-    factory.startup(zookeeper)
-
-    val actualPort = factory.getLocalPort
-
-    def shutdown() {
-      factory.shutdown()
-      // The directories are not closed even if the ZooKeeper server is shut down.
-      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
-      // on Windows if the directory deletion failure throws an exception.
-      try {
-        Utils.deleteRecursively(snapshotDir)
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
-      }
-      try {
-        Utils.deleteRecursively(logDir)
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/03367d7a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000..6c7024e
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.io.{File, IOException}
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.ZkUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
+ */
+private[kafka010] class KafkaTestUtils extends Logging {
+
+  // Zookeeper related configurations
+  private val zkHost = "localhost"
+  private var zkPort: Int = 0
+  private val zkConnectionTimeout = 60000
+  private val zkSessionTimeout = 6000
+
+  private var zookeeper: EmbeddedZookeeper = _
+
+  private var zkUtils: ZkUtils = _
+
+  // Kafka broker related configurations
+  private val brokerHost = "localhost"
+  private var brokerPort = 0
+  private var brokerConf: KafkaConfig = _
+
+  // Kafka broker server
+  private var server: KafkaServer = _
+
+  // Kafka producer
+  private var producer: KafkaProducer[String, String] = _
+
+  // Flag to test whether the system is correctly started
+  private var zkReady = false
+  private var brokerReady = false
+
+  def zkAddress: String = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+    s"$zkHost:$zkPort"
+  }
+
+  def brokerAddress: String = {
+    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+    s"$brokerHost:$brokerPort"
+  }
+
+  def zookeeperClient: ZkUtils = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+    Option(zkUtils).getOrElse(
+      throw new IllegalStateException("Zookeeper client is not yet initialized"))
+  }
+
+  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+  private def setupEmbeddedZookeeper(): Unit = {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
+    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkReady = true
+  }
+
+  // Set up the Embedded Kafka server
+  private def setupEmbeddedKafkaServer(): Unit = {
+    assert(zkReady, "Zookeeper should be set up beforehand")
+
+    // Kafka broker startup
+    Utils.startServiceOnPort(brokerPort, port => {
+      brokerPort = port
+      brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+      server = new KafkaServer(brokerConf)
+      server.startup()
+      brokerPort = server.boundPort()
+      (server, brokerPort)
+    }, new SparkConf(), "KafkaBroker")
+
+    brokerReady = true
+  }
+
+  /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
+  def setup(): Unit = {
+    setupEmbeddedZookeeper()
+    setupEmbeddedKafkaServer()
+  }
+
+  /** Teardown the whole servers, including Kafka broker and Zookeeper */
+  def teardown(): Unit = {
+    brokerReady = false
+    zkReady = false
+
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server.awaitShutdown()
+      server = null
+    }
+
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }
+
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p =>
+      waitUntilMetadataIsPropagated(topic, p)
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+  def createTopic(topic: String): Unit = {
+    createTopic(topic, 1)
+  }
+
+  /** Java-friendly function for sending messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+    sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /** Send the messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+    sendMessages(topic, messages)
+  }
+
+  /** Send the array of messages to the Kafka broker */
+  def sendMessages(topic: String, messages: Array[String]): Unit = {
+    producer = new KafkaProducer[String, String](producerConfiguration)
+    messages.foreach { message =>
+      producer.send(new ProducerRecord[String, String](topic, message))
+    }
+    producer.close()
+    producer = null
+  }
+
+  private def brokerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("port", brokerPort.toString)
+    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkAddress)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
+  }
+
+  private def producerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("bootstrap.servers", brokerAddress)
+    props.put("value.serializer", classOf[StringSerializer].getName)
+    // Key serializer is required.
+    props.put("key.serializer", classOf[StringSerializer].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("acks", "all")
+    props
+  }
+
+  // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
+  // dependency
+  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+    def makeAttempt(): Either[Throwable, T] = {
+      try {
+        Right(func)
+      } catch {
+        case e if NonFatal(e) => Left(e)
+      }
+    }
+
+    val startTime = System.currentTimeMillis()
+    @tailrec
+    def tryAgain(attempt: Int): T = {
+      makeAttempt() match {
+        case Right(result) => result
+        case Left(e) =>
+          val duration = System.currentTimeMillis() - startTime
+          if (duration < timeout.milliseconds) {
+            Thread.sleep(interval.milliseconds)
+          } else {
+            throw new TimeoutException(e.getMessage)
+          }
+
+          tryAgain(attempt + 1)
+      }
+    }
+
+    tryAgain(1)
+  }
+
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.nonEmpty
+
+      case _ =>
+        false
+    }
+    eventually(Time(10000), Time(100)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+    }
+  }
+
+  private class EmbeddedZookeeper(val zkConnect: String) {
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
+
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    val actualPort = factory.getLocalPort
+
+    def shutdown() {
+      factory.shutdown()
+      // The directories are not closed even if the ZooKeeper server is shut down.
+      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+      // on Windows if the directory deletion failure throws an exception.
+      try {
+        Utils.deleteRecursively(snapshotDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+      try {
+        Utils.deleteRecursively(logDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org