You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/09/19 00:54:06 UTC
git commit: kafka-1123; Broker IPv6 addresses parsed incorrectly; patched by Krzysztof Szafrański; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk c892c08df -> be5edd2f8
kafka-1123; Broker IPv6 addresses parsed incorrectly; patched by Krzysztof Szafrański; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be5edd2f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be5edd2f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be5edd2f
Branch: refs/heads/trunk
Commit: be5edd2f8d0c0355cb33feb2ac7482b7df7dccbc
Parents: c892c08
Author: Krzysztof Szafrański <k....@gmail.com>
Authored: Thu Sep 18 15:53:48 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Sep 18 15:53:48 2014 -0700
----------------------------------------------------------------------
.../apache/kafka/common/utils/ClientUtils.java | 17 ++++---
.../org/apache/kafka/common/utils/Utils.java | 36 ++++++++++++++
.../kafka/common/utils/ClientUtilsTest.java | 42 ++++++++++++++++
.../apache/kafka/common/utils/UtilsTest.java | 51 ++++++++++++++++++++
.../main/java/kafka/etl/impl/DataGenerator.java | 7 +--
.../main/scala/kafka/admin/TopicCommand.scala | 4 +-
.../main/scala/kafka/api/TopicMetadata.scala | 4 +-
.../main/scala/kafka/client/ClientUtils.scala | 14 ++----
core/src/main/scala/kafka/cluster/Broker.scala | 7 +--
.../scala/kafka/consumer/SimpleConsumer.scala | 3 +-
.../scala/kafka/producer/SyncProducer.scala | 8 +--
core/src/main/scala/kafka/utils/Utils.scala | 16 ++----
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +-
.../test/scala/unit/kafka/utils/UtilsTest.scala | 9 ++--
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 5 +-
15 files changed, 176 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
index cb33e34..b987e7f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -19,26 +19,31 @@ import java.util.List;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
public class ClientUtils {
+
public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
for (String url : urls) {
if (url != null && url.length() > 0) {
- String[] pieces = url.split(":");
- if (pieces.length != 2)
+ String host = getHost(url);
+ Integer port = getPort(url);
+ if (host == null || port == null)
throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
try {
- InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+ InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved())
- throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+ throw new ConfigException("DNS resolution failed for url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
addresses.add(address);
} catch (NumberFormatException e) {
- throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+ throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
}
}
}
if (addresses.size() < 1)
- throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+ throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
return addresses;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 50af601..a0827f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -14,11 +14,15 @@ package org.apache.kafka.common.utils;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.kafka.common.KafkaException;
public class Utils {
+ private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
+
public static String NL = System.getProperty("line.separator");
/**
@@ -217,4 +221,36 @@ public class Utils {
return h;
}
+ /**
+ * Extracts the hostname from a "host:port" address string.
+ * @param address address string to parse
+ * @return hostname or null if the given address is incorrect
+ */
+ public static String getHost(String address) {
+ Matcher matcher = HOST_PORT_PATTERN.matcher(address);
+ return matcher.matches() ? matcher.group(1) : null;
+ }
+
+ /**
+ * Extracts the port number from a "host:port" address string.
+ * @param address address string to parse
+ * @return port number or null if the given address is incorrect
+ */
+ public static Integer getPort(String address) {
+ Matcher matcher = HOST_PORT_PATTERN.matcher(address);
+ return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null;
+ }
+
+ /**
+ * Formats hostname and port number as a "host:port" address string,
+ * surrounding IPv6 addresses with square brackets '[', ']'
+ * @param host hostname
+ * @param port port number
+ * @return address string
+ */
+ public static String formatAddress(String host, Integer port) {
+ return host.contains(":")
+ ? "[" + host + "]:" + port // IPv6
+ : host + ":" + port;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
new file mode 100644
index 0000000..6e37ea5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class ClientUtilsTest {
+
+ @Test
+ public void testParseAndValidateAddresses() {
+ check("127.0.0.1:8000");
+ check("mydomain.com:8080");
+ check("[::1]:8000");
+ check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testNoPort() {
+ check("127.0.0.1");
+ }
+
+ private void check(String... url) {
+ ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
new file mode 100644
index 0000000..a39fab5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.formatAddress;
+import static org.junit.Assert.*;
+
+public class UtilsTest {
+
+ @Test
+ public void testGetHost() {
+ assertEquals("127.0.0.1", getHost("127.0.0.1:8000"));
+ assertEquals("mydomain.com", getHost("mydomain.com:8080"));
+ assertEquals("::1", getHost("[::1]:1234"));
+ assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
+ }
+
+ @Test
+ public void testGetPort() {
+ assertEquals(8000, getPort("127.0.0.1:8000").intValue());
+ assertEquals(8080, getPort("mydomain.com:8080").intValue());
+ assertEquals(1234, getPort("[::1]:1234").intValue());
+ assertEquals(5678, getPort("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678").intValue());
+ }
+
+ @Test
+ public void testFormatAddress() {
+ assertEquals("127.0.0.1:8000", formatAddress("127.0.0.1", 8000));
+ assertEquals("mydomain.com:8080", formatAddress("mydomain.com", 8080));
+ assertEquals("[::1]:1234", formatAddress("::1", 1234));
+ assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index f3fb3fd..d27a511 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -27,7 +27,6 @@ import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.producer.KeyedMessage;
import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,8 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
+import static org.apache.kafka.common.utils.Utils.formatAddress;
+
/**
* Use this class to produce test events to Kafka server. Each event contains a
* random timestamp in text format.
@@ -70,7 +71,7 @@ public class DataGenerator {
System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
- producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
+ producerProps.put("metadata.broker.list", formatAddress(_uri.getHost(), _uri.getPort()));
producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
@@ -108,7 +109,7 @@ public class DataGenerator {
if (fs.exists(outPath)) fs.delete(outPath);
KafkaETLRequest request =
- new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
+ new KafkaETLRequest(_topic, "tcp://" + formatAddress(_uri.getHost(), _uri.getPort()), 0);
System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
byte[] bytes = request.toString().getBytes("UTF-8");
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index b3f2e82..3b2166a 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
import joptsimple._
import java.util.Properties
-import kafka.admin.AdminOperationException
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -29,6 +28,7 @@ import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.consumer.Whitelist
import kafka.server.OffsetManager
+import org.apache.kafka.common.utils.Utils.formatAddress
object TopicCommand {
@@ -193,7 +193,7 @@ object TopicCommand {
}
}
- def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+ def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 51380a6..0190076 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -21,8 +21,8 @@ import kafka.cluster.Broker
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.utils.Logging
-import collection.mutable.ArrayBuffer
import kafka.common._
+import org.apache.kafka.common.utils.Utils._
object TopicMetadata {
@@ -149,7 +149,7 @@ case class PartitionMetadata(partitionId: Int,
partitionMetadataString.toString()
}
- private def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+ private def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3..ebba87f 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -28,6 +28,7 @@ import util.Random
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.ZkClient
import java.io.IOException
+import org.apache.kafka.common.utils.Utils.{getHost, getPort}
/**
* Helper functions common to clients (producer, consumer, or admin)
@@ -85,7 +86,7 @@ object ClientUtils extends Logging{
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
correlationId: Int = 0): TopicMetadataResponse = {
val props = new Properties()
- props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
+ props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
props.put("client.id", clientId)
props.put("request.timeout.ms", timeoutMs.toString)
val producerConfig = new ProducerConfig(props)
@@ -98,14 +99,9 @@ object ClientUtils extends Logging{
def parseBrokerList(brokerListStr: String): Seq[Broker] = {
val brokersStr = Utils.parseCsvList(brokerListStr)
- brokersStr.zipWithIndex.map(b =>{
- val brokerStr = b._1
- val brokerId = b._2
- val brokerInfos = brokerStr.split(":")
- val hostName = brokerInfos(0)
- val port = brokerInfos(1).toInt
- new Broker(brokerId, hostName, port)
- })
+ brokersStr.zipWithIndex.map { case (address, brokerId) =>
+ new Broker(brokerId, getHost(address), getPort(address))
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index ccc3fc1..0060add 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -22,6 +22,7 @@ import kafka.utils.Json
import kafka.api.ApiUtils._
import java.nio.ByteBuffer
import kafka.common.{KafkaException, BrokerNotAvailableException}
+import org.apache.kafka.common.utils.Utils._
/**
* A Kafka broker
@@ -54,11 +55,11 @@ object Broker {
}
}
-case class Broker(val id: Int, val host: String, val port: Int) {
+case class Broker(id: Int, host: String, port: Int) {
- override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
+ override def toString: String = "id:" + id + ",host:" + host + ",port:" + port
- def getConnectionString(): String = host + ":" + port
+ def connectionString: String = formatAddress(host, port)
def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 8db9203..d349a30 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -21,6 +21,7 @@ import kafka.api._
import kafka.network._
import kafka.utils._
import kafka.common.{ErrorMapping, TopicAndPartition}
+import org.apache.kafka.common.utils.Utils._
/**
* A consumer of kafka messages
@@ -46,7 +47,7 @@ class SimpleConsumer(val host: String,
}
private def disconnect() = {
- debug("Disconnecting from " + host + ":" + port)
+ debug("Disconnecting from " + formatAddress(host, port))
blockingChannel.disconnect()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 489f007..42c9503 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -22,6 +22,8 @@ import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
import kafka.utils._
import java.util.Random
+import org.apache.kafka.common.utils.Utils._
+
object SyncProducer {
val RequestKey: Short = 0
val randomGenerator = new Random
@@ -126,7 +128,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
*/
private def disconnect() {
try {
- info("Disconnecting from " + config.host + ":" + config.port)
+ info("Disconnecting from " + formatAddress(config.host, config.port))
blockingChannel.disconnect()
} catch {
case e: Exception => error("Error on disconnect: ", e)
@@ -137,11 +139,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
if (!blockingChannel.isConnected && !shutdown) {
try {
blockingChannel.connect()
- info("Connected to " + config.host + ":" + config.port + " for producing")
+ info("Connected to " + formatAddress(config.host, config.port) + " for producing")
} catch {
case e: Exception => {
disconnect()
- error("Producer connection to " + config.host + ":" + config.port + " unsuccessful", e)
+ error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
throw e
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index da52b42..29d5a17 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -395,21 +395,13 @@ object Utils extends Logging {
}
/**
- * Parse a host and port out of a string
- */
- def parseHostPort(hostport: String) : (String, Int) = {
- val splits = hostport.split(":")
- (splits(0), splits(1).toInt)
- }
-
- /**
* Get the stack trace from an exception as a string
*/
def stackTrace(e: Throwable): String = {
- val sw = new StringWriter;
- val pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- sw.toString();
+ val sw = new StringWriter
+ val pw = new PrintWriter(sw)
+ e.printStackTrace(pw)
+ sw.toString()
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c4e13c5..2dbdd3c 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,6 +24,8 @@ import java.nio.channels._
import java.util.Random
import java.util.Properties
+import org.apache.kafka.common.utils.Utils._
+
import collection.mutable.Map
import collection.mutable.ListBuffer
@@ -142,7 +144,7 @@ object TestUtils extends Logging {
}
def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
- configs.map(c => c.hostName + ":" + c.port).mkString(",")
+ configs.map(c => formatAddress(c.hostName, c.port)).mkString(",")
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index a502349..0d0f0e2 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -20,7 +20,6 @@ package kafka.utils
import java.util.Arrays
import java.util.concurrent.locks.ReentrantLock
import java.nio.ByteBuffer
-import java.io._
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
@@ -73,7 +72,7 @@ class UtilsTest extends JUnitSuite {
assertEquals(1, Utils.abs(1))
assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
}
-
+
@Test
def testReplaceSuffix() {
assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
@@ -81,7 +80,7 @@ class UtilsTest extends JUnitSuite {
assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
}
-
+
@Test
def testReadInt() {
val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
@@ -90,7 +89,6 @@ class UtilsTest extends JUnitSuite {
buffer.putInt(i*4, values(i))
assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
}
-
}
@Test
@@ -105,7 +103,7 @@ class UtilsTest extends JUnitSuite {
assertTrue(emptyStringList.equals(emptyListFromNullString))
assertTrue(emptyStringList.equals(emptyList))
}
-
+
@Test
def testInLock() {
val lock = new ReentrantLock()
@@ -115,6 +113,5 @@ class UtilsTest extends JUnitSuite {
}
assertEquals(2, result)
assertFalse("Should be unlocked", lock.isLocked)
-
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 3021a8c..3151561 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -18,20 +18,19 @@
package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
-import org.apache.zookeeper.server.NIOServerCnxn
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
import kafka.utils.Utils
+import org.apache.kafka.common.utils.Utils.getPort
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
- val port = connectString.split(":")(1).toInt
val factory = new NIOServerCnxnFactory()
- factory.configure(new InetSocketAddress("127.0.0.1", port),0)
+ factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
factory.startup(zookeeper)
def shutdown() {