Posted to commits@ambari.apache.org by vb...@apache.org on 2017/05/05 14:10:04 UTC

[1/2] ambari git commit: AMBARI-20939. HDP 3.0 TP - create service definition for Kafka with configs, kerberos, widgets, etc. (vbrodetskyi)

Repository: ambari
Updated Branches:
  refs/heads/trunk 1aad067cf -> 3817ad5da


http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py
new file mode 100644
index 0000000..680dd32
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka.py
@@ -0,0 +1,276 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.resources.properties_file import PropertiesFile
+from resource_management.libraries.resources.template_config import TemplateConfig
+from resource_management.core.resources.system import Directory, Execute, File, Link
+from resource_management.core.source import StaticFile, Template, InlineTemplate
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions import Direction
+
+
+from resource_management.core.logger import Logger
+
+
+def kafka(upgrade_type=None):
+    import params
+    ensure_base_directories()
+
+    kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
+    # This still has the issue that broker.id depends on the alphabetical order of
+    # hostnames in HDP-2.2. Starting in HDP 2.3, Kafka handles the generation of
+    # broker.id, so Ambari doesn't have to.
+
+    effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version)
+    Logger.info(format("Effective stack version: {effective_version}"))
+
+    # In HDP-2.2 (Apache Kafka 0.8.1.1) we used to generate broker.ids based on hosts and add them to
+    # kafka's server.properties. In future versions brokers can generate their own ids based on a zookeeper sequence.
+    # We need to preserve the broker.id when the user is upgrading from HDP-2.2 to any higher version.
+    # Once it's preserved, it will be written to kafka.log.dirs/meta.properties and used from there on.
+    # Similarly, we need to preserve the port during the upgrade.
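+    # For example, with hypothetical hosts ["b.example.com", "a.example.com"],
+    # sorted() yields ["a.example.com", "b.example.com"], so a.example.com gets
+    # broker.id "0" and b.example.com gets broker.id "1".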
+
+    if upgrade_type is not None and params.upgrade_direction == Direction.UPGRADE and \
+      check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, params.current_version) and \
+      check_stack_feature(StackFeature.KAFKA_LISTENERS, params.version):
+      if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
+        brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
+        kafka_server_config['broker.id'] = brokerid
+        Logger.info(format("Calculating broker.id as {brokerid}"))
+      if 'port' in kafka_server_config:
+        port = kafka_server_config['port']
+        Logger.info(format("Port config from previous verson: {port}"))
+        listeners = kafka_server_config['listeners'].replace("6667", port)
+        kafka_server_config['listeners'] = listeners
+        Logger.info(format("Kafka listeners after the port update: {listeners}"))
+        del kafka_server_config['port']
+
+
+    if effective_version is not None and effective_version != "" and \
+      check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version):
+      if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
+        brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
+        kafka_server_config['broker.id'] = brokerid
+        Logger.info(format("Calculating broker.id as {brokerid}"))
+
+    # listeners and advertised.listeners are only added in 2.3.0.0 onwards.
+    if effective_version is not None and effective_version != "" and \
+       check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version):
+
+       listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
+       Logger.info(format("Kafka listeners: {listeners}"))
+       kafka_server_config['listeners'] = listeners
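+       # e.g. "PLAINTEXT://localhost:6667" becomes "PLAINTEXT://host1.example.com:6667"
+       # (hypothetical hostname)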
+
+       if params.security_enabled and params.kafka_kerberos_enabled:
+         Logger.info("Kafka kerberos security is enabled.")
+         kafka_server_config['advertised.listeners'] = listeners
+         Logger.info(format("Kafka advertised listeners: {listeners}"))
+       elif 'advertised.listeners' in kafka_server_config:
+         advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+         kafka_server_config['advertised.listeners'] = advertised_listeners
+         Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
+    else:
+      kafka_server_config['host.name'] = params.hostname
+
+    if params.has_metric_collector:
+      kafka_server_config['kafka.timeline.metrics.hosts'] = params.ams_collector_hosts
+      kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
+      kafka_server_config['kafka.timeline.metrics.protocol'] = params.metric_collector_protocol
+      kafka_server_config['kafka.timeline.metrics.truststore.path'] = params.metric_truststore_path
+      kafka_server_config['kafka.timeline.metrics.truststore.type'] = params.metric_truststore_type
+      kafka_server_config['kafka.timeline.metrics.truststore.password'] = params.metric_truststore_password
+
+    kafka_data_dir = kafka_server_config['log.dirs']
+    kafka_data_dirs = filter(None, kafka_data_dir.split(","))
+    Directory(kafka_data_dirs,
+              mode=0755,
+              cd_access='a',
+              owner=params.kafka_user,
+              group=params.user_group,
+              create_parents = True,
+              recursive_ownership = True,
+    )
+
+    PropertiesFile("server.properties",
+                      dir=params.conf_dir,
+                      properties=kafka_server_config,
+                      owner=params.kafka_user,
+                      group=params.user_group,
+    )
+
+    File(format("{conf_dir}/kafka-env.sh"),
+          owner=params.kafka_user,
+          content=InlineTemplate(params.kafka_env_sh_template)
+     )
+
+    if params.log4j_props is not None:
+        File(format("{conf_dir}/log4j.properties"),
+             mode=0644,
+             group=params.user_group,
+             owner=params.kafka_user,
+             content=InlineTemplate(params.log4j_props)
+         )
+
+    if params.security_enabled and params.kafka_kerberos_enabled:
+      if params.kafka_jaas_conf_template:
+        File(format("{conf_dir}/kafka_jaas.conf"),
+             owner=params.kafka_user,
+             content=InlineTemplate(params.kafka_jaas_conf_template)
+        )
+      else:
+        TemplateConfig(format("{conf_dir}/kafka_jaas.conf"),
+                         owner=params.kafka_user)
+
+      if params.kafka_client_jaas_conf_template:
+        File(format("{conf_dir}/kafka_client_jaas.conf"),
+             owner=params.kafka_user,
+             content=InlineTemplate(params.kafka_client_jaas_conf_template)
+        )
+      else:
+        TemplateConfig(format("{conf_dir}/kafka_client_jaas.conf"),
+                       owner=params.kafka_user)
+
+    # On some OSes this folder may not exist, so create it before placing files there
+    Directory(params.limits_conf_dir,
+              create_parents = True,
+              owner='root',
+              group='root'
+    )
+
+    File(os.path.join(params.limits_conf_dir, 'kafka.conf'),
+         owner='root',
+         group='root',
+         mode=0644,
+         content=Template("kafka.conf.j2")
+    )
+
+    File(os.path.join(params.conf_dir, 'tools-log4j.properties'),
+         owner='root',
+         group='root',
+         mode=0644,
+         content=Template("tools-log4j.properties.j2")
+         )
+
+    setup_symlink(params.kafka_managed_pid_dir, params.kafka_pid_dir)
+    setup_symlink(params.kafka_managed_log_dir, params.kafka_log_dir)
+
+
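+# Script.get_config() returns a read-only config dictionary, so copy the kafka-broker
+# section into a plain mutable dict before overriding keys such as broker.id and listeners.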
+def mutable_config_dict(kafka_broker_config):
+    kafka_server_config = {}
+    for key, value in kafka_broker_config.iteritems():
+        kafka_server_config[key] = value
+    return kafka_server_config
+
+
+# Used to work around the hardcoded pid/log dir used by the kafka bash process launcher
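+# e.g. setup_symlink("/var/run/kafka", params.kafka_pid_dir) replaces the hardcoded
+# /var/run/kafka with a symlink to the Ambari-managed pid dir, backing up any real
+# directory contents first (mirrors the calls at the end of kafka() above)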
+def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
+  import params
+  backup_folder_path = None
+  backup_folder_suffix = "_tmp"
+  if kafka_ambari_managed_dir != kafka_managed_dir:
+    if os.path.exists(kafka_managed_dir) and not os.path.islink(kafka_managed_dir):
+
+      # Back up existing data before deleting it, since the config may be changed repeatedly
+      # to/from the default location at any point in time and there may be relevant contents (historic logs)
+      backup_folder_path = backup_dir_contents(kafka_managed_dir, backup_folder_suffix)
+
+      Directory(kafka_managed_dir,
+                action="delete",
+                create_parents = True)
+
+    elif os.path.islink(kafka_managed_dir) and os.path.realpath(kafka_managed_dir) != kafka_ambari_managed_dir:
+      Link(kafka_managed_dir,
+           action="delete")
+
+    if not os.path.islink(kafka_managed_dir):
+      Link(kafka_managed_dir,
+           to=kafka_ambari_managed_dir)
+
+  elif os.path.islink(kafka_managed_dir): # If config is changed and coincides with the kafka managed dir, remove the symlink and physically create the folder
+    Link(kafka_managed_dir,
+         action="delete")
+
+    Directory(kafka_managed_dir,
+              mode=0755,
+              cd_access='a',
+              owner=params.kafka_user,
+              group=params.user_group,
+              create_parents = True,
+              recursive_ownership = True,
+    )
+
+  if backup_folder_path:
+    # Restore backed-up files to the relevant dirs if needed - triggered only when changing to/from the default path
+    for file in os.listdir(backup_folder_path):
+      if os.path.isdir(os.path.join(backup_folder_path, file)):
+        Execute(('cp', '-r', os.path.join(backup_folder_path, file), kafka_managed_dir),
+                sudo=True)
+        Execute(("chown", "-R", format("{kafka_user}:{user_group}"), os.path.join(kafka_managed_dir, file)),
+                sudo=True)
+      else:
+        File(os.path.join(kafka_managed_dir,file),
+             owner=params.kafka_user,
+             content = StaticFile(os.path.join(backup_folder_path,file)))
+
+    # Clean up backed up folder
+    Directory(backup_folder_path,
+              action="delete",
+              create_parents = True)
+
+
+# Uses agent temp dir to store backup files
+def backup_dir_contents(dir_path, backup_folder_suffix):
+  import params
+  backup_destination_path = params.tmp_dir + os.path.normpath(dir_path)+backup_folder_suffix
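+  # e.g. with a hypothetical agent tmp_dir of /var/lib/ambari-agent/tmp, backing up
+  # /var/run/kafka writes to /var/lib/ambari-agent/tmp/var/run/kafka_tmp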
+  Directory(backup_destination_path,
+            mode=0755,
+            cd_access='a',
+            owner=params.kafka_user,
+            group=params.user_group,
+            create_parents = True,
+            recursive_ownership = True,
+  )
+  # Safely copy top-level contents to backup folder
+  for file in os.listdir(dir_path):
+    if os.path.isdir(os.path.join(dir_path, file)):
+      Execute(('cp', '-r', os.path.join(dir_path, file), backup_destination_path),
+              sudo=True)
+      Execute(("chown", "-R", format("{kafka_user}:{user_group}"), os.path.join(backup_destination_path, file)),
+              sudo=True)
+    else:
+      File(os.path.join(backup_destination_path, file),
+         owner=params.kafka_user,
+         content = StaticFile(os.path.join(dir_path,file)))
+
+  return backup_destination_path
+
+def ensure_base_directories():
+  import params
+  Directory([params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir],
+            mode=0755,
+            cd_access='a',
+            owner=params.kafka_user,
+            group=params.user_group,
+            create_parents = True,
+            recursive_ownership = True,
+            )

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py
new file mode 100644
index 0000000..81715f9
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/kafka_broker.py
@@ -0,0 +1,151 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management import Script
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute, File, Directory
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import Direction
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.show_logs import show_logs
+import upgrade
+from kafka import kafka, ensure_base_directories
+from setup_ranger_kafka import setup_ranger_kafka
+
+class KafkaBroker(Script):
+
+  def get_component_name(self):
+    return "kafka-broker"
+
+  def install(self, env):
+    self.install_packages(env)
+
+  def configure(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    kafka(upgrade_type=upgrade_type)
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
+      stack_select.select("kafka-broker", params.version)
+
+    if params.version and check_stack_feature(StackFeature.CONFIG_VERSIONING, params.version):
+      conf_select.select(params.stack_name, "kafka", params.version)
+
+    # This is extremely important: the ACL migration must only run when the upgrade crosses the HDP 2.3.4.0 boundary.
+    if params.current_version and params.version and params.upgrade_direction:
+      src_version = dst_version = None
+      if params.upgrade_direction == Direction.UPGRADE:
+        src_version = format_stack_version(params.current_version)
+        dst_version = format_stack_version(params.version)
+      else:
+        # On downgrade, these still represent src/dst as they were during the original UPGRADE direction
+        src_version = format_stack_version(params.version)
+        dst_version = format_stack_version(params.downgrade_from_version)
+
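+      # e.g. upgrading from 2.3.2.0 to 2.3.4.0 crosses the boundary (ACL migration
+      # support absent in src_version, present in dst_version), so the migration runs;
+      # hypothetical versions for illustration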
+      if not check_stack_feature(StackFeature.KAFKA_ACL_MIGRATION_SUPPORT, src_version) and check_stack_feature(StackFeature.KAFKA_ACL_MIGRATION_SUPPORT, dst_version):
+        # Calling the acl migration script requires the configs to be present.
+        self.configure(env, upgrade_type=upgrade_type)
+        upgrade.run_migration(env, upgrade_type)
+
+  def start(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    self.configure(env, upgrade_type=upgrade_type)
+
+    if params.security_enabled:
+      if params.version and check_stack_feature(StackFeature.KAFKA_KERBEROS, params.version):
+        kafka_kinit_cmd = format("{kinit_path_local} -kt {kafka_keytab_path} {kafka_jaas_principal};")
+        Execute(kafka_kinit_cmd, user=params.kafka_user)
+
+    if params.is_supported_kafka_ranger:
+      setup_ranger_kafka() # Ranger Kafka Plugin related call
+
+    daemon_cmd = format('source {params.conf_dir}/kafka-env.sh ; {params.kafka_bin} start')
+    no_op_test = format('ls {params.kafka_pid_file} >/dev/null 2>&1 && ps -p `cat {params.kafka_pid_file}` >/dev/null 2>&1')
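+    # not_if: skip the start command when the pid file exists and that pid is a live process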
+    try:
+      Execute(daemon_cmd,
+              user=params.kafka_user,
+              not_if=no_op_test
+      )
+    except:
+      show_logs(params.kafka_log_dir, params.kafka_user)
+      raise
+
+  def stop(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    # Kafka package scripts change permissions on folders, so we have to
+    # restore permissions after installing repo version bits
+    # before attempting to stop Kafka Broker
+    ensure_base_directories()
+    daemon_cmd = format('source {params.conf_dir}/kafka-env.sh; {params.kafka_bin} stop')
+    try:
+      Execute(daemon_cmd,
+              user=params.kafka_user,
+      )
+    except:
+      show_logs(params.kafka_log_dir, params.kafka_user)
+      raise
+    File(params.kafka_pid_file,
+          action = "delete"
+    )
+
+  def disable_security(self, env):
+    import params
+    if not params.zookeeper_connect:
+      Logger.info("No zookeeper connection string. Skipping reverting ACL")
+      return
+    if not params.secure_acls:
+      Logger.info("The zookeeper.set.acl is false. Skipping reverting ACL")
+      return
+    Execute(
+      "{0} --zookeeper.connect {1} --zookeeper.acl=unsecure".format(params.kafka_security_migrator, params.zookeeper_connect), \
+      user=params.kafka_user, \
+      environment={ 'JAVA_HOME': params.java64_home }, \
+      logoutput=True, \
+      tries=3)
+
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+    check_process_status(status_params.kafka_pid_file)
+    
+  def get_log_folder(self):
+    import params
+    return params.kafka_log_dir
+  
+  def get_user(self):
+    import params
+    return params.kafka_user
+
+  def get_pid_files(self):
+    import status_params
+    return [status_params.kafka_pid_file]
+
+if __name__ == "__main__":
+  KafkaBroker().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py
new file mode 100644
index 0000000..5b0be54
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/params.py
@@ -0,0 +1,341 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+from resource_management.libraries.functions import format
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions.stack_features import get_stack_feature_version
+from resource_management.libraries.functions.default import default
+from utils import get_bare_principal
+from resource_management.libraries.functions.get_stack_version import get_stack_version
+from resource_management.libraries.functions.is_empty import is_empty
+import status_params
+from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions import stack_select
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import get_kinit_path
+from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
+from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs, generate_ranger_service_config
+
+# server configurations
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+stack_root = Script.get_stack_root()
+stack_name = default("/hostLevelParams/stack_name", None)
+retryAble = default("/commandParams/command_retry_enabled", False)
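+# default(path, fallback) walks the command JSON by '/'-separated keys and returns
+# the fallback when any segment of the path is missing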
+
+# Version being upgraded/downgraded to
+version = default("/commandParams/version", None)
+
+# Version that is CURRENT.
+current_version = default("/hostLevelParams/current_version", None)
+
+
+stack_version_unformatted = config['hostLevelParams']['stack_version']
+stack_version_formatted = format_stack_version(stack_version_unformatted)
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+
+# get the correct version to use for checking stack features
+version_for_stack_feature_checks = get_stack_feature_version(config)
+
+stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
+stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
+stack_supports_core_site_for_ranger_plugin = check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, version_for_stack_feature_checks)
+
+# When downgrading, 'version' and 'current_version' both point to the downgrade-target version;
+# downgrade_from_version provides the source version the downgrade is happening from
+downgrade_from_version = default("/commandParams/downgrade_from_version", None)
+
+hostname = config['hostname']
+
+# default kafka parameters
+kafka_home = '/usr/lib/kafka'
+kafka_bin = kafka_home+'/bin/kafka'
+conf_dir = "/etc/kafka/conf"
+limits_conf_dir = "/etc/security/limits.d"
+
+# Used while upgrading the stack in a kerberized cluster and running kafka-acls.sh
+zookeeper_connect = default("/configurations/kafka-broker/zookeeper.connect", None)
+
+kafka_user_nofile_limit = default('/configurations/kafka-env/kafka_user_nofile_limit', 128000)
+kafka_user_nproc_limit = default('/configurations/kafka-env/kafka_user_nproc_limit', 65536)
+
+# parameters for 2.2+
+if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+  kafka_home = os.path.join(stack_root,  "current", "kafka-broker")
+  kafka_bin = os.path.join(kafka_home, "bin", "kafka")
+  conf_dir = os.path.join(kafka_home, "config")
+
+kafka_user = config['configurations']['kafka-env']['kafka_user']
+kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
+kafka_pid_dir = status_params.kafka_pid_dir
+kafka_pid_file = kafka_pid_dir+"/kafka.pid"
+# This is hardcoded in the kafka bash process lifecycle, over which we have no control
+kafka_managed_pid_dir = "/var/run/kafka"
+kafka_managed_log_dir = "/var/log/kafka"
+user_group = config['configurations']['cluster-env']['user_group']
+java64_home = config['hostLevelParams']['java_home']
+kafka_env_sh_template = config['configurations']['kafka-env']['content']
+kafka_jaas_conf_template = default("/configurations/kafka_jaas_conf/content", None)
+kafka_client_jaas_conf_template = default("/configurations/kafka_client_jaas_conf/content", None)
+kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts']
+kafka_hosts.sort()
+
+zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']
+zookeeper_hosts.sort()
+secure_acls = default("/configurations/kafka-broker/zookeeper.set.acl", False)
+kafka_security_migrator = os.path.join(kafka_home, "bin", "zookeeper-security-migration.sh")
+
+#Kafka log4j
+kafka_log_maxfilesize = default('/configurations/kafka-log4j/kafka_log_maxfilesize',256)
+kafka_log_maxbackupindex = default('/configurations/kafka-log4j/kafka_log_maxbackupindex',20)
+controller_log_maxfilesize = default('/configurations/kafka-log4j/controller_log_maxfilesize',256)
+controller_log_maxbackupindex = default('/configurations/kafka-log4j/controller_log_maxbackupindex',20)
+
+if 'kafka-log4j' in config['configurations'] and 'content' in config['configurations']['kafka-log4j']:
+    log4j_props = config['configurations']['kafka-log4j']['content']
+else:
+    log4j_props = None
+
+if 'ganglia_server_host' in config['clusterHostInfo'] and \
+    len(config['clusterHostInfo']['ganglia_server_host'])>0:
+  ganglia_installed = True
+  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
+  ganglia_report_interval = 60
+else:
+  ganglia_installed = False
+
+metric_collector_port = ""
+metric_collector_protocol = ""
+metric_truststore_path = default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
+metric_truststore_type = default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
+metric_truststore_password = default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+
+set_instanceId = "false"
+cluster_name = config["clusterName"]
+
+if 'cluster-env' in config['configurations'] and \
+        'metrics_collector_external_hosts' in config['configurations']['cluster-env']:
+  ams_collector_hosts = config['configurations']['cluster-env']['metrics_collector_external_hosts']
+  set_instanceId = "true"
+else:
+  ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
+
+has_metric_collector = not len(ams_collector_hosts) == 0
+
+if has_metric_collector:
+  if 'cluster-env' in config['configurations'] and \
+      'metrics_collector_external_port' in config['configurations']['cluster-env']:
+    metric_collector_port = config['configurations']['cluster-env']['metrics_collector_external_port']
+  else:
+    metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188")
+    if metric_collector_web_address.find(':') != -1:
+      metric_collector_port = metric_collector_web_address.split(':')[1]
+    else:
+      metric_collector_port = '6188'
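+    # e.g. a webapp address of "0.0.0.0:6188" yields port "6188"; an address without
+    # a ':' falls back to the default port 6188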
+  if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY":
+    metric_collector_protocol = 'https'
+  else:
+    metric_collector_protocol = 'http'
+  pass
+
+# Security-related params
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+kafka_kerberos_enabled = (('security.inter.broker.protocol' in config['configurations']['kafka-broker']) and
+                         ((config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL") or
+                          (config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "SASL_PLAINTEXT")))
+
+
+if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \
+  and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted):
+    _hostname_lowercase = config['hostname'].lower()
+    _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
+    kafka_jaas_principal = _kafka_principal_name.replace('_HOST',_hostname_lowercase)
+    kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
+    kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
+    kafka_kerberos_params = "-Djava.security.auth.login.config="+ conf_dir +"/kafka_jaas.conf"
+else:
+    kafka_kerberos_params = ''
+    kafka_jaas_principal = None
+    kafka_keytab_path = None
+
+# for curl command in ranger plugin to get db connector
+jdk_location = config['hostLevelParams']['jdk_location']
+
+# ranger kafka plugin section start
+
+# ranger host
+ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
+has_ranger_admin = not len(ranger_admin_hosts) == 0
+
+# ranger xml_configuration support flag: instead of depending on xml_configurations_supported from ranger-env, use the stack feature
+xml_configurations_supported = check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, version_for_stack_feature_checks)
+
+# ambari-server hostname
+ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
+
+ranger_admin_log_dir = default("/configurations/ranger-env/ranger_admin_log_dir","/var/log/ranger/admin")
+
+# ranger kafka plugin enabled property
+enable_ranger_kafka = default("/configurations/ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled", "No")
+enable_ranger_kafka = enable_ranger_kafka.lower() == 'yes'
+
+# ranger kafka-plugin supported flag: instead of depending on is_supported_kafka_ranger/kafka-env.xml, use the stack feature
+is_supported_kafka_ranger = check_stack_feature(StackFeature.KAFKA_RANGER_PLUGIN_SUPPORT, version_for_stack_feature_checks)
+
+# ranger kafka properties
+if enable_ranger_kafka and is_supported_kafka_ranger:
+  # get ranger policy url
+  policymgr_mgr_url = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.policy.rest.url']
+
+  if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'):
+    policymgr_mgr_url = policymgr_mgr_url.rstrip('/')
+
+  # ranger audit db user
+  xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger')
+
+  xa_audit_db_password = ''
+  if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db and has_ranger_admin:
+    xa_audit_db_password = config['configurations']['admin-properties']['audit_db_password']
+
+  # ranger kafka service/repository name
+  repo_name = str(config['clusterName']) + '_kafka'
+  repo_name_value = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.service.name']
+  if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}":
+    repo_name = repo_name_value
+
+  ranger_env = config['configurations']['ranger-env']
+
+  # create a ranger-env config containing the external ranger credential properties
+  if not has_ranger_admin and enable_ranger_kafka:
+    external_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_admin_username', 'admin')
+    external_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_admin_password', 'admin')
+    external_ranger_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_username', 'amb_ranger_admin')
+    external_ranger_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_password', 'amb_ranger_admin')
+    ranger_env = {}
+    ranger_env['admin_username'] = external_admin_username
+    ranger_env['admin_password'] = external_admin_password
+    ranger_env['ranger_admin_username'] = external_ranger_admin_username
+    ranger_env['ranger_admin_password'] = external_ranger_admin_password
+
+  ranger_plugin_properties = config['configurations']['ranger-kafka-plugin-properties']
+  ranger_kafka_audit = config['configurations']['ranger-kafka-audit']
+  ranger_kafka_audit_attrs = config['configuration_attributes']['ranger-kafka-audit']
+  ranger_kafka_security = config['configurations']['ranger-kafka-security']
+  ranger_kafka_security_attrs = config['configuration_attributes']['ranger-kafka-security']
+  ranger_kafka_policymgr_ssl = config['configurations']['ranger-kafka-policymgr-ssl']
+  ranger_kafka_policymgr_ssl_attrs = config['configuration_attributes']['ranger-kafka-policymgr-ssl']
+
+  policy_user = config['configurations']['ranger-kafka-plugin-properties']['policy_user']
+
+  ranger_plugin_config = {
+    'username' : config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_USERNAME'],
+    'password' : config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_PASSWORD'],
+    'zookeeper.connect' : config['configurations']['ranger-kafka-plugin-properties']['zookeeper.connect'],
+    'commonNameForCertificate' : config['configurations']['ranger-kafka-plugin-properties']['common.name.for.certificate']
+  }
+
+  kafka_ranger_plugin_repo = {
+    'isEnabled': 'true',
+    'configs': ranger_plugin_config,
+    'description': 'kafka repo',
+    'name': repo_name,
+    'repositoryType': 'kafka',
+    'type': 'kafka',
+    'assetType': '1'
+  }
+
+  custom_ranger_service_config = generate_ranger_service_config(ranger_plugin_properties)
+  if len(custom_ranger_service_config) > 0:
+    ranger_plugin_config.update(custom_ranger_service_config)
+
+  if stack_supports_ranger_kerberos and security_enabled:
+    ranger_plugin_config['policy.download.auth.users'] = kafka_user
+    ranger_plugin_config['tag.download.auth.users'] = kafka_user
+    ranger_plugin_config['ambari.service.check.user'] = policy_user
+
+  downloaded_custom_connector = None
+  previous_jdbc_jar_name = None
+  driver_curl_source = None
+  driver_curl_target = None
+  previous_jdbc_jar = None
+
+  if has_ranger_admin and stack_supports_ranger_audit_db:
+    xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR']
+    jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = get_audit_configs(config)
+
+    downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+    driver_curl_source = format("{jdk_location}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+    driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+    previous_jdbc_jar = format("{kafka_home}/libs/{previous_jdbc_jar_name}") if stack_supports_ranger_audit_db else None
+
+  xa_audit_db_is_enabled = False
+  if xml_configurations_supported and stack_supports_ranger_audit_db:
+    xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db']
+
+  xa_audit_hdfs_is_enabled = default('/configurations/ranger-kafka-audit/xasecure.audit.destination.hdfs', False)
+  ssl_keystore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'] if xml_configurations_supported else None
+  ssl_truststore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'] if xml_configurations_supported else None
+  credential_file = format('/etc/ranger/{repo_name}/cred.jceks')
+
+  stack_version = get_stack_version('kafka-broker')
+  setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
+  setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")
+
+  # for SQLA explicitly disable audit to DB for Ranger
+  if has_ranger_admin and stack_supports_ranger_audit_db and xa_audit_db_flavor.lower() == 'sqla':
+    xa_audit_db_is_enabled = False
+
+# needed to capture the name of the cluster where the ranger kafka plugin is enabled
+cluster_name = config['clusterName']
+
+# ranger kafka plugin section end
+
+namenode_hosts = default("/clusterHostInfo/namenode_host", [])
+has_namenode = not len(namenode_hosts) == 0
+
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None
+hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None
+hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
+default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None
+hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
+hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None
+kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+
+import functools
+# Create a partial function with common arguments for every HdfsResource call;
+# to create/delete an hdfs directory/file/copyfromlocal, code should call params.HdfsResource
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  hadoop_bin_dir = hadoop_bin_dir,
+  hadoop_conf_dir = hadoop_conf_dir,
+  principal_name = hdfs_principal_name,
+  hdfs_site = hdfs_site,
+  default_fs = default_fs,
+  immutable_paths = get_not_managed_resources()
+)

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py
new file mode 100644
index 0000000..0f3a417
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/service_check.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.validate import call_and_match_output
+from resource_management.libraries.functions.format import format
+from resource_management.core.logger import Logger
+from resource_management.core import sudo
+import subprocess
+
+class ServiceCheck(Script):
+  def service_check(self, env):
+    import params
+    env.set_params(params)
+
+    # TODO: the Kafka service check should be more robust. It should get all the broker_hosts,
+    # produce some messages, and check that a consumer reads the same number of messages.
+
+    kafka_config = self.read_kafka_config()
+    topic = "ambari_kafka_service_check"
+    create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
+    create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
+    source_cmd = format("source {conf_dir}/kafka-env.sh")
+    topic_exists_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --topic {topic} --list")
+    topic_exists_cmd_p = subprocess.Popen(topic_exists_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    topic_exists_cmd_out, topic_exists_cmd_err = topic_exists_cmd_p.communicate()
+    # run the create topic command only if the topic doesn't exist
+    if topic not in topic_exists_cmd_out:
+      create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic {topic} --partitions 1 --replication-factor 1")
+      command = source_cmd + " ; " + create_topic_cmd
+      Logger.info("Running kafka create topic command: %s" % command)
+      call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists", user=params.kafka_user)
+
+  def read_kafka_config(self):
+    import params
+    
+    kafka_config = {}
+    content = sudo.read_file(params.conf_dir + "/server.properties")
+    for line in content.splitlines():
+      if line.startswith("#") or not line.strip():
+        continue
+
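+      # split only on the first '=' so property values containing '=' stay intact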
+      key, value = line.split("=")
+      kafka_config[key] = value.replace("\n", "")
+    
+    return kafka_config
+
+if __name__ == "__main__":
+    ServiceCheck().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py
new file mode 100644
index 0000000..e9719aa
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/setup_ranger_kafka.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from resource_management.core.logger import Logger
+from resource_management.core.resources import File, Execute
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_core_site_for_required_plugins
+
+def setup_ranger_kafka():
+  import params
+
+  if params.enable_ranger_kafka:
+
+    from resource_management.libraries.functions.setup_ranger_plugin_xml import setup_ranger_plugin
+
+    if params.retryAble:
+      Logger.info("Kafka: Setup ranger: command retry enables thus retrying if ranger admin is down !")
+    else:
+      Logger.info("Kafka: Setup ranger: command retry not enabled thus skipping if ranger admin is down !")
+
+    if params.xml_configurations_supported and params.enable_ranger_kafka and params.xa_audit_hdfs_is_enabled:
+      if params.has_namenode:
+        params.HdfsResource("/ranger/audit",
+                           type="directory",
+                           action="create_on_execute",
+                           owner=params.hdfs_user,
+                           group=params.hdfs_user,
+                           mode=0755,
+                           recursive_chmod=True
+        )
+        params.HdfsResource("/ranger/audit/kafka",
+                           type="directory",
+                           action="create_on_execute",
+                           owner=params.kafka_user,
+                           group=params.kafka_user,
+                           mode=0700,
+                           recursive_chmod=True
+        )
+        params.HdfsResource(None, action="execute")
+
+    setup_ranger_plugin('kafka-broker', 'kafka', params.previous_jdbc_jar,
+                        params.downloaded_custom_connector, params.driver_curl_source,
+                        params.driver_curl_target, params.java64_home,
+                        params.repo_name, params.kafka_ranger_plugin_repo,
+                        params.ranger_env, params.ranger_plugin_properties,
+                        params.policy_user, params.policymgr_mgr_url,
+                        params.enable_ranger_kafka, conf_dict=params.conf_dir,
+                        component_user=params.kafka_user, component_group=params.user_group, cache_service_list=['kafka'],
+                        plugin_audit_properties=params.ranger_kafka_audit, plugin_audit_attributes=params.ranger_kafka_audit_attrs,
+                        plugin_security_properties=params.ranger_kafka_security, plugin_security_attributes=params.ranger_kafka_security_attrs,
+                        plugin_policymgr_ssl_properties=params.ranger_kafka_policymgr_ssl, plugin_policymgr_ssl_attributes=params.ranger_kafka_policymgr_ssl_attrs,
+                        component_list=['kafka-broker'], audit_db_is_enabled=params.xa_audit_db_is_enabled,
+                        credential_file=params.credential_file, xa_audit_db_password=params.xa_audit_db_password, 
+                        ssl_truststore_password=params.ssl_truststore_password, ssl_keystore_password=params.ssl_keystore_password,
+                        api_version = 'v2', skip_if_rangeradmin_down= not params.retryAble,
+                        is_security_enabled = params.security_enabled,
+                        is_stack_supports_ranger_kerberos = params.stack_supports_ranger_kerberos,
+                        component_user_principal=params.kafka_jaas_principal if params.security_enabled else None,
+                        component_user_keytab=params.kafka_keytab_path if params.security_enabled else None)
+    
+    if params.enable_ranger_kafka: 
+      Execute(('cp', '--remove-destination', params.setup_ranger_env_sh_source, params.setup_ranger_env_sh_target),
+        not_if=format("test -f {setup_ranger_env_sh_target}"),
+        sudo=True
+      )
+      File(params.setup_ranger_env_sh_target,
+        owner = params.kafka_user,
+        group = params.user_group,
+        mode = 0755
+      )
+    if params.stack_supports_core_site_for_ranger_plugin and params.enable_ranger_kafka and params.has_namenode and params.security_enabled:
+      Logger.info("Stack supports core-site.xml creation for Ranger plugin, creating create core-site.xml from namenode configuraitions")
+      setup_core_site_for_required_plugins(component_user=params.kafka_user,component_group=params.user_group,create_core_site_path = params.conf_dir, config = params.config)
+    else:
+      Logger.info("Stack does not support core-site.xml creation for Ranger plugin, skipping core-site.xml configurations")
+  else:
+    Logger.info('Ranger Kafka plugin is not enabled')

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py
new file mode 100644
index 0000000..57bdf5e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/status_params.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management.libraries.functions import format
+from resource_management.libraries.script.script import Script
+
+config = Script.get_config()
+
+kafka_pid_dir = config['configurations']['kafka-env']['kafka_pid_dir']
+kafka_pid_file = format("{kafka_pid_dir}/kafka.pid")

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py
new file mode 100644
index 0000000..b6e4046
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/upgrade.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os
+
+
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions import Direction
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+
+def run_migration(env, upgrade_type):
+  """
+  If the acl migration script is present, then run it for either upgrade or downgrade.
+  That script was introduced in HDP 2.3.4.0 and requires stopping all Kafka brokers first.
+  Requires configs to be present.
+  :param env: Environment.
+  :param upgrade_type: "rolling" or "nonrolling
+  """
+  import params
+
+  if upgrade_type is None:
+    raise Fail('Parameter "upgrade_type" is missing.')
+
+  if params.upgrade_direction is None:
+    raise Fail('Parameter "upgrade_direction" is missing.')
+
+  if params.upgrade_direction == Direction.DOWNGRADE and params.downgrade_from_version is None:
+    raise Fail('Parameter "downgrade_from_version" is missing.')
+
+  if not params.security_enabled:
+    Logger.info("Skip running the Kafka ACL migration script since cluster security is not enabled.")
+    return
+  
+  Logger.info("Upgrade type: {0}, direction: {1}".format(str(upgrade_type), params.upgrade_direction))
+
+  # If the acl migration script exists in the version being upgraded to, then attempt to upgrade/downgrade the ACLs while still using the present bits.
+  kafka_acls_script = None
+  command_suffix = ""
+  if params.upgrade_direction == Direction.UPGRADE:
+    kafka_acls_script = format("{stack_root}/{version}/kafka/bin/kafka-acls.sh")
+    command_suffix = "--upgradeAcls"
+  elif params.upgrade_direction == Direction.DOWNGRADE:
+    kafka_acls_script = format("{stack_root}/{downgrade_from_version}/kafka/bin/kafka-acls.sh")
+    command_suffix = "--downgradeAcls"
+
+  if kafka_acls_script is not None:
+    if os.path.exists(kafka_acls_script):
+      Logger.info("Found Kafka acls script: {0}".format(kafka_acls_script))
+      if params.zookeeper_connect is None:
+        raise Fail("Could not retrieve property kafka-broker/zookeeper.connect")
+
+      acls_command = "{0} --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect={1} {2}".\
+        format(kafka_acls_script, params.zookeeper_connect, command_suffix)
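+      # e.g. kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
+      #      --authorizer-properties zookeeper.connect=zk1:2181 --upgradeAcls (hypothetical host)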
+
+      Execute(acls_command,
+              user=params.kafka_user,
+              logoutput=True)
+    else:
+      Logger.info("Did not find Kafka acls script: {0}".format(kafka_acls_script))

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py
new file mode 100644
index 0000000..2f1fa5e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/scripts/utils.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+
+def get_bare_principal(normalized_principal_name):
+    """
+    Given a normalized principal name (nimbus/c6501.ambari.apache.org@EXAMPLE.COM) returns just the
+    primary component (nimbus)
+    :param normalized_principal_name: a string containing the principal name to process
+    :return: a string containing the primary component value or None if not valid
+    """
+
+    bare_principal = None
+
+    if normalized_principal_name:
+        match = re.match(r"([^/@]+)(?:/[^@])?(?:@.*)?", normalized_principal_name)
+
+        if match:
+            bare_principal = match.group(1)
+
+    return bare_principal

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2
new file mode 100644
index 0000000..5b8f896
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/input.config-kafka.json.j2
@@ -0,0 +1,92 @@
+{#
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements.  See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership.  The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License.  You may obtain a copy of the License at
+ #
+ #   http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #}
+{
+  "input":[
+    {
+      "type":"kafka_controller",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/controller.log"
+    },
+    {
+      "type":"kafka_request",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/kafka-request.log"
+    },
+    {
+      "type":"kafka_logcleaner",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/log-cleaner.log"
+    },
+    {
+      "type":"kafka_server",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/server.log"
+    },
+    {
+      "type":"kafka_statechange",
+      "rowtype":"service",
+      "path":"{{default('/configurations/kafka-env/kafka_log_dir', '/var/log/kafka')}}/state-change.log"
+    }
+  ],
+  "filter":[
+    {
+      "filter":"grok",
+      "conditions":{
+        "fields":{
+          "type":[
+            "kafka_controller",
+            "kafka_request",
+            "kafka_logcleaner"
+          ]
+        }
+      },
+      "log4j_format":"[%d] %p %m (%c)%n",
+      "multiline_pattern":"^(\\[%{TIMESTAMP_ISO8601:logtime}\\])",
+      "message_pattern":"(?m)^\\[%{TIMESTAMP_ISO8601:logtime}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{GREEDYDATA:log_message}",
+      "post_map_values":{
+        "logtime":{
+          "map_date":{
+            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
+          }
+        }
+      }
+    },
+    {
+      "filter":"grok",
+      "comment":"Suppose to be same log4j pattern as other kafka processes, but some reason thread is not printed",
+      "conditions":{
+        "fields":{
+          "type":[
+            "kafka_server",
+            "kafka_statechange"
+          ]
+        }
+      },
+      "log4j_format":"[%d] %p %m (%c)%n",
+      "multiline_pattern":"^(\\[%{TIMESTAMP_ISO8601:logtime}\\])",
+      "message_pattern":"(?m)^\\[%{TIMESTAMP_ISO8601:logtime}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}",
+      "post_map_values":{
+        "logtime":{
+          "map_date":{
+            "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS"
+          }
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
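
A rough way to sanity-check the server-log message_pattern above, with the grok macros hand-expanded into plain regular expressions; the expansions are approximations of the standard grok definitions, not Logsearch's exact ones:

    import re

    # Approximate expansions of %{TIMESTAMP_ISO8601}, %{LOGLEVEL}, %{SPACE}, %{GREEDYDATA}.
    ts = r"\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[,.]\d+)?"
    pattern = (r"(?m)^\[(?P<logtime>" + ts + r")\]\s*"
               r"(?P<level>TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\s*"
               r"(?P<log_message>.*)")

    sample = "[2017-05-05 17:09:30,123] INFO starting (kafka.server.KafkaServer)"
    m = re.match(pattern, sample)
    print(m.group("logtime"), m.group("level"), m.group("log_message"))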

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2
new file mode 100644
index 0000000..9e18e1d
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka.conf.j2
@@ -0,0 +1,35 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{{kafka_user}}   - nofile   {{kafka_user_nofile_limit}}
+{{kafka_user}}   - nproc    {{kafka_user_nproc_limit}}
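
A minimal rendering sketch for this template; stock jinja2 stands in here for the resource_management templating, and the values mirror the kafka-env defaults below (kafka / 128000 / 65536):

    from jinja2 import Template

    template = Template(
        "{{kafka_user}}   - nofile   {{kafka_user_nofile_limit}}\n"
        "{{kafka_user}}   - nproc    {{kafka_user_nproc_limit}}\n")
    print(template.render(kafka_user="kafka",
                          kafka_user_nofile_limit=128000,
                          kafka_user_nproc_limit=65536))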

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2
new file mode 100644
index 0000000..7f81d85
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_client_jaas.conf.j2
@@ -0,0 +1,29 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="zookeeper";
+};

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2
new file mode 100644
index 0000000..56c558d
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/kafka_jaas.conf.j2
@@ -0,0 +1,41 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+KafkaServer {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="{{kafka_bare_jaas_principal}}"
+   principal="{{kafka_jaas_principal}}";
+};
+KafkaClient {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useTicketCache=true
+   renewTicket=true
+   serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+   com.sun.security.auth.module.Krb5LoginModule required
+   useKeyTab=true
+   keyTab="{{kafka_keytab_path}}"
+   storeKey=true
+   useTicketCache=false
+   serviceName="zookeeper"
+   principal="{{kafka_jaas_principal}}";
+};
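
The rendered kafka_jaas.conf only takes effect once the broker JVM is pointed at it; a sketch of that wiring, with an illustrative config directory (Ambari passes the flag through KAFKA_KERBEROS_PARAMS in kafka-env):

    # Illustrative path; Ambari derives the real conf dir from the stack layout.
    conf_dir = "/usr/hdp/current/kafka-broker/config"
    kafka_kerberos_params = (
        "-Djava.security.auth.login.config={0}/kafka_jaas.conf".format(conf_dir))
    print(kafka_kerberos_params)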

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2 b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2
new file mode 100644
index 0000000..c4ad326
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/package/templates/tools-log4j.properties.j2
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=WARN, stderr
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.stderr.Target=System.err
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json
new file mode 100644
index 0000000..9a52922
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/role_command_order.json
@@ -0,0 +1,7 @@
+{
+  "general_deps" : {
+    "_comment" : "dependencies for KAFKA",
+    "KAFKA_BROKER-START" : ["ZOOKEEPER_SERVER-START", "RANGER_USERSYNC-START", "NAMENODE-START"],
+    "KAFKA_SERVICE_CHECK-SERVICE_CHECK": ["KAFKA_BROKER-START"]
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json
new file mode 100644
index 0000000..d513075
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/widgets.json
@@ -0,0 +1,182 @@
+{
+  "layouts": [
+    {
+      "layout_name": "default_kafka_dashboard",
+      "display_name": "Standard Kafka Dashboard",
+      "section_name": "KAFKA_SUMMARY",
+      "widgetLayoutInfo": [
+        {
+          "widget_name": "Broker Topics",
+          "description": "Broker Topics",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Bytes In",
+              "value": "${kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate}"
+            },
+            {
+              "name": "Bytes Out",
+              "value": "${kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate}"
+            },
+            {
+              "name": "Messages In",
+              "value": "${kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Active Controller Count",
+          "description": "Active Controller Count",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.controller.KafkaController.ActiveControllerCount._sum",
+              "metric_path": "metrics/kafka/controller/KafkaController/ActiveControllerCount._sum",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Active Controller Count",
+              "value": "${kafka.controller.KafkaController.ActiveControllerCount._sum}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Controller Status",
+          "description": "Controller Status",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate",
+              "metric_path": "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate",
+              "metric_path": "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/1MinuteRate",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Leader Election Rate And Time",
+              "value": "${kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate}"
+            },
+            {
+              "name": "Unclean Leader Election",
+              "value": "${kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Replica MaxLag",
+          "description": "Replica MaxLag",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica",
+              "metric_path": "metrics/kafka/server/ReplicaFetcherManager/Replica-MaxLag",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Replica MaxLag",
+              "value": "${kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        },
+        {
+          "widget_name": "Replica Manager",
+          "description": "Replica Manager",
+          "widget_type": "GRAPH",
+          "is_visible": true,
+          "metrics": [
+            {
+              "name": "kafka.server.ReplicaManager.PartitionCount._sum",
+              "metric_path": "metrics/kafka/server/ReplicaManager/PartitionCount._sum",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.ReplicaManager.UnderReplicatedPartitions",
+              "metric_path": "metrics/kafka/server/ReplicaManager/UnderReplicatedPartitions",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            },
+            {
+              "name": "kafka.server.ReplicaManager.LeaderCount._sum",
+              "metric_path": "metrics/kafka/server/ReplicaManager/LeaderCount._sum",
+              "service_name": "KAFKA",
+              "component_name": "KAFKA_BROKER"
+            }
+          ],
+          "values": [
+            {
+              "name": "Partitions count",
+              "value": "${kafka.server.ReplicaManager.PartitionCount._sum}"
+            },
+            {
+              "name": "Under Replicated Partitions",
+              "value": "${kafka.server.ReplicaManager.UnderReplicatedPartitions}"
+            },
+            {
+              "name": "Leader Count",
+              "value": "${kafka.server.ReplicaManager.LeaderCount._sum}"
+            }
+          ],
+          "properties": {
+            "graph_type": "LINE",
+            "time_range": "1"
+          }
+        }
+      ]
+    }
+  ]
+}
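
The ${...} expressions in the values blocks are placeholders for the collected metric series; a minimal sketch of the substitution semantics (an illustration only, not Ambari's widget engine):

    import re

    metrics = {"kafka.server.ReplicaManager.PartitionCount._sum": 42.0}  # hypothetical sample

    def evaluate(expression, values):
        # Replace each ${metric.name} token with its collected value.
        return re.sub(r"\$\{([^}]+)\}", lambda m: str(values[m.group(1)]), expression)

    print(evaluate("${kafka.server.ReplicaManager.PartitionCount._sum}", metrics))  # 42.0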

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml
new file mode 100644
index 0000000..d0326c2
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/services/KAFKA/metainfo.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<metainfo>
+    <schemaVersion>2.0</schemaVersion>
+    <services>
+        <service>
+            <name>KAFKA</name>
+            <version>0.10.0.3.0</version>
+            <extends>common-services/KAFKA/0.10.0.3.0</extends>
+        </service>
+    </services>
+</metainfo>


[2/2] ambari git commit: AMBARI-20939. HDP 3.0 TP - create service definition for Kafka with configs, kerberos, widgets, etc.(vbrodetskyi)

Posted by vb...@apache.org.
AMBARI-20939. HDP 3.0 TP - create service definition for Kafka with configs, kerberos, widgets, etc.(vbrodetskyi)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3817ad5d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3817ad5d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3817ad5d

Branch: refs/heads/trunk
Commit: 3817ad5da2076a274ca9fa4674b6d9d704610cb2
Parents: 1aad067
Author: Vitaly Brodetskyi <vb...@hortonworks.com>
Authored: Fri May 5 17:09:30 2017 +0300
Committer: Vitaly Brodetskyi <vb...@hortonworks.com>
Committed: Fri May 5 17:09:30 2017 +0300

----------------------------------------------------------------------
 .../KAFKA/0.10.0.3.0/alerts.json                |  32 ++
 .../0.10.0.3.0/configuration/kafka-broker.xml   | 569 +++++++++++++++++++
 .../0.10.0.3.0/configuration/kafka-env.xml      | 111 ++++
 .../0.10.0.3.0/configuration/kafka-log4j.xml    | 170 ++++++
 .../configuration/kafka_client_jaas_conf.xml    |  41 ++
 .../configuration/kafka_jaas_conf.xml           |  59 ++
 .../configuration/ranger-kafka-audit.xml        | 130 +++++
 .../ranger-kafka-plugin-properties.xml          | 148 +++++
 .../ranger-kafka-policymgr-ssl.xml              |  66 +++
 .../configuration/ranger-kafka-security.xml     |  64 +++
 .../KAFKA/0.10.0.3.0/kerberos.json              |  76 +++
 .../KAFKA/0.10.0.3.0/metainfo.xml               | 109 ++++
 .../KAFKA/0.10.0.3.0/metrics.json               | 239 ++++++++
 .../KAFKA/0.10.0.3.0/package/scripts/kafka.py   | 276 +++++++++
 .../0.10.0.3.0/package/scripts/kafka_broker.py  | 151 +++++
 .../KAFKA/0.10.0.3.0/package/scripts/params.py  | 341 +++++++++++
 .../0.10.0.3.0/package/scripts/service_check.py |  65 +++
 .../package/scripts/setup_ranger_kafka.py       |  90 +++
 .../0.10.0.3.0/package/scripts/status_params.py |  26 +
 .../KAFKA/0.10.0.3.0/package/scripts/upgrade.py |  78 +++
 .../KAFKA/0.10.0.3.0/package/scripts/utils.py   |  38 ++
 .../templates/input.config-kafka.json.j2        |  92 +++
 .../0.10.0.3.0/package/templates/kafka.conf.j2  |  35 ++
 .../package/templates/kafka_client_jaas.conf.j2 |  29 +
 .../package/templates/kafka_jaas.conf.j2        |  41 ++
 .../package/templates/tools-log4j.properties.j2 |  21 +
 .../KAFKA/0.10.0.3.0/role_command_order.json    |   7 +
 .../KAFKA/0.10.0.3.0/widgets.json               | 182 ++++++
 .../stacks/HDP/3.0/services/KAFKA/metainfo.xml  |  27 +
 29 files changed, 3313 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/alerts.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/alerts.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/alerts.json
new file mode 100644
index 0000000..04fb583
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/alerts.json
@@ -0,0 +1,32 @@
+{
+  "KAFKA": {
+    "service": [],
+    "KAFKA_BROKER": [
+      {
+        "name": "kafka_broker_process",
+        "label": "Kafka Broker Process",
+        "description": "This host-level alert is triggered if the Kafka Broker cannot be determined to be up.",
+        "interval": 1,
+        "scope": "HOST",
+        "source": {
+          "type": "PORT",
+          "uri": "{{kafka-broker/listeners}}",
+          "default_port": 6667,
+          "reporting": {
+            "ok": {
+              "text": "TCP OK - {0:.3f}s response on port {1}"
+            },
+            "warning": {
+              "text": "TCP OK - {0:.3f}s response on port {1}",
+              "value": 1.5
+            },
+            "critical": {
+              "text": "Connection failed: {0} to {1}:{2}",
+              "value": 5.0
+            }
+          }
+        }
+      }
+    ]
+  }
+}
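
The reporting block maps the measured connect time onto alert states; a minimal sketch of those threshold semantics (an illustration of the values above, not Ambari's alert framework):

    def kafka_broker_alert_state(response_seconds, warning=1.5, critical=5.0):
        # Thresholds mirror the warning/critical values in the alert definition.
        if response_seconds >= critical:
            return "CRITICAL"
        if response_seconds >= warning:
            return "WARNING"
        return "OK"

    print(kafka_broker_alert_state(0.02))  # OK
    print(kafka_broker_alert_state(2.1))   # WARNING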

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-broker.xml
new file mode 100644
index 0000000..b62b986
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-broker.xml
@@ -0,0 +1,569 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration xmlns:xi="http://www.w3.org/2001/XInclude" supports_final="true">
+  <property>
+    <name>log.dirs</name>
+    <value>/kafka-logs</value>
+    <description>
+      A comma-separated list of one or more directories in which Kafka data is stored.
+      Each new partition that is created will be placed in the directory which currently has the fewest partitions.
+    </description>
+    <value-attributes>
+      <type>directories</type>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>port</name>
+    <value>6667</value>
+    <description>
+      The port on which the server accepts client connections.
+    </description>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>zookeeper.connect</name>
+    <value>localhost:2181</value>
+    <description>
+      ZooKeeper also allows you to add a "chroot" path which will make all Kafka data for this cluster appear under a particular path.
+      This is a way to set up multiple Kafka clusters or other applications on the same ZooKeeper cluster. To do this, give a connection
+      string in the form hostname1:port1,hostname2:port2,hostname3:port3/chroot/path, which would put all this cluster's data under the
+      path /chroot/path. Note that consumers must use the same connection string.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>message.max.bytes</name>
+    <value>1000000</value>
+    <description>
+      The maximum size of a message that the server can receive.
+      It is important that this property be in sync with the maximum fetch size your consumers use or
+      else an unruly producer will be able to publish messages too large for consumers to consume.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>num.network.threads</name>
+    <value>3</value>
+    <description>
+      The number of network threads that the server uses for handling network requests.
+      You probably don't need to change this.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>num.io.threads</name>
+    <value>8</value>
+    <description>
+      The number of I/O threads that the server uses for executing requests. You should have at least as many threads as you have disks.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>queued.max.requests</name>
+    <value>500</value>
+    <description>The number of requests that can be queued up for processing by the I/O threads before the network threads stop reading in new requests.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>socket.send.buffer.bytes</name>
+    <value>102400</value>
+    <description>
+      The SO_SNDBUF buffer the server prefers for socket connections.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>socket.receive.buffer.bytes</name>
+    <value>102400</value>
+    <description>
+      The SO_RCVBUF buffer the server prefers for socket connections.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>socket.request.max.bytes</name>
+    <value>104857600</value>
+    <description>
+      The maximum request size the server will allow. This prevents the server from running out of memory and should be smaller than the Java heap size.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>num.partitions</name>
+    <value>1</value>
+    <description>
+        The default number of partitions per topic.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.segment.bytes</name>
+    <value>1073741824</value>
+    <description>
+      The maximum size of a single log segment file. When this size is reached, a new log segment is rolled.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.roll.hours</name>
+    <value>168</value>
+    <description>
+      This setting will force Kafka to roll a new log segment even if the log.segment.bytes size has not been reached.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.retention.bytes</name>
+    <value>-1</value>
+    <description>
+      The amount of data to retain in the log for each topic partition. Note that this is the limit per partition, so multiply by the number of partitions to get the total data retained for the topic. Also note that if both log.retention.hours and log.retention.bytes are set, we delete a segment when either limit is exceeded.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.retention.hours</name>
+    <value>168</value>
+    <description>
+      The number of hours to keep a log segment before it is deleted, i.e. the default data retention window for all topics. Note that if both log.retention.hours and log.retention.bytes are set, we delete a segment when either limit is exceeded.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.cleanup.interval.mins</name>
+    <value>10</value>
+    <description>The frequency in minutes that the log cleaner checks whether any log segment is eligible for deletion to meet the retention policies.
+    </description>
+    <deleted>true</deleted>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.retention.check.interval.ms</name>
+    <value>600000</value>
+    <description>
+      The frequency in milliseconds that the log cleaner checks whether any log segment is eligible for deletion to meet the retention policies.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.index.size.max.bytes</name>
+    <value>10485760</value>
+    <description>
+      The maximum size in bytes we allow for the offset index for each log segment. Note that we will always pre-allocate a
+      sparse file with this much space and shrink it down when the log rolls. If the index fills up we will roll a new log segment
+      even if we haven't reached the log.segment.bytes limit.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>log.index.interval.bytes</name>
+    <value>4096</value>
+    <description>
+      The byte interval at which we add an entry to the offset index. When executing a fetch request the server must do a linear scan for up to this many bytes to find the correct position in the log to begin and end the fetch. So setting this value to be larger will mean larger index files (and a bit more memory usage) but less scanning. However the server will never add more than one index entry per log append (even if more than log.index.interval worth of messages are appended). In general you probably don't need to mess with this value.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>auto.create.topics.enable</name>
+    <value>true</value>
+    <description>
+      Enable auto creation of topics on the server. If this is set to true then attempts to produce, consume, or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>controller.socket.timeout.ms</name>
+    <value>30000</value>
+    <description>The socket timeout for commands from the partition management controller to the replicas.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>controller.message.queue.size</name>
+    <value>10</value>
+    <description>The buffer size for controller-to-broker channels.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>default.replication.factor</name>
+    <value>1</value>
+    <description>The default replication factor for automatically created topics.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.lag.time.max.ms</name>
+    <value>10000</value>
+    <description>If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.lag.max.messages</name>
+    <value>4000</value>
+    <description>
+      If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.socket.timeout.ms</name>
+    <value>30000</value>
+    <description>The socket timeout for network requests to the leader for replicating data.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.socket.receive.buffer.bytes</name>
+    <value>65536</value>
+    <description>The socket receive buffer for network requests to the leader for replicating data.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.fetch.max.bytes</name>
+    <value>1048576</value>
+    <description>The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.fetch.wait.max.ms</name>
+    <value>500</value>
+    <description>The maximum amount of time to wait for data to arrive on the leader in the fetch requests sent by the replicas to the leader.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.fetch.min.bytes</name>
+    <value>1</value>
+    <description>Minimum bytes expected for each fetch response for the fetch requests from the replica to the leader. If not enough bytes are available, the leader waits up to replica.fetch.wait.max.ms for this many bytes to arrive.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>num.replica.fetchers</name>
+    <value>1</value>
+    <description>
+      Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>replica.high.watermark.checkpoint.interval.ms</name>
+    <value>5000</value>
+    <description>The frequency with which each replica saves its high watermark to disk to handle recovery.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>fetch.purgatory.purge.interval.requests</name>
+    <value>10000</value>
+    <description>The purge interval (in number of requests) of the fetch request purgatory.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>producer.purgatory.purge.interval.requests</name>
+    <value>10000</value>
+    <description>The purge interval (in number of requests) of the producer request purgatory.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>zookeeper.session.timeout.ms</name>
+    <value>30000</value>
+    <description>Zookeeper session timeout. If the server fails to heartbeat to zookeeper within this period of time it is considered dead. If you set this too low the server may be falsely considered dead; if you set it too high it may take too long to recognize a truly dead server.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>zookeeper.connection.timeout.ms</name>
+    <value>25000</value>
+    <description>The maximum amount of time that the client waits to establish a connection to zookeeper.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>zookeeper.sync.time.ms</name>
+    <value>2000</value>
+    <description>How far a ZK follower can be behind a ZK leader.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>controlled.shutdown.max.retries</name>
+    <value>3</value>
+    <description>Number of retries to complete the controlled shutdown successfully before executing an unclean shutdown.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>controlled.shutdown.retry.backoff.ms</name>
+    <value>5000</value>
+    <description>
+      Backoff time between shutdown retries.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.metrics.reporters</name>
+    <value/>
+    <description>
+      Kafka Ganglia metrics reporter and Kafka timeline metrics reporter
+    </description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.ganglia.metrics.reporter.enabled</name>
+    <value>true</value>
+    <description>
+      Enable the Kafka Ganglia metrics reporter.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.ganglia.metrics.host</name>
+    <value>localhost</value>
+    <description> Ganglia host </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.ganglia.metrics.port</name>
+    <value>8671</value>
+    <description> Ganglia port </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.ganglia.metrics.group</name>
+    <value>kafka</value>
+    <description>Ganglia group name </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.reporter.enabled</name>
+    <value>true</value>
+    <description>Enable the Kafka timeline metrics reporter.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.hosts</name>
+    <value>{{ams_collector_hosts}}</value>
+    <description>Timeline metrics collector hosts</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.port</name>
+    <value>{{metric_collector_port}}</value>
+    <description>Timeline port</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.protocol</name>
+    <value>{{metric_collector_protocol}}</value>
+    <description>Timeline protocol(http or https)</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.truststore.path</name>
+    <value>{{metric_truststore_path}}</value>
+    <description>Location of the trust store file.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.truststore.type</name>
+    <value>{{metric_truststore_type}}</value>
+    <description>Optional. Default value is "jks".</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.truststore.password</name>
+    <value>{{metric_truststore_password}}</value>
+    <description>Password to open the trust store file.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.reporter.sendInterval</name>
+    <value>5900</value>
+    <description>Timeline metrics reporter send interval</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.instanceId</name>
+    <value>{{cluster_name}}</value>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.set.instanceId</name>
+    <value>{{set_instanceId}}</value>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.maxRowCacheSize</name>
+    <value>10000</value>
+    <description>Timeline metrics reporter max row cache size</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+
+  <property>
+    <name>listeners</name>
+    <value>PLAINTEXT://localhost:6667</value>
+    <description>Host and port on which the Kafka broker accepts connections. localhost will be substituted with the hostname.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>controlled.shutdown.enable</name>
+    <value>true</value>
+    <description>Enable controlled shutdown of the broker. If enabled, the broker will move all leaders on it to some other brokers before shutting itself down. This reduces the unavailability window during shutdown.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>auto.leader.rebalance.enable</name>
+    <value>true</value>
+    <description>Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>num.recovery.threads.per.data.dir</name>
+    <value>1</value>
+    <description>The number of threads per data directory to be used for log recovery at startup and flushing at shutdown</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>min.insync.replicas</name>
+    <value>1</value>
+    <description>Defines the minimum number of replicas in the ISR needed to satisfy a produce request with required.acks=-1 (or all).</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>leader.imbalance.per.broker.percentage</name>
+    <value>10</value>
+    <description>The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>leader.imbalance.check.interval.seconds</name>
+    <value>300</value>
+    <description>The frequency with which the partition rebalance check is triggered by the controller</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offset.metadata.max.bytes</name>
+    <value>4096</value>
+    <description>The maximum size for a metadata entry associated with an offset commit</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.load.buffer.size</name>
+    <value>5242880</value>
+    <description>Batch size for reading from the offsets segments when loading offsets into the cache.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.topic.replication.factor</name>
+    <value>3</value>
+    <description>The replication factor for the offsets topic (set higher to ensure availability).
+    To ensure that the effective replication factor of the offsets topic is the configured value,
+    the number of alive brokers has to be at least the replication factor at the time of the
+    first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor).</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.topic.num.partitions</name>
+    <value>50</value>
+    <description>The number of partitions for the offset commit topic (should not change after deployment)</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.topic.segment.bytes</name>
+    <value>104857600</value>
+    <description>The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.topic.compression.codec</name>
+    <value>0</value>
+    <description>Compression codec for the offsets topic. Compression may be used to achieve "atomic" commits. Default is 0 (NoCompression); use 1 for Gzip, 2 for Snappy, 3 for LZ4.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.retention.minutes</name>
+    <value>86400000</value>
+    <description>Log retention window in minutes for offsets topic</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.retention.check.interval.ms</name>
+    <value>600000</value>
+    <description>Frequency at which to check for stale offsets</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.commit.timeout.ms</name>
+    <value>5000</value>
+    <description>Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. This is similar to the producer request timeout.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>offsets.commit.required.acks</name>
+    <value>-1</value>
+    <description>The required acks before the commit can be accepted. In general, the default (-1) should not be overridden</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>delete.topic.enable</name>
+    <value>false</value>
+    <description>Enables topic deletion. Deleting a topic through the admin tool will have no effect if this config is turned off.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>compression.type</name>
+    <description>Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed', which is equivalent to no compression, and 'producer', which means retain the original compression codec set by the producer.</description>
+    <value>producer</value>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>external.kafka.metrics.exclude.prefix</name>
+    <value>kafka.network.RequestMetrics,kafka.server.DelayedOperationPurgatory,kafka.server.BrokerTopicMetrics.BytesRejectedPerSec</value>
+    <description>
+      Exclude metrics starting with these prefixes from being collected.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>external.kafka.metrics.include.prefix</name>
+    <value>kafka.network.RequestMetrics.ResponseQueueTimeMs.request.OffsetCommit.98percentile,kafka.network.RequestMetrics.ResponseQueueTimeMs.request.Offsets.95percentile,kafka.network.RequestMetrics.ResponseSendTimeMs.request.Fetch.95percentile,kafka.network.RequestMetrics.RequestsPerSec.request</value>
+    <description>
+      These metrics are collected even if they match one of the exclude prefixes.
+    </description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>authorizer.class.name</name>
+    <description>
+      Kafka authorizer class
+    </description>
+    <depends-on>
+      <property>
+        <type>ranger-kafka-plugin-properties</type>
+        <name>ranger-kafka-plugin-enabled</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
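
These kafka-broker properties are ultimately rendered into Kafka's server.properties; a minimal sketch of that key=value rendering, with a plain dict standing in for Ambari's configuration object:

    # Illustrative subset of the kafka-broker defaults above.
    kafka_broker = {
        "log.dirs": "/kafka-logs",
        "port": "6667",
        "zookeeper.connect": "localhost:2181",
        "listeners": "PLAINTEXT://localhost:6667",
    }
    print("\n".join("{0}={1}".format(k, v) for k, v in sorted(kafka_broker.items())))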

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-env.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-env.xml
new file mode 100644
index 0000000..90ba1c8
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-env.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_adding_forbidden="true">
+  <property>
+    <name>kafka_user</name>
+    <display-name>Kafka User</display-name>
+    <value>kafka</value>
+    <property-type>USER</property-type>
+    <description/>
+    <value-attributes>
+      <type>user</type>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka_keytab</name>
+    <description>Kafka keytab path</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka_principal_name</name>
+    <description>Kafka principal name</description>
+    <property-type>KERBEROS_PRINCIPAL</property-type>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka_log_dir</name>
+    <value>/var/log/kafka</value>
+    <description/>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka_pid_dir</name>
+    <value>/var/run/kafka</value>
+    <display-name>Kafka PID dir</display-name>
+    <description/>
+    <value-attributes>
+      <type>directory</type>
+      <editable-only-at-install>true</editable-only-at-install>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka_user_nofile_limit</name>
+    <value>128000</value>
+    <description>Max open files limit setting for KAFKA user.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka_user_nproc_limit</name>
+    <value>65536</value>
+    <description>Max number of processes limit setting for KAFKA user.</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <!-- kafka-env.sh -->
+  <property>
+    <name>content</name>
+    <display-name>kafka-env template</display-name>
+    <description>This is the jinja template for kafka-env.sh file</description>
+    <value>
+#!/bin/bash
+
+# Set KAFKA specific environment variables here.
+
+# The java implementation to use.
+export JAVA_HOME={{java64_home}}
+export PATH=$PATH:$JAVA_HOME/bin
+export PID_DIR={{kafka_pid_dir}}
+export LOG_DIR={{kafka_log_dir}}
+export KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}
+# Add kafka sink to classpath and related dependencies
+if [ -e "/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" ]; then
+  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar
+  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*
+fi
+if [ -f /etc/kafka/conf/kafka-ranger-env.sh ]; then
+  . /etc/kafka/conf/kafka-ranger-env.sh
+fi
+    </value>
+    <value-attributes>
+      <type>content</type>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>is_supported_kafka_ranger</name>
+    <value>true</value>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-log4j.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-log4j.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-log4j.xml
new file mode 100644
index 0000000..6ae1a6a
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka-log4j.xml
@@ -0,0 +1,170 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="false" supports_adding_forbidden="false">
+   <property>
+    <name>kafka_log_maxfilesize</name>
+    <value>256</value>
+    <description>The maximum size of backup file before the log is rotated</description>
+    <display-name>Kafka Log: backup file size</display-name>
+    <value-attributes>
+      <unit>MB</unit>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>kafka_log_maxbackupindex</name>
+    <value>20</value>
+    <description>The number of backup files</description>
+    <display-name>Kafka Log: # of backup files</display-name>
+    <value-attributes>
+      <type>int</type>
+      <minimum>0</minimum>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>controller_log_maxfilesize</name>
+    <value>256</value>
+    <description>The maximum size the log file may reach before it is rotated</description>
+    <display-name>Kafka Controller Log: backup file size</display-name>
+    <value-attributes>
+      <unit>MB</unit>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>controller_log_maxbackupindex</name>
+    <value>20</value>
+    <description>The number of backup files</description>
+    <display-name>Kafka Controller Log: # of backup files</display-name>
+    <value-attributes>
+      <type>int</type>
+      <minimum>0</minimum>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>content</name>
+    <display-name>kafka-log4j template</display-name>
+    <description>Custom log4j.properties</description>
+    <value>
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+#
+kafka.logs.dir=logs
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# RollingFileAppender is used so that the MaxFileSize/MaxBackupIndex
+# settings below take effect; DailyRollingFileAppender ignores both.
+log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.kafkaAppender.MaxFileSize = {{kafka_log_maxfilesize}}MB
+log4j.appender.kafkaAppender.MaxBackupIndex = {{kafka_log_maxbackupindex}}
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.controllerAppender.MaxFileSize = {{controller_log_maxfilesize}}MB
+log4j.appender.controllerAppender.MaxBackupIndex = {{controller_log_maxbackupindex}}
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+    </value>
+    <value-attributes>
+      <type>content</type>
+      <show-property-name>false</show-property-name>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-kafka-plugin-properties</type>
+        <name>ranger-kafka-plugin-enabled</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
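
Every configuration file in this service definition follows the same
<property> schema, so the name/value/description triples can be pulled out
with the standard library. A small sketch, assuming kafka-log4j.xml has been
saved locally:

    # Sketch: list the properties declared in an Ambari configuration
    # XML such as the kafka-log4j.xml above.
    import xml.etree.ElementTree as ET

    root = ET.parse("kafka-log4j.xml").getroot()
    for prop in root.findall("property"):
        name = prop.findtext("name")
        value = (prop.findtext("value") or "").strip()
        desc = prop.findtext("description") or ""
        print("%s = %s  (%s)" % (name, value[:40], desc))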

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_client_jaas_conf.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_client_jaas_conf.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_client_jaas_conf.xml
new file mode 100644
index 0000000..3f63b03
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_client_jaas_conf.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="false" supports_adding_forbidden="true">
+  <property>
+    <name>content</name>
+    <display-name>kafka_client_jaas template</display-name>
+    <description>Kafka client JAAS config</description>
+    <value>
+KafkaClient {
+com.sun.security.auth.module.Krb5LoginModule required
+useTicketCache=true
+renewTicket=true
+serviceName="{{kafka_bare_jaas_principal}}";
+};
+    </value>
+    <value-attributes>
+      <type>content</type>
+      <show-property-name>false</show-property-name>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
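
Within the broker's command scripts, templates like this one are written to
the config directory with TemplateConfig when security is enabled. Roughly
(a sketch that only runs inside an Ambari command script; the params fields
are assumed from the stock params module):

    # Sketch: how kafka.py materializes the JAAS templates when
    # Kerberos is enabled (params fields assumed, not defined here).
    from resource_management.libraries.functions import format
    from resource_management.libraries.resources.template_config import TemplateConfig
    import params

    if params.security_enabled:
        TemplateConfig(format("{conf_dir}/kafka_jaas.conf"),
                       owner=params.kafka_user)
        TemplateConfig(format("{conf_dir}/kafka_client_jaas.conf"),
                       owner=params.kafka_user)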

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_jaas_conf.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_jaas_conf.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_jaas_conf.xml
new file mode 100644
index 0000000..a43cf28
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/kafka_jaas_conf.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="false" supports_adding_forbidden="true">
+  <property>
+    <name>content</name>
+    <display-name>kafka_jaas template</display-name>
+    <description>Kafka JAAS config</description>
+    <value>
+KafkaServer {
+com.sun.security.auth.module.Krb5LoginModule required
+useKeyTab=true
+keyTab="{{kafka_keytab_path}}"
+storeKey=true
+useTicketCache=false
+serviceName="{{kafka_bare_jaas_principal}}"
+principal="{{kafka_jaas_principal}}";
+};
+KafkaClient {
+com.sun.security.auth.module.Krb5LoginModule required
+useTicketCache=true
+renewTicket=true
+serviceName="{{kafka_bare_jaas_principal}}";
+};
+Client {
+com.sun.security.auth.module.Krb5LoginModule required
+useKeyTab=true
+keyTab="{{kafka_keytab_path}}"
+storeKey=true
+useTicketCache=false
+serviceName="zookeeper"
+principal="{{kafka_jaas_principal}}";
+};
+    </value>
+    <value-attributes>
+      <type>content</type>
+      <show-property-name>false</show-property-name>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
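
The broker picks the rendered server template up through the
KAFKA_KERBEROS_PARAMS export in kafka-env.sh above. A sketch of how that JVM
option is composed (the config path is the conventional HDP location and is
an assumption here):

    # Sketch: building the JVM option that points the broker at the
    # rendered JAAS file; the path below is the conventional location.
    conf_dir = "/usr/hdp/current/kafka-broker/config"  # assumed
    security_enabled = True
    kafka_kerberos_params = (
        "-Djava.security.auth.login.config=%s/kafka_jaas.conf" % conf_dir
        if security_enabled else ""
    )
    print(kafka_kerberos_params)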

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-audit.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-audit.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-audit.xml
new file mode 100644
index 0000000..59e7295
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-audit.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>xasecure.audit.is.enabled</name>
+    <value>true</value>
+    <description>Is Audit enabled?</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.destination.hdfs</name>
+    <value>true</value>
+    <display-name>Audit to HDFS</display-name>
+    <description>Is Audit to HDFS enabled?</description>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>xasecure.audit.destination.hdfs</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.destination.hdfs.dir</name>
+    <value>hdfs://NAMENODE_HOSTNAME:8020/ranger/audit</value>
+    <description>HDFS folder to write audit to; make sure the service user has the required permissions</description>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>xasecure.audit.destination.hdfs.dir</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.destination.hdfs.batch.filespool.dir</name>
+    <value>/var/log/kafka/audit/hdfs/spool</value>
+    <description>Local spool directory for HDFS audit events</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.destination.solr</name>
+    <value>false</value>
+    <display-name>Audit to SOLR</display-name>
+    <description>Is Solr audit enabled?</description>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>xasecure.audit.destination.solr</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.destination.solr.urls</name>
+    <value/>
+    <description>Solr URL</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <depends-on>
+      <property>
+        <type>ranger-admin-site</type>
+        <name>ranger.audit.solr.urls</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.destination.solr.zookeepers</name>
+    <value>NONE</value>
+    <description>Solr ZooKeeper connect string</description>
+    <depends-on>
+      <property>
+        <type>ranger-admin-site</type>
+        <name>ranger.audit.solr.zookeepers</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.destination.solr.batch.filespool.dir</name>
+    <value>/var/log/kafka/audit/solr/spool</value>
+    <description>Local spool directory for Solr audit events</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.audit.provider.summary.enabled</name>
+    <value>true</value>
+    <display-name>Audit provider summary enabled</display-name>
+    <description>Enable Summary audit?</description>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>ranger.plugin.kafka.ambari.cluster.name</name>
+    <value>{{cluster_name}}</value>
+    <description>Captures the name of the cluster where the Ranger Kafka plugin is enabled.</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
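
Placeholders such as {{cluster_name}} in the audit properties are resolved
against runtime parameters before the file lands on disk. A sketch of that
substitution over a plain property dict (values illustrative):

    # Sketch: resolving {{...}} placeholders in audit properties
    # against runtime parameters; values are illustrative.
    import re

    props = {"ranger.plugin.kafka.ambari.cluster.name": "{{cluster_name}}"}
    runtime = {"cluster_name": "c1"}  # assumed cluster name

    resolved = {k: re.sub(r"\{\{(\w+)\}\}",
                          lambda m: runtime.get(m.group(1), ""), v)
                for k, v in props.items()}
    print(resolved)  # {'ranger.plugin.kafka.ambari.cluster.name': 'c1'}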

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-plugin-properties.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-plugin-properties.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-plugin-properties.xml
new file mode 100644
index 0000000..29aa31c
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-plugin-properties.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration supports_final="true">
+  <property>
+    <name>policy_user</name>
+    <value>ambari-qa</value>
+    <display-name>Policy user for KAFKA</display-name>
+    <description>This user must be a system user and must also be present in the Ranger Admin portal</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>hadoop.rpc.protection</name>
+    <value/>
+    <description>Used for repository creation on ranger admin</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>common.name.for.certificate</name>
+    <value/>
+    <description>Common name for the certificate; this value should match what is specified in the repository within Ranger Admin</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>zookeeper.connect</name>
+    <value>localhost:2181</value>
+    <description>Used for repository creation on ranger admin</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>ranger-kafka-plugin-enabled</name>
+    <value>No</value>
+    <display-name>Enable Ranger for KAFKA</display-name>
+    <description>Enable the Ranger Kafka plugin</description>
+    <depends-on>
+      <property>
+        <type>ranger-env</type>
+        <name>ranger-kafka-plugin-enabled</name>
+      </property>
+    </depends-on>
+    <value-attributes>
+      <type>boolean</type>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>REPOSITORY_CONFIG_PASSWORD</name>
+    <value>kafka</value>
+    <property-type>PASSWORD</property-type>
+    <display-name>Ranger repository config password</display-name>
+    <description>Used for repository creation on ranger admin</description>
+    <value-attributes>
+      <type>password</type>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+
+  <property>
+    <name>external_admin_username</name>
+    <value></value>
+    <display-name>External Ranger admin username</display-name>
+    <description>Ranger admin username; required when communicating with an external Ranger instance</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+
+  <property>
+    <name>external_admin_password</name>
+    <value></value>
+    <display-name>External Ranger admin password</display-name>
+    <property-type>PASSWORD</property-type>
+    <description>Ranger admin password; required when communicating with an external Ranger instance</description>
+    <value-attributes>
+      <type>password</type>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+
+  <property>
+    <name>external_ranger_admin_username</name>
+    <value></value>
+    <display-name>External Ranger Ambari admin username</display-name>
+    <description>Ambari admin username for the external Ranger instance; required when communicating with external Ranger</description>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+
+  <property>
+    <name>external_ranger_admin_password</name>
+    <value></value>
+    <display-name>External Ranger Ambari admin password</display-name>
+    <property-type>PASSWORD</property-type>
+    <description>Ambari admin password for the external Ranger instance; required when communicating with external Ranger</description>
+    <value-attributes>
+      <type>password</type>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>REPOSITORY_CONFIG_USERNAME</name>
+    <value>kafka</value>
+    <display-name>Ranger repository config user</display-name>
+    <description>Used for repository creation on ranger admin</description>
+    <depends-on>
+      <property>
+        <type>ranger-kafka-plugin-properties</type>
+        <name>ranger-kafka-plugin-enabled</name>
+      </property>
+      <property>
+        <type>kafka-env</type>
+        <name>kafka_user</name>
+      </property>
+    </depends-on>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
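
Note that ranger-kafka-plugin-enabled carries the strings Yes/No even though
the UI renders it as a boolean; consuming scripts normalize it with a
case-insensitive comparison, roughly like this sketch:

    # Sketch: normalizing the Yes/No plugin flag the way the Ambari
    # scripts read it.
    def ranger_plugin_enabled(plugin_props):
        return plugin_props.get("ranger-kafka-plugin-enabled",
                                "No").strip().lower() == "yes"

    print(ranger_plugin_enabled({"ranger-kafka-plugin-enabled": "Yes"}))  # True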

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-policymgr-ssl.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-policymgr-ssl.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-policymgr-ssl.xml
new file mode 100644
index 0000000..4a76b60
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-policymgr-ssl.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>xasecure.policymgr.clientssl.keystore</name>
+    <value>/usr/hdp/current/kafka-broker/config/ranger-plugin-keystore.jks</value>
+    <description>Java keystore file</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.policymgr.clientssl.keystore.password</name>
+    <value>myKeyFilePassword</value>
+    <property-type>PASSWORD</property-type>
+    <description>Password for the keystore</description>
+    <value-attributes>
+      <type>password</type>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.policymgr.clientssl.truststore</name>
+    <value>/usr/hdp/current/kafka-broker/config/ranger-plugin-truststore.jks</value>
+    <description>Java truststore file</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.policymgr.clientssl.truststore.password</name>
+    <value>changeit</value>
+    <property-type>PASSWORD</property-type>
+    <description>Password for the truststore</description>
+    <value-attributes>
+      <type>password</type>
+    </value-attributes>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.policymgr.clientssl.keystore.credential.file</name>
+    <value>jceks://file/{{credential_file}}</value>
+    <description>Java keystore credential file</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>xasecure.policymgr.clientssl.truststore.credential.file</name>
+    <value>jceks://file/{{credential_file}}</value>
+    <description>Java truststore credential file</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
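
The jceks://file/{{credential_file}} values above point at a per-repository
credential store; in the stock parameters the path is derived from the
repository name. A sketch of that convention (the layout is an assumption):

    # Sketch: deriving the credential store path referenced by the
    # jceks://file/{{credential_file}} values; layout assumed.
    repo_name = "c1_kafka"  # typically "<cluster_name>_kafka"
    credential_file = "/etc/ranger/%s/cred.jceks" % repo_name
    print("jceks://file" + credential_file)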

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-security.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-security.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-security.xml
new file mode 100644
index 0000000..47ea2a8
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/configuration/ranger-kafka-security.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>ranger.plugin.kafka.service.name</name>
+    <value>{{repo_name}}</value>
+    <description>Name of the Ranger service containing policies for this Kafka instance</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>ranger.plugin.kafka.policy.source.impl</name>
+    <value>org.apache.ranger.admin.client.RangerAdminRESTClient</value>
+    <description>Class to retrieve policies from the source</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>ranger.plugin.kafka.policy.rest.url</name>
+    <value>{{policymgr_mgr_url}}</value>
+    <description>URL to Ranger Admin</description>
+    <on-ambari-upgrade add="false"/>
+    <depends-on>
+      <property>
+        <type>admin-properties</type>
+        <name>policymgr_external_url</name>
+      </property>
+    </depends-on>
+  </property>
+  <property>
+    <name>ranger.plugin.kafka.policy.rest.ssl.config.file</name>
+    <value>/etc/kafka/conf/ranger-policymgr-ssl.xml</value>
+    <description>Path to the file containing SSL details to contact Ranger Admin</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>ranger.plugin.kafka.policy.pollIntervalMs</name>
+    <value>30000</value>
+    <description>Polling interval, in milliseconds, for policy changes</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+  <property>
+    <name>ranger.plugin.kafka.policy.cache.dir</name>
+    <value>/etc/ranger/{{repo_name}}/policycache</value>
+    <description>Directory where Ranger policies are cached after successful retrieval from the source</description>
+    <on-ambari-upgrade add="false"/>
+  </property>
+</configuration>
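
Together, policy.source.impl, policy.rest.url, policy.pollIntervalMs and
policy.cache.dir describe a poll-and-cache loop: the REST client fetches
policies every 30 seconds and keeps the last good copy on disk, so a broker
can keep authorizing requests while Ranger Admin is unreachable. An
illustrative sketch of that pattern (not the actual RangerAdminRESTClient;
the URL and cache path are assumptions):

    # Illustrative poll-and-cache loop for Ranger policies; the real
    # client is RangerAdminRESTClient, this only mirrors the pattern.
    import json, time, urllib2  # Python 2, matching the service scripts

    POLICY_URL = "http://ranger-admin:6080/service/plugins/policies"  # assumed
    CACHE_FILE = "/etc/ranger/c1_kafka/policycache/policies.json"     # assumed
    POLL_SECONDS = 30000 / 1000.0

    while True:
        try:
            policies = json.load(urllib2.urlopen(POLICY_URL, timeout=10))
            with open(CACHE_FILE, "w") as cache:
                json.dump(policies, cache)  # persist the last good copy
        except Exception:
            pass  # keep serving from the cached copy
        time.sleep(POLL_SECONDS)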

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/kerberos.json
new file mode 100644
index 0000000..eb31ad6
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/kerberos.json
@@ -0,0 +1,76 @@
+{
+  "services": [
+    {
+      "name": "KAFKA",
+      "identities": [
+        {
+          "name": "/smokeuser"
+        }
+      ],
+      "configurations": [
+        {
+          "kafka-broker": {
+              "authorizer.class.name": "kafka.security.auth.SimpleAclAuthorizer",
+              "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
+              "super.users": "user:${kafka-env/kafka_user}",
+              "security.inter.broker.protocol": "PLAINTEXTSASL",
+              "zookeeper.set.acl": "true",
+              "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
+          }
+        },
+        {
+          "ranger-kafka-audit": {
+            "xasecure.audit.jaas.Client.loginModuleName": "com.sun.security.auth.module.Krb5LoginModule",
+            "xasecure.audit.jaas.Client.loginModuleControlFlag": "required",
+            "xasecure.audit.jaas.Client.option.useKeyTab": "true",
+            "xasecure.audit.jaas.Client.option.storeKey": "false",
+            "xasecure.audit.jaas.Client.option.serviceName": "solr",
+            "xasecure.audit.destination.solr.force.use.inmemory.jaas.config": "true"
+          }
+        }
+      ],
+      "components": [
+        {
+          "name": "KAFKA_BROKER",
+          "identities": [
+            {
+              "name": "kafka_broker",
+              "principal": {
+                "value": "${kafka-env/kafka_user}/_HOST@${realm}",
+                "type": "service",
+                "configuration": "kafka-env/kafka_principal_name"
+              },
+              "keytab": {
+                "file": "${keytab_dir}/kafka.service.keytab",
+                "owner": {
+                  "name": "${kafka-env/kafka_user}",
+                  "access": "r"
+                },
+                "group": {
+                  "name": "${cluster-env/user_group}",
+                  "access": ""
+                },
+                "configuration": "kafka-env/kafka_keytab"
+              }
+            },
+            {
+              "name": "/KAFKA/KAFKA_BROKER/kafka_broker",
+              "principal": {
+                "configuration": "ranger-kafka-audit/xasecure.audit.jaas.Client.option.principal"
+              },
+              "keytab": {
+                "configuration": "ranger-kafka-audit/xasecure.audit.jaas.Client.option.keyTab"
+              }
+            },
+            {
+              "name": "/HDFS/NAMENODE/hdfs",
+              "when" : {
+                "contains" : ["services", "HDFS"]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
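
Principals in kerberos.json use ${config-type/property} references plus the
_HOST marker, which the Kerberos wizard expands per broker host. A sketch of
that expansion (configuration values illustrative):

    # Sketch: expanding a kerberos.json principal template; the
    # configuration values below are illustrative.
    import re

    configs = {"kafka-env/kafka_user": "kafka", "realm": "EXAMPLE.COM"}

    def expand(template, hostname):
        out = re.sub(r"\$\{([^}]+)\}",
                     lambda m: configs[m.group(1)], template)
        return out.replace("_HOST", hostname)

    print(expand("${kafka-env/kafka_user}/_HOST@${realm}",
                 "broker1.example.com"))
    # -> kafka/broker1.example.com@EXAMPLE.COM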

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metainfo.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metainfo.xml
new file mode 100644
index 0000000..a19850e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metainfo.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<metainfo>
+    <schemaVersion>2.0</schemaVersion>
+    <services>
+        <service>
+            <name>KAFKA</name>
+            <displayName>Kafka</displayName>
+            <comment>A high-throughput distributed messaging system</comment>
+            <version>0.10.0.3.0</version>
+            <components>
+                <component>
+                    <name>KAFKA_BROKER</name>
+                    <displayName>Kafka Broker</displayName>
+                    <category>MASTER</category>
+                    <cardinality>1+</cardinality>
+                    <versionAdvertised>true</versionAdvertised>
+                    <dependencies>
+                        <dependency>
+                            <name>ZOOKEEPER/ZOOKEEPER_SERVER</name>
+                            <scope>cluster</scope>
+                            <auto-deploy>
+                                <enabled>true</enabled>
+                            </auto-deploy>
+                        </dependency>
+                    </dependencies>
+                    <commandScript>
+                        <script>scripts/kafka_broker.py</script>
+                        <scriptType>PYTHON</scriptType>
+                        <timeout>1200</timeout>
+                    </commandScript>
+                    <logs>
+                        <log>
+                            <logId>kafka_server</logId>
+                            <primary>true</primary>
+                        </log>
+                        <log>
+                            <logId>kafka_controller</logId>
+                        </log>
+                        <log>
+                            <logId>kafka_request</logId>
+                        </log>
+                        <log>
+                            <logId>kafka_logcleaner</logId>
+                        </log>
+                        <log>
+                            <logId>kafka_statechange</logId>
+                        </log>
+                    </logs>
+                </component>
+            </components>
+            <commandScript>
+                <script>scripts/service_check.py</script>
+                <scriptType>PYTHON</scriptType>
+                <timeout>300</timeout>
+            </commandScript>
+            <requiredServices>
+                <service>ZOOKEEPER</service>
+            </requiredServices>
+            <configuration-dependencies>
+                <config-type>kafka-broker</config-type>
+                <config-type>kafka-env</config-type>
+                <config-type>kafka-log4j</config-type>
+                <config-type>ranger-kafka-plugin-properties</config-type>
+                <config-type>ranger-kafka-audit</config-type>
+                <config-type>ranger-kafka-policymgr-ssl</config-type>
+                <config-type>ranger-kafka-security</config-type>
+                <config-type>zookeeper-env</config-type>
+                <config-type>zoo.cfg</config-type>
+                <config-type>kafka_jaas_conf</config-type>
+                <config-type>kafka_client_jaas_conf</config-type>
+            </configuration-dependencies>
+            <osSpecifics>
+                <osSpecific>
+                    <osFamily>redhat7,amazon2015,redhat6,suse11,suse12</osFamily>
+                    <packages>
+                        <package>
+                            <name>kafka_${stack_version}</name>
+                        </package>
+                    </packages>
+                </osSpecific>
+                <osSpecific>
+                    <osFamily>debian7,ubuntu12,ubuntu14,ubuntu16</osFamily>
+                    <packages>
+                        <package>
+                            <name>kafka-${stack_version}</name>
+                        </package>
+                    </packages>
+                </osSpecific>
+            </osSpecifics>
+            <restartRequiredAfterChange>true</restartRequiredAfterChange>
+        </service>
+    </services>
+</metainfo>
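
The kafka_${stack_version} package names in the osSpecifics block are
resolved by substituting the concrete stack version, with dots mapped to
underscores on the redhat/suse families (the Debian entry uses a dash for
the same reason). A sketch of that convention, assumed here since the actual
resolver lives in Ambari's packaging layer:

    # Sketch: resolving kafka_${stack_version} to a concrete package
    # name; the dots-to-underscores convention is assumed.
    def resolve_package(name_template, stack_version):
        return name_template.replace("${stack_version}",
                                     stack_version.replace(".", "_"))

    print(resolve_package("kafka_${stack_version}", "3.0.0.0"))
    # -> kafka_3_0_0_0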

http://git-wip-us.apache.org/repos/asf/ambari/blob/3817ad5d/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metrics.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metrics.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metrics.json
new file mode 100644
index 0000000..e99f4eb
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0.3.0/metrics.json
@@ -0,0 +1,239 @@
+{
+  "KAFKA_BROKER": {
+    "Component": [
+      {
+        "type": "ganglia",
+        "metrics": {
+          "default": {
+            "metrics/jvm/uptime": {
+              "metric": "jvm.uptime",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/heap_usage": {
+              "metric": "jvm.heap_usage",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/non_heap_usage": {
+              "metric": "jvm.non_heap_usage",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/runnable": {
+              "metric": "jvm.thread-states.runnable",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/blocked": {
+              "metric": "jvm.thread-states.blocked",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/timed_waiting": {
+              "metric": "jvm.thread-states.timed_waiting",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread-states/terminated": {
+              "metric": "jvm.thread-states.terminated",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/thread_count": {
+              "metric": "jvm.thread_count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/jvm/daemon_thread_count": {
+              "metric": "jvm.daemon_thread_count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/1MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/5MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.5MinuteRate",
+              "pointInTime": false,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/15MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.15MinuteRate",
+              "pointInTime": false,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/meanRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/count": {
+              "metric": "kafka.server.BrokerTopicMetrics.MessagesInPerSec.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/1MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesInPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/5MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesInPerSec.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/15MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesInPerSec.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/meanRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesInPerSec.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/count": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesInPerSec.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/1MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/5MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/15MinuteRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/meanRate": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/count": {
+              "metric": "kafka.server.BrokerTopicMetrics.BytesOutPerSec.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/KafkaController/ActiveControllerCount": {
+              "metric": "kafka.controller.KafkaController.ActiveControllerCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/meanRate": {
+              "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.meanRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/1MinuteRate": {
+              "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/5MinuteRate": {
+              "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/15MinuteRate": {
+              "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/count": {
+              "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.count",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/1MinuteRate": {
+              "metric": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/5MinuteRate": {
+              "metric": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.5MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/15MinuteRate": {
+              "metric": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.15MinuteRate",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/controller/ControllerStats/OfflinePartitionsCount": {
+              "metric": "kafka.controller.ControllerStats.OfflinePartitionsCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/PartitionCount": {
+              "metric": "kafka.server.ReplicaManager.PartitionCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/LeaderCount": {
+              "metric": "kafka.server.ReplicaManager.LeaderCount",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/UnderReplicatedPartitions": {
+              "metric": "kafka.server.ReplicaManager.UnderReplicatedPartitions",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/ISRShrinksPerSec": {
+              "metric": "kafka.server.ReplicaManager.ISRShrinksPerSec",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ReplicaManager/ISRExpandsPerSec": {
+              "metric": "kafka.server.ReplicaManager.ISRExpandsPerSec",
+              "pointInTime": true,
+              "temporal": true
+            },
+
+            "metrics/kafka/server/ReplicaFetcherManager/Replica-MaxLag": {
+              "metric": "kafka.server.ReplicaFetcherManager.MaxLag.clientId.Replica",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/ProducerRequestPurgatory/PurgatorySize": {
+              "metric": "kafka.server.ProducerRequestPurgatory.PurgatorySize",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/server/FetchRequestPurgatory/PurgatorySize": {
+              "metric": "kafka.server.FetchRequestPurgatory.PurgatorySize",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/cluster/Partition/$1-UnderReplicated": {
+              "metric": "kafka.cluster.Partition.(\\w+)-UnderReplicated",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/consumer/ConsumerFetcherManager/$1-MaxLag": {
+              "metric": "kafka.consumer.ConsumerFetcherManager.(\\w+)-MaxLag",
+              "pointInTime": true,
+              "temporal": true
+            },
+            "metrics/kafka/consumer/ConsumerFetcherManager/$1-MinFetch": {
+              "metric": "kafka.consumer.ConsumerFetcherManager.(\\w+)-MinFetch",
+              "pointInTime": true,
+              "temporal": true
+            }
+          }
+        }
+      }
+    ]
+  }
+}
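
The $1 placeholders in the last few metric keys pair with the (\w+) capture
groups in the JMX-side names, so a single entry covers every topic or client
id. A sketch of resolving a concrete metric against such a pattern:

    # Sketch: mapping a concrete JMX metric name onto the templated
    # Ambari metric path via the (\w+) capture group.
    import re

    jmx_pattern = r"kafka.cluster.Partition.(\w+)-UnderReplicated"
    ambari_key = "metrics/kafka/cluster/Partition/$1-UnderReplicated"

    match = re.match(jmx_pattern, "kafka.cluster.Partition.mytopic-UnderReplicated")
    if match:
        print(ambari_key.replace("$1", match.group(1)))
        # -> metrics/kafka/cluster/Partition/mytopic-UnderReplicated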