You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/27 01:47:30 UTC

kafka git commit: KAFKA-3132: URI scheme in "listeners" property should not be case-sensitive

Repository: kafka
Updated Branches:
  refs/heads/trunk 1388ed9ba -> e9a72ceab


KAFKA-3132: URI scheme in "listeners" property should not be case-sensitive

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ismael Juma

Closes #811 from granthenke/listeners-case


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e9a72cea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e9a72cea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e9a72cea

Branch: refs/heads/trunk
Commit: e9a72ceab6e0ceaf4d2125756f07154cd15a7178
Parents: 1388ed9
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Jan 26 16:47:26 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Jan 26 16:47:26 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/clients/ClientUtils.java   |  2 +-
 .../apache/kafka/common/protocol/SecurityProtocol.java    |  5 +++++
 .../java/org/apache/kafka/common/network/EchoServer.java  |  2 +-
 core/src/main/scala/kafka/cluster/EndPoint.scala          |  4 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala        |  2 +-
 .../test/scala/unit/kafka/server/KafkaConfigTest.scala    | 10 ++++++++++
 6 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index b614198..0201257 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -73,7 +73,7 @@ public class ClientUtils {
      * @return configured ChannelBuilder based on the configs.
      */
     public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
-        SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+        SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
         if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
             throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
         return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index cbd0c42..905c670 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -83,6 +83,11 @@ public enum SecurityProtocol {
         return CODE_TO_SECURITY_PROTOCOL.get(id);
     }
 
+    /** Case insensitive lookup by protocol name */
+    public static SecurityProtocol forName(String name) {
+        return SecurityProtocol.valueOf(name.toUpperCase());
+    }
+
     /**
      * Returns the set of non-testing SecurityProtocol instances, that is, SecurityProtocol instances that are suitable
      * for production usage.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index 9354bfe..44b5a5f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -43,7 +43,7 @@ class EchoServer extends Thread {
 
     public EchoServer(Map<String, ?> configs) throws Exception {
         this.protocol =  configs.containsKey("security.protocol") ?
-            SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT;
+            SecurityProtocol.forName((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT;
         if (protocol == SecurityProtocol.SSL) {
             this.sslFactory = new SslFactory(Mode.SERVER);
             this.sslFactory.configure(configs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/core/src/main/scala/kafka/cluster/EndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 76997b5..32c27ed 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -44,8 +44,8 @@ object EndPoint {
   def createEndPoint(connectionString: String): EndPoint = {
     val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-.:]*)\]?:(-?[0-9]+)""".r
     connectionString match {
-      case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.valueOf(protocol))
-      case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.valueOf(protocol))
+      case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.forName(protocol))
+      case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.forName(protocol))
       case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4911809..00bf0cb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -800,7 +800,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
   val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
-  val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
+  val interBrokerSecurityProtocol = SecurityProtocol.forName(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
   val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
 
   /** ********* Controlled shutdown configuration ***********/

http://git-wip-us.apache.org/repos/asf/kafka/blob/e9a72cea/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9ddc2c1..e8ffb5b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -235,6 +235,16 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testCaseInsensitiveListenerProtocol() {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.put(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
+
+    assert(isValidKafkaConfig(props))
+  }
+
+  @Test
   def testListenerDefaults() {
     val props = new Properties()
     props.put(KafkaConfig.BrokerIdProp, "1")