You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/26 00:12:36 UTC

kafka git commit: KAFKA-3100; Broker.createBroker should work if json is version > 2 and still compatible

Repository: kafka
Updated Branches:
  refs/heads/trunk 9f21837e9 -> 30d3cc631


KAFKA-3100; Broker.createBroker should work if json is version > 2 and still compatible

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Grant Henke <gr...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #773 from ijuma/kafka-3100-create-broker-version-check


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30d3cc63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30d3cc63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30d3cc63

Branch: refs/heads/trunk
Commit: 30d3cc6314bf896ea37d01c5a1d6b21d69a7053f
Parents: 9f21837
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Jan 25 15:12:37 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 25 15:12:37 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Broker.scala  | 27 +++++++++++-------
 .../unit/kafka/cluster/BrokerEndPointTest.scala | 30 ++++++++++++++++----
 2 files changed, 40 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/30d3cc63/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 42b76cd..b56cae9 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -52,32 +52,37 @@ object Broker {
    *                "SSL://host1:9093"]
    */
   def createBroker(id: Int, brokerInfoString: String): Broker = {
-    if(brokerInfoString == null)
-      throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
+    if (brokerInfoString == null)
+      throw new BrokerNotAvailableException(s"Broker id $id does not exist")
     try {
       Json.parseFull(brokerInfoString) match {
         case Some(m) =>
           val brokerInfo = m.asInstanceOf[Map[String, Any]]
           val version = brokerInfo("version").asInstanceOf[Int]
-          val endpoints = version match {
-            case 1 =>
+          val endpoints =
+            if (version < 1)
+              throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
+            else if (version == 1) {
               val host = brokerInfo("host").asInstanceOf[String]
               val port = brokerInfo("port").asInstanceOf[Int]
               Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
-            case 2 =>
+            }
+            else {
               val listeners = brokerInfo("endpoints").asInstanceOf[List[String]]
-              listeners.map(listener => {
+              listeners.map { listener =>
                 val ep = EndPoint.createEndPoint(listener)
                 (ep.protocolType, ep)
-              }).toMap
-            case _ => throw new KafkaException("Unknown version of broker registration. Only versions 1 and 2 are supported." + brokerInfoString)
-          }
+              }.toMap
+            }
+
+
           new Broker(id, endpoints)
         case None =>
-          throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
+          throw new BrokerNotAvailableException(s"Broker id $id does not exist")
       }
     } catch {
-      case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+      case t: Throwable =>
+        throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/30d3cc63/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 7b8bf4b..905612c 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
 class BrokerEndPointTest extends Logging {
 
   @Test
-  def testSerDe() = {
+  def testSerDe() {
 
     val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
     val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
@@ -42,7 +42,7 @@ class BrokerEndPointTest extends Logging {
   }
 
   @Test
-  def testHashAndEquals() =  {
+  def testHashAndEquals() {
     val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
     val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
     val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT)
@@ -65,7 +65,25 @@ class BrokerEndPointTest extends Logging {
   }
 
   @Test
-  def testFromJSON() = {
+  def testFromJsonFutureVersion() {
+    // `createBroker` should support future compatible versions; we use a hypothetical future version here
+    val brokerInfoStr = """{
+      "foo":"bar",
+      "version":100,
+      "host":"localhost",
+      "port":9092,
+      "jmx_port":9999,
+      "timestamp":"1416974968782",
+      "endpoints":["SSL://localhost:9093"]
+    }"""
+    val broker = Broker.createBroker(1, brokerInfoStr)
+    assert(broker.id == 1)
+    assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).host == "localhost")
+    assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).port == 9093)
+  }
+
+  @Test
+  def testFromJsonV2 {
     val brokerInfoStr = "{\"version\":2," +
                           "\"host\":\"localhost\"," +
                           "\"port\":9092," +
@@ -79,7 +97,7 @@ class BrokerEndPointTest extends Logging {
   }
 
   @Test
-  def testFromOldJSON() = {
+  def testFromJsonV1() = {
     val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}"
     val broker = Broker.createBroker(1, brokerInfoStr)
     assert(broker.id == 1)
@@ -88,7 +106,7 @@ class BrokerEndPointTest extends Logging {
   }
 
   @Test
-  def testBrokerEndpointFromURI() = {
+  def testBrokerEndpointFromUri() {
     var connectionString = "localhost:9092"
     var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
     assert(endpoint.host == "localhost")
@@ -106,7 +124,7 @@ class BrokerEndPointTest extends Logging {
   }
 
   @Test
-  def testEndpointFromURI() = {
+  def testEndpointFromUri() {
     var connectionString = "PLAINTEXT://localhost:9092"
     var endpoint = EndPoint.createEndPoint(connectionString)
     assert(endpoint.host == "localhost")