You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/09/25 02:19:02 UTC
git commit: [SPARK-3615][Streaming] Fix Kafka unit test hard-coded Zookeeper port issue
Repository: spark
Updated Branches:
refs/heads/master bb96012b7 -> 74fb2ecf7
[SPARK-3615][Streaming] Fix Kafka unit test hard-coded Zookeeper port issue
Details can be seen in [SPARK-3615](https://issues.apache.org/jira/browse/SPARK-3615).
Author: jerryshao <sa...@intel.com>
Closes #2483 from jerryshao/SPARK_3615 and squashes the following commits:
8555563 [jerryshao] Fix Kafka unit test hard coded Zookeeper port issue
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74fb2ecf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74fb2ecf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74fb2ecf
Branch: refs/heads/master
Commit: 74fb2ecf7afc2d314f6477f8f2e6134614387453
Parents: bb96012
Author: jerryshao <sa...@intel.com>
Authored: Wed Sep 24 17:18:55 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Sep 24 17:18:55 2014 -0700
----------------------------------------------------------------------
.../streaming/kafka/JavaKafkaStreamSuite.java | 2 +-
.../streaming/kafka/KafkaStreamSuite.scala | 46 ++++++++++++++------
2 files changed, 34 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/74fb2ecf/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 0571454..efb0099 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -81,7 +81,7 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
Predef.<Tuple2<String, Object>>conforms()));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
+ kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
http://git-wip-us.apache.org/repos/asf/spark/blob/74fb2ecf/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index c0b55e9..6943326 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -24,7 +24,7 @@ import java.util.{Properties, Random}
import scala.collection.mutable
import kafka.admin.CreateTopicCommand
-import kafka.common.TopicAndPartition
+import kafka.common.{KafkaException, TopicAndPartition}
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.utils.ZKStringSerializer
import kafka.serializer.{StringDecoder, StringEncoder}
@@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
class KafkaStreamSuite extends TestSuiteBase {
import KafkaTestUtils._
- val zkConnect = "localhost:2181"
+ val zkHost = "localhost"
+ var zkPort: Int = 0
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
- val brokerPort = 9092
- val brokerProps = getBrokerConfig(brokerPort, zkConnect)
- val brokerConf = new KafkaConfig(brokerProps)
-
+ protected var brokerPort = 9092
+ protected var brokerConf: KafkaConfig = _
protected var zookeeper: EmbeddedZookeeper = _
protected var zkClient: ZkClient = _
protected var server: KafkaServer = _
@@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {
override def beforeFunction() {
// Zookeeper server startup
- zookeeper = new EmbeddedZookeeper(zkConnect)
+ zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ // Get the actual zookeeper binding port
+ zkPort = zookeeper.actualPort
logInfo("==================== 0 ====================")
- zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+
+ zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+ ZKStringSerializer)
logInfo("==================== 1 ====================")
// Kafka broker startup
- server = new KafkaServer(brokerConf)
- logInfo("==================== 2 ====================")
- server.startup()
- logInfo("==================== 3 ====================")
+ var bindSuccess: Boolean = false
+ while(!bindSuccess) {
+ try {
+ val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
+ brokerConf = new KafkaConfig(brokerProps)
+ server = new KafkaServer(brokerConf)
+ logInfo("==================== 2 ====================")
+ server.startup()
+ logInfo("==================== 3 ====================")
+ bindSuccess = true
+ } catch {
+ case e: KafkaException =>
+ if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
+ brokerPort += 1
+ }
+ case e: Exception => throw new Exception("Kafka server create failed", e)
+ }
+ }
+
Thread.sleep(2000)
logInfo("==================== 4 ====================")
super.beforeFunction()
@@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
createTopic(topic)
produceAndSendMessage(topic, sent)
- val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+ val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")
@@ -200,6 +218,8 @@ object KafkaTestUtils {
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)
+ val actualPort = factory.getLocalPort
+
def shutdown() {
factory.shutdown()
Utils.deleteRecursively(snapshotDir)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org