You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ja...@apache.org on 2015/05/08 23:46:47 UTC

ambari git commit: AMBARI-10984. Ambari support for kerberos kafka. (Sriharsha Chintalapani via Jaimin)

Repository: ambari
Updated Branches:
  refs/heads/trunk 562cb1270 -> 754692354


AMBARI-10984. Ambari support for kerberos kafka. (Sriharsha Chintalapani via Jaimin)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/75469235
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/75469235
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/75469235

Branch: refs/heads/trunk
Commit: 75469235478edbf1e83377c341ad67e98aeade8a
Parents: 562cb12
Author: Jaimin Jetly <ja...@hortonworks.com>
Authored: Fri May 8 14:46:26 2015 -0700
Committer: Jaimin Jetly <ja...@hortonworks.com>
Committed: Fri May 8 14:46:26 2015 -0700

----------------------------------------------------------------------
 .../KAFKA/0.8.1.2.2/configuration/kafka-env.xml |   2 +-
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py    |  30 ++++-
 .../KAFKA/0.8.1.2.2/package/scripts/params.py   |  20 ++-
 .../KAFKA/0.8.1.2.2/package/scripts/utils.py    |  38 ++++++
 .../package/templates/kafka_jaas.conf.j2        |  41 ++++++
 .../KAFKA/configuration/kafka-broker.xml        | 127 +++++++++++++++++++
 .../stacks/HDP/2.3/services/KAFKA/kerberos.json |  49 +++++++
 .../stacks/HDP/2.3/services/KAFKA/metainfo.xml  |   3 +
 .../stacks/2.2/KAFKA/test_kafka_broker.py       |   4 +-
 ambari-web/app/data/HDP2/site_properties.js     |  14 ++
 10 files changed, 316 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
index 98814aa..ad7689e 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
@@ -52,7 +52,7 @@ export JAVA_HOME={{java64_home}}
 export PATH=$PATH:$JAVA_HOME/bin
 export PID_DIR={{kafka_pid_dir}}
 export LOG_DIR={{kafka_log_dir}}
-
+export KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}
 # Add kafka sink to classpath and related depenencies
 if [ -e "/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" ]; then
   export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 6668c4e..0957215 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -20,6 +20,7 @@ limitations under the License.
 
 from resource_management import *
 from properties_config import properties_config
+from resource_management.libraries.resources.template_config import TemplateConfig
 import sys, os
 from copy import deepcopy
 
@@ -36,7 +37,23 @@ def kafka():
     brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
     kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
     kafka_server_config['broker.id'] = brokerid
-    kafka_server_config['host.name'] = params.hostname
+
+    #listeners and advertised.listeners are only added in 2.3.0.0 onwards.
+    if params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.3.0.0') >= 0:
+        if params.security_enabled and params.kafka_kerberos_enabled:
+            listeners = kafka_server_config['listeners'].replace("localhost", params.hostname).replace("PLAINTEXT", "PLAINTEXTSASL")
+            kafka_server_config['listeners'] = listeners
+            kafka_server_config['advertised.listeners'] = listeners
+        else:
+            listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
+            kafka_server_config['listeners'] = listeners
+            if 'advertised.listeners' in kafka_server_config:
+                advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+                kafka_server_config['advertised.listeners'] = advertised_listeners
+    else:
+        kafka_server_config['host.name'] = params.hostname
+
+
     kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters
     if(params.has_metric_collector):
             kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
@@ -71,6 +88,11 @@ def kafka():
              content=params.log4j_props
          )
 
+    if params.security_enabled and params.kafka_kerberos_enabled:
+        TemplateConfig(format("{conf_dir}/kafka_jaas.conf"),
+                         owner=params.kafka_user)
+
+
     setup_symlink(params.kafka_managed_pid_dir, params.kafka_pid_dir)
     setup_symlink(params.kafka_managed_log_dir, params.kafka_log_dir)
 
@@ -113,8 +135,7 @@ def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
               cd_access='a',
               owner=params.kafka_user,
               group=params.user_group,
-              recursive=True
-    )
+              recursive=True)
 
   if backup_folder_path:
     # Restore backed up files to current relevant dirs if needed - will be triggered only when changing to/from default path;
@@ -147,6 +168,3 @@ def backup_dir_contents(dir_path, backup_folder_suffix):
          content = StaticFile(os.path.join(dir_path,file)))
 
   return backup_destination_path
-
-
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index fcb0092..355bb47 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -19,11 +19,13 @@ limitations under the License.
 """
 from resource_management.libraries.functions import format
 from resource_management.libraries.script.script import Script
-from resource_management.libraries.functions.version import format_hdp_stack_version
+from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
 from resource_management.libraries.functions.default import default
+from utils import get_bare_principal
 
 import status_params
 
+
 # server configurations
 config = Script.get_config()
 tmp_dir = Script.get_tmp_dir()
@@ -44,14 +46,13 @@ conf_dir = "/etc/kafka/conf"
 if Script.is_hdp_stack_greater_or_equal("2.2"):
   kafka_home = '/usr/hdp/current/kafka-broker/'
   kafka_bin = kafka_home+'bin/kafka'
-  conf_dir = "/usr/hdp/current/kafka-broker/conf"
+  conf_dir = "/usr/hdp/current/kafka-broker/config"
 
 
 kafka_user = config['configurations']['kafka-env']['kafka_user']
 kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
 kafka_pid_dir = status_params.kafka_pid_dir
 kafka_pid_file = kafka_pid_dir+"/kafka.pid"
-
 # This is hardcoded on the kafka bash process lifecycle on which we have no control over
 kafka_managed_pid_dir = "/var/run/kafka"
 kafka_managed_log_dir = "/var/log/kafka"
@@ -102,3 +103,16 @@ if has_metric_collector:
 
 # Security-related params
 security_enabled = config['configurations']['cluster-env']['security_enabled']
+# True only when kafka-broker sets security.inter.broker.protocol to PLAINTEXTSASL.
+kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and
+                          config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL")
+# Principal, keytab and JVM login-config option are only derived for secured clusters on HDP 2.3+.
+if security_enabled and hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.3') >= 0:
+    _hostname_lowercase = config['hostname'].lower()
+    _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
+    kafka_jaas_principal = _kafka_principal_name.replace('_HOST',_hostname_lowercase)
+    kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
+    kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
+    kafka_kerberos_params = "-Djava.security.auth.login.config="+ conf_dir +"/kafka_jaas.conf"
+else:
+    kafka_kerberos_params = ''

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/utils.py
new file mode 100644
index 0000000..2f1fa5e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/utils.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+
+def get_bare_principal(normalized_principal_name):
+    """
+    Given a normalized principal name (nimbus/c6501.ambari.apache.org@EXAMPLE.COM) returns just the
+    primary component (nimbus)
+    :param normalized_principal_name: a string containing the principal name to process
+    :return: a string containing the primary component value or None if not valid
+    """
+
+    # Empty or missing input has no primary component; returning early also
+    # avoids referencing an unassigned `match` below.
+    if not normalized_principal_name:
+        return None
+
+    # primary[/instance][@REALM] - only the primary component is captured.
+    match = re.match(r"([^/@]+)(?:/[^@]+)?(?:@.*)?", normalized_principal_name)
+
+    return match.group(1) if match else None

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/templates/kafka_jaas.conf.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/templates/kafka_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/templates/kafka_jaas.conf.j2
new file mode 100644
index 0000000..56c558d
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/templates/kafka_jaas.conf.j2
@@ -0,0 +1,41 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+KafkaServer {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="{{kafka_bare_jaas_principal}}"
+   principal="{{kafka_jaas_principal}}";
+};
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="zookeeper"
+   principal="{{kafka_jaas_principal}}";
+};

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/configuration/kafka-broker.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/configuration/kafka-broker.xml
new file mode 100644
index 0000000..e621ddf
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/configuration/kafka-broker.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+
+<configuration supports_final="true" xmlns:xi="http://www.w3.org/2001/XInclude">
+  <property>
+    <name>listeners</name>
+    <value>PLAINTEXT://localhost:6667</value>
+    <description>host and port where kafka broker will be accepting connections. localhost will be substituted with hostname.</description>
+  </property>
+  <property>
+    <name>controlled.shutdown.enable</name>
+    <value>true</value>
+    <description>Enable controlled shutdown of the broker. If enabled, the broker will move all leaders on it to some other brokers before shutting itself down. This reduces the unavailability window during shutdown.</description>
+  </property>
+  <property>
+    <name>auto.leader.rebalance.enable</name>
+    <value>true</value>
+    <description>Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals</description>
+  </property>
+  <property>
+    <name>num.recovery.threads.per.data.dir</name>
+    <value>1</value>
+    <description>The number of threads per data directory to be used for log recovery at startup and flushing at shutdown</description>
+  </property>
+  <property>
+    <name>min.insync.replicas</name>
+    <value>1</value>
+    <description>define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)</description>
+  </property>
+  <property>
+    <name>leader.imbalance.per.broker.percentage</name>
+    <value>10</value>
+    <description>The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.</description>
+  </property>
+  <property>
+    <name>leader.imbalance.check.interval.seconds</name>
+    <value>300</value>
+    <description>The frequency with which the partition rebalance check is triggered by the controller</description>
+  </property>
+  <property>
+    <name>offset.metadata.max.bytes</name>
+    <value>4096</value>
+    <description>The maximum size for a metadata entry associated with an offset commit</description>
+  </property>
+  <property>
+    <name>offsets.load.buffer.size</name>
+    <value>5242880</value>
+    <description>Batch size for reading from the offsets segments when loading offsets into the cache.</description>
+  </property>
+  <property>
+    <name>offsets.topic.replication.factor</name>
+    <value>3</value>
+    <description>The replication factor for the offsets topic (set higher to ensure availability).
+    To ensure that the effective replication factor of the offsets topic is the configured value,
+    the number of alive brokers has to be at least the replication factor at the time of the
+    first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor).</description>
+  </property>
+  <property>
+    <name>offsets.topic.num.partitions</name>
+    <value>50</value>
+    <description>The number of partitions for the offset commit topic (should not change after deployment)</description>
+  </property>
+  <property>
+    <name>offsets.topic.segment.bytes</name>
+    <value>104857600</value>
+    <description>The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads</description>
+  </property>
+  <property>
+    <name>offsets.topic.compression.codec</name>
+    <value>0</value>
+    <description>Compression codec for the offsets topic - compression may be used to achieve "atomic" commits. Default is NoCompression. For Gzip add value 1, SnappyCompression add value 2, LZ4CompressionCodec 3.
+    </description>
+  </property>
+  <property>
+    <name>offsets.retention.minutes</name>
+    <value>86400000</value>
+    <description>Log retention window in minutes for offsets topic</description>
+  </property>
+  <property>
+    <name>offsets.retention.check.interval.ms</name>
+    <value>600000</value>
+    <description>Frequency at which to check for stale offsets</description>
+  </property>
+  <property>
+    <name>offsets.commit.timeout.ms</name>
+    <value>5000</value>
+    <description>Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. This is similar to the producer request timeout.</description>
+  </property>
+  <property>
+    <name>offsets.commit.required.acks</name>
+    <value>-1</value>
+    <description>The required acks before the commit can be accepted. In general, the default (-1) should not be overridden</description>
+  </property>
+  <property>
+    <name>delete.topic.enable</name>
+    <value>false</value>
+    <description>Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off</description>
+  </property>
+  <property>
+    <name>compression.type</name>
+    <description>Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.</description>
+    <value>producer</value>
+  </property>
+  <property>
+    <name>port</name>
+    <value>6667</value>
+    <description>Deprecated config in favor of listeners config.</description>
+    <deleted>true</deleted>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/kerberos.json b/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/kerberos.json
new file mode 100644
index 0000000..03e8c41
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/kerberos.json
@@ -0,0 +1,49 @@
+{
+  "services": [
+    {
+      "name": "KAFKA",
+      "identities": [
+        {
+          "name": "/smokeuser"
+        }
+      ],
+      "configurations": [
+        {
+          "kafka-broker": {
+              "authorizer.class.name": "kafka.security.auth.SimpleAclAuthorizer",
+              "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
+              "super.users": "user:${kafka-env/kafka_user}",
+              "security.inter.broker.protocol": "PLAINTEXTSASL"
+          }
+        }
+      ],
+      "components": [
+        {
+          "name": "KAFKA_BROKER",
+          "identities": [
+            {
+              "name": "kafka_broker",
+              "principal": {
+                "value": "${kafka-env/kafka_user}/_HOST@${realm}",
+                "type": "service",
+                "configuration": "kafka-env/kafka_principal_name"
+              },
+              "keytab": {
+                "file": "${keytab_dir}/kafka.service.keytab",
+                "owner": {
+                  "name": "${kafka-env/kafka_user}",
+                  "access": "r"
+                },
+                "group": {
+                  "name": "${cluster-env/user_group}",
+                  "access": ""
+                },
+                "configuration": "kafka-env/kafka_keytab"
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/metainfo.xml
index af0e0be..3f7612e 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/metainfo.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/KAFKA/metainfo.xml
@@ -39,6 +39,9 @@
           </packages>
         </osSpecific>
       </osSpecifics>
+      <configuration-dependencies>
+        <config-type>kafka-broker</config-type>
+      </configuration-dependencies>
     </service>
   </services>
 </metainfo>

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py b/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
index 58325e6..04e904a 100644
--- a/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
+++ b/ambari-server/src/test/python/stacks/2.2/KAFKA/test_kafka_broker.py
@@ -50,7 +50,7 @@ class TestKafkaBroker(RMFTestCase):
                               cd_access = 'a'
     )
 
-    self.assertResourceCalled('Directory', '/usr/hdp/current/kafka-broker/conf',
+    self.assertResourceCalled('Directory', '/usr/hdp/current/kafka-broker/config',
                               owner = 'kafka',
                               group = 'hadoop',
                               recursive = True,
@@ -86,7 +86,7 @@ class TestKafkaBroker(RMFTestCase):
                               cd_access = 'a'
     )
 
-    self.assertResourceCalled('Directory', '/usr/hdp/current/kafka-broker/conf',
+    self.assertResourceCalled('Directory', '/usr/hdp/current/kafka-broker/config',
                               owner = 'kafka',
                               group = 'hadoop',
                               recursive = True,

http://git-wip-us.apache.org/repos/asf/ambari/blob/75469235/ambari-web/app/data/HDP2/site_properties.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/data/HDP2/site_properties.js b/ambari-web/app/data/HDP2/site_properties.js
index 36d582f..f6fba99 100644
--- a/ambari-web/app/data/HDP2/site_properties.js
+++ b/ambari-web/app/data/HDP2/site_properties.js
@@ -1866,6 +1866,20 @@ var hdp2properties = [
     "category": "KAFKA_BROKER",
     "index": 0
   },
+    {
+    "id": "site property",
+    "name": "listeners",
+    "displayName": "listeners",
+    "value": "",
+    "defaultValue": "",
+    "displayType": "advanced",
+    "serviceName": "KAFKA",
+    "filename": "kafka-broker.xml",
+    "category": "KAFKA_BROKER",
+    "stack": {
+      "HDP": "2.3"
+    }
+  },
   {
     "id": "site property",
     "name": "log.roll.hours",