You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/09/25 02:19:02 UTC

git commit: [SPARK-3615][Streaming]Fix Kafka unit test hard coded Zookeeper port issue

Repository: spark
Updated Branches:
  refs/heads/master bb96012b7 -> 74fb2ecf7


[SPARK-3615][Streaming]Fix Kafka unit test hard coded Zookeeper port issue

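Details can be seen in [SPARK-3615](https://issues.apache.org/jira/browse/SPARK-3615). In short, the Kafka test suite no longer relies on fixed ports: the embedded Zookeeper is started on port 0 and the test reads back the port it actually bound to, and the Kafka broker starts at port 9092 and retries on the next port whenever binding fails.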

Author: jerryshao <sa...@intel.com>

Closes #2483 from jerryshao/SPARK_3615 and squashes the following commits:

8555563 [jerryshao] Fix Kafka unit test hard coded Zookeeper port issue


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74fb2ecf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74fb2ecf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74fb2ecf

Branch: refs/heads/master
Commit: 74fb2ecf7afc2d314f6477f8f2e6134614387453
Parents: bb96012
Author: jerryshao <sa...@intel.com>
Authored: Wed Sep 24 17:18:55 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Sep 24 17:18:55 2014 -0700

----------------------------------------------------------------------
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  2 +-
 .../streaming/kafka/KafkaStreamSuite.scala      | 46 ++++++++++++++------
 2 files changed, 34 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74fb2ecf/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 0571454..efb0099 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -81,7 +81,7 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
         Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
+    kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
     kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74fb2ecf/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index c0b55e9..6943326 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -24,7 +24,7 @@ import java.util.{Properties, Random}
 import scala.collection.mutable
 
 import kafka.admin.CreateTopicCommand
-import kafka.common.TopicAndPartition
+import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.utils.ZKStringSerializer
 import kafka.serializer.{StringDecoder, StringEncoder}
@@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
 class KafkaStreamSuite extends TestSuiteBase {
   import KafkaTestUtils._
 
-  val zkConnect = "localhost:2181"
+  val zkHost = "localhost"
+  var zkPort: Int = 0
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
-  val brokerPort = 9092
-  val brokerProps = getBrokerConfig(brokerPort, zkConnect)
-  val brokerConf = new KafkaConfig(brokerProps)
-
+  protected var brokerPort = 9092
+  protected var brokerConf: KafkaConfig = _
   protected var zookeeper: EmbeddedZookeeper = _
   protected var zkClient: ZkClient = _
   protected var server: KafkaServer = _
@@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {
 
   override def beforeFunction() {
     // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(zkConnect)
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
     logInfo("==================== 0 ====================")
-    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+
+    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+      ZKStringSerializer)
     logInfo("==================== 1 ====================")
 
     // Kafka broker startup
-    server = new KafkaServer(brokerConf)
-    logInfo("==================== 2 ====================")
-    server.startup()
-    logInfo("==================== 3 ====================")
+    var bindSuccess: Boolean = false
+    while(!bindSuccess) {
+      try {
+        val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
+        brokerConf = new KafkaConfig(brokerProps)
+        server = new KafkaServer(brokerConf)
+        logInfo("==================== 2 ====================")
+        server.startup()
+        logInfo("==================== 3 ====================")
+        bindSuccess = true
+      } catch {
+        case e: KafkaException =>
+          if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
+            brokerPort += 1
+          }
+        case e: Exception => throw new Exception("Kafka server create failed", e)
+      }
+    }
+
     Thread.sleep(2000)
     logInfo("==================== 4 ====================")
     super.beforeFunction()
@@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
     createTopic(topic)
     produceAndSendMessage(topic, sent)
 
-    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
       "group.id" -> s"test-consumer-${random.nextInt(10000)}",
       "auto.offset.reset" -> "smallest")
 
@@ -200,6 +218,8 @@ object KafkaTestUtils {
     factory.configure(new InetSocketAddress(ip, port), 16)
     factory.startup(zookeeper)
 
+    val actualPort = factory.getLocalPort
+
     def shutdown() {
       factory.shutdown()
       Utils.deleteRecursively(snapshotDir)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org