You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/26 00:12:36 UTC
kafka git commit: KAFKA-3100; Broker.createBroker should work if json is version > 2 and still compatible
Repository: kafka
Updated Branches:
refs/heads/trunk 9f21837e9 -> 30d3cc631
KAFKA-3100; Broker.createBroker should work if json is version > 2 and still compatible
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Grant Henke <gr...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #773 from ijuma/kafka-3100-create-broker-version-check
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30d3cc63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30d3cc63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30d3cc63
Branch: refs/heads/trunk
Commit: 30d3cc6314bf896ea37d01c5a1d6b21d69a7053f
Parents: 9f21837
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Jan 25 15:12:37 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 25 15:12:37 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/cluster/Broker.scala | 27 +++++++++++-------
.../unit/kafka/cluster/BrokerEndPointTest.scala | 30 ++++++++++++++++----
2 files changed, 40 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/30d3cc63/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 42b76cd..b56cae9 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -52,32 +52,37 @@ object Broker {
* "SSL://host1:9093"]
*/
def createBroker(id: Int, brokerInfoString: String): Broker = {
- if(brokerInfoString == null)
- throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
+ if (brokerInfoString == null)
+ throw new BrokerNotAvailableException(s"Broker id $id does not exist")
try {
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val version = brokerInfo("version").asInstanceOf[Int]
- val endpoints = version match {
- case 1 =>
+ val endpoints =
+ if (version < 1)
+ throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
+ else if (version == 1) {
val host = brokerInfo("host").asInstanceOf[String]
val port = brokerInfo("port").asInstanceOf[Int]
Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
- case 2 =>
+ }
+ else {
val listeners = brokerInfo("endpoints").asInstanceOf[List[String]]
- listeners.map(listener => {
+ listeners.map { listener =>
val ep = EndPoint.createEndPoint(listener)
(ep.protocolType, ep)
- }).toMap
- case _ => throw new KafkaException("Unknown version of broker registration. Only versions 1 and 2 are supported." + brokerInfoString)
- }
+ }.toMap
+ }
+
+
new Broker(id, endpoints)
case None =>
- throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
+ throw new BrokerNotAvailableException(s"Broker id $id does not exist")
}
} catch {
- case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+ case t: Throwable =>
+ throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/30d3cc63/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 7b8bf4b..905612c 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
class BrokerEndPointTest extends Logging {
@Test
- def testSerDe() = {
+ def testSerDe() {
val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
@@ -42,7 +42,7 @@ class BrokerEndPointTest extends Logging {
}
@Test
- def testHashAndEquals() = {
+ def testHashAndEquals() {
val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT)
@@ -65,7 +65,25 @@ class BrokerEndPointTest extends Logging {
}
@Test
- def testFromJSON() = {
+ def testFromJsonFutureVersion() {
+ // `createBroker` should support future compatible versions, we use a hypothetical future version here
+ val brokerInfoStr = """{
+ "foo":"bar",
+ "version":100,
+ "host":"localhost",
+ "port":9092,
+ "jmx_port":9999,
+ "timestamp":"1416974968782",
+ "endpoints":["SSL://localhost:9093"]
+ }"""
+ val broker = Broker.createBroker(1, brokerInfoStr)
+ assert(broker.id == 1)
+ assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).host == "localhost")
+ assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).port == 9093)
+ }
+
+ @Test
+ def testFromJsonV2 {
val brokerInfoStr = "{\"version\":2," +
"\"host\":\"localhost\"," +
"\"port\":9092," +
@@ -79,7 +97,7 @@ class BrokerEndPointTest extends Logging {
}
@Test
- def testFromOldJSON() = {
+ def testFromJsonV1() = {
val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}"
val broker = Broker.createBroker(1, brokerInfoStr)
assert(broker.id == 1)
@@ -88,7 +106,7 @@ class BrokerEndPointTest extends Logging {
}
@Test
- def testBrokerEndpointFromURI() = {
+ def testBrokerEndpointFromUri() {
var connectionString = "localhost:9092"
var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
assert(endpoint.host == "localhost")
@@ -106,7 +124,7 @@ class BrokerEndPointTest extends Logging {
}
@Test
- def testEndpointFromURI() = {
+ def testEndpointFromUri() {
var connectionString = "PLAINTEXT://localhost:9092"
var endpoint = EndPoint.createEndPoint(connectionString)
assert(endpoint.host == "localhost")