You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2016/03/25 00:10:44 UTC

ambari git commit: AMBARI-15526: Stack Featurize Kafka service (jluniya)

Repository: ambari
Updated Branches:
  refs/heads/trunk 4a64727fb -> 7cbf3f4b4


AMBARI-15526: Stack Featurize Kafka service (jluniya)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7cbf3f4b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7cbf3f4b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7cbf3f4b

Branch: refs/heads/trunk
Commit: 7cbf3f4b44beaca76d95035567ca38b23a92a741
Parents: 4a64727
Author: Jayush Luniya <jl...@hortonworks.com>
Authored: Thu Mar 24 16:10:36 2016 -0700
Committer: Jayush Luniya <jl...@hortonworks.com>
Committed: Thu Mar 24 16:10:36 2016 -0700

----------------------------------------------------------------------
 .../libraries/functions/constants.py            |  3 +++
 .../libraries/functions/stack_features.py       | 18 ++++++++++++++-
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py    | 10 +++++---
 .../0.8.1.2.2/package/scripts/kafka_broker.py   | 10 +++++---
 .../KAFKA/0.8.1.2.2/package/scripts/params.py   | 24 ++++++++++++--------
 .../KAFKA/0.8.1.2.2/package/scripts/upgrade.py  |  4 ++--
 .../HDP/2.0.6/properties/stack_features.json    | 18 ++++++++++++++-
 7 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/constants.py b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
index 9ecb55b..f766a82 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
@@ -50,3 +50,6 @@ class StackFeature:
   SPARK_THRIFTSERVER = "spark_thriftserver"
   STORM_KERBEROS = "storm_kerberos"
   STORM_AMS = "storm_ams"
+  CREATE_KAFKA_BROKER_ID = "create_kafka_broker_id"
+  KAFKA_LISTENERS = "kafka_listeners"
+  KAFKA_KERBEROS = "kafka_kerberos"

http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
index 31f9d25..2f0e6bf 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
@@ -82,6 +82,22 @@ _DEFAULT_STACK_FEATURES = {
       "name": "storm_ams",
       "description": "Storm AMS integration (AMBARI-10710)",
       "min_version": "2.2.0.0"
+    },
+    {
+      "name": "create_kafka_broker_id",
+      "description": "Ambari should create Kafka Broker Id (AMBARI-12678)",
+      "min_version": "2.2.0.0",
+      "max_version": "2.3.0.0"
+    },
+    {
+      "name": "kafka_listeners",
+      "description": "Kafka listeners (AMBARI-10984)",
+      "min_version": "2.3.0.0"
+    },
+    {
+      "name": "kafka_kerberos",
+      "description": "Kafka Kerberos support (AMBARI-10984)",
+      "min_version": "2.3.0.0"
     }
   ]
 }
@@ -111,4 +127,4 @@ def check_stack_feature(stack_feature, stack_version):
           return False
       return True
         
-  return False    
\ No newline at end of file
+  return False

http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 43b318c..33275f9 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -20,12 +20,14 @@ limitations under the License.
 import collections
 import os
 
-from resource_management.libraries.functions.version import format_stack_version, compare_versions
+from resource_management.libraries.functions.version import format_stack_version
 from resource_management.libraries.resources.properties_file import PropertiesFile
 from resource_management.libraries.resources.template_config import TemplateConfig
 from resource_management.core.resources.system import Directory, Execute, File, Link
 from resource_management.core.source import StaticFile, Template, InlineTemplate
 from resource_management.libraries.functions import format
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
 
 
 from resource_management.core.logger import Logger
@@ -42,14 +44,16 @@ def kafka(upgrade_type=None):
     effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version)
     Logger.info(format("Effective stack version: {effective_version}"))
 
-    if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.2.0.0') >= 0 and compare_versions(effective_version, '2.3.0.0') < 0:
+    if effective_version is not None and effective_version != "" and \
+      check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version):
       if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
         brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
         kafka_server_config['broker.id'] = brokerid
         Logger.info(format("Calculating broker.id as {brokerid}"))
 
     # listeners and advertised.listeners are only added in 2.3.0.0 onwards.
-    if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.3.0.0') >= 0:
+    if effective_version is not None and effective_version != "" and \
+        check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version):
       listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
       Logger.info(format("Kafka listeners: {listeners}"))
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
index 314d702..2043cfa 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
@@ -25,6 +25,8 @@ from resource_management.libraries.functions import Direction
 from resource_management.libraries.functions.version import compare_versions, format_stack_version
 from resource_management.libraries.functions.format import format
 from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
 from kafka import ensure_base_directories
 
 import upgrade
@@ -34,7 +36,8 @@ from setup_ranger_kafka import setup_ranger_kafka
 class KafkaBroker(Script):
 
   def get_stack_to_component(self):
-    return {"HDP": "kafka-broker"}
+    import params
+    return {params.stack_name : "kafka-broker"}
 
   def install(self, env):
     self.install_packages(env)
@@ -48,10 +51,10 @@ class KafkaBroker(Script):
     import params
     env.set_params(params)
 
-    if params.version and compare_versions(format_stack_version(params.version), '2.2.0.0') >= 0:
+    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
       stack_select.select("kafka-broker", params.version)
 
-    if params.version and compare_versions(format_stack_version(params.version), '2.3.0.0') >= 0:
+    if params.version and check_stack_feature(StackFeature.CONFIG_VERSIONING, params.version):
       conf_select.select(params.stack_name, "kafka", params.version)
 
     # This is extremely important since it should only be called if crossing the HDP 2.3.4.0 boundary. 
@@ -65,6 +68,7 @@ class KafkaBroker(Script):
         src_version = format_stack_version(params.version)
         dst_version = format_stack_version(params.downgrade_from_version)
 
+      # TODO: How to handle the case of crossing stack version boundary in a stack agnostic way?
       if compare_versions(src_version, '2.3.4.0') < 0 and compare_versions(dst_version, '2.3.4.0') >= 0:
         # Calling the acl migration script requires the configs to be present.
         self.configure(env, upgrade_type=upgrade_type)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index 4ac9401..270a5ac 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -17,9 +17,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+import os
 from resource_management.libraries.functions import format
 from resource_management.libraries.script.script import Script
-from resource_management.libraries.functions.version import format_stack_version, compare_versions
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
 from resource_management.libraries.functions.default import default
 from utils import get_bare_principal
 from resource_management.libraries.functions.get_stack_version import get_stack_version
@@ -35,6 +38,7 @@ from resource_management.libraries.functions.get_not_managed_resources import ge
 # server configurations
 config = Script.get_config()
 tmp_dir = Script.get_tmp_dir()
+stack_root = Script.get_stack_root()
 stack_name = default("/hostLevelParams/stack_name", None)
 retryAble = default("/commandParams/command_retry_enabled", False)
 
@@ -57,7 +61,7 @@ downgrade_from_version = default("/commandParams/downgrade_from_version", None)
 hostname = config['hostname']
 
 # default kafka parameters
-kafka_home = '/usr/lib/kafka/'
+kafka_home = '/usr/lib/kafka'
 kafka_bin = kafka_home+'/bin/kafka'
 conf_dir = "/etc/kafka/conf"
 limits_conf_dir = "/etc/security/limits.d"
@@ -69,11 +73,10 @@ kafka_user_nofile_limit = config['configurations']['kafka-env']['kafka_user_nofi
 kafka_user_nproc_limit = config['configurations']['kafka-env']['kafka_user_nproc_limit']
 
 # parameters for 2.2+
-if Script.is_stack_greater_or_equal("2.2"):
-  kafka_home = '/usr/hdp/current/kafka-broker/'
-  kafka_bin = kafka_home+'bin/kafka'
-  conf_dir = "/usr/hdp/current/kafka-broker/config"
-
+if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+  kafka_home = os.path.join(stack_root,  "current", "kafka-broker")
+  kafka_bin = os.path.join(kafka_home, "bin", "kafka")
+  conf_dir = os.path.join(kafka_home, "config")
 
 kafka_user = config['configurations']['kafka-env']['kafka_user']
 kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
@@ -139,7 +142,8 @@ security_enabled = config['configurations']['cluster-env']['security_enabled']
 kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and
                           config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL")
 
-if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] and compare_versions(stack_version_formatted, '2.3') >= 0:
+if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \
+  and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted):
     _hostname_lowercase = config['hostname'].lower()
     _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
     kafka_jaas_principal = _kafka_principal_name.replace('_HOST',_hostname_lowercase)
@@ -239,7 +243,7 @@ if has_ranger_admin and is_supported_kafka_ranger:
   downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
 
   driver_curl_source = format("{jdk_location}/{jdbc_symlink_name}")
-  driver_curl_target = format("{kafka_home}libs/{jdbc_jar_name}")
+  driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}")
 
   ranger_audit_solr_urls = config['configurations']['ranger-admin-site']['ranger.audit.solr.urls']
   xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db'] if xml_configurations_supported else None
@@ -249,7 +253,7 @@ if has_ranger_admin and is_supported_kafka_ranger:
   credential_file = format('/etc/ranger/{repo_name}/cred.jceks') if xml_configurations_supported else None
 
   stack_version = get_stack_version('kafka-broker')
-  setup_ranger_env_sh_source = format('/usr/hdp/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
+  setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
   setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")
 
   #For SQLA explicitly disable audit to DB for Ranger

http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
index 457a10f..b6e4046 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
@@ -56,10 +56,10 @@ def run_migration(env, upgrade_type):
   kafka_acls_script = None
   command_suffix = ""
   if params.upgrade_direction == Direction.UPGRADE:
-    kafka_acls_script = format("/usr/hdp/{version}/kafka/bin/kafka-acls.sh")
+    kafka_acls_script = format("{stack_root}/{version}/kafka/bin/kafka-acls.sh")
     command_suffix = "--upgradeAcls"
   elif params.upgrade_direction == Direction.DOWNGRADE:
-    kafka_acls_script = format("/usr/hdp/{downgrade_from_version}/kafka/bin/kafka-acls.sh")
+    kafka_acls_script = format("{stack_root}/{downgrade_from_version}/kafka/bin/kafka-acls.sh")
     command_suffix = "--downgradeAcls"
 
   if kafka_acls_script is not None:

http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
index 5e5a23c..97bd19c 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
@@ -56,6 +56,22 @@
       "name": "storm_ams",
       "description": "Storm AMS integration (AMBARI-10710)",
       "min_version": "2.2.0.0"
+    },
+    {
+      "name": "create_kafka_broker_id",
+      "description": "Ambari should create Kafka Broker Id (AMBARI-12678)",
+      "min_version": "2.2.0.0",
+      "max_version": "2.3.0.0"
+    },
+    {
+      "name": "kafka_listeners",
+      "description": "Kafka listeners (AMBARI-10984)",
+      "min_version": "2.3.0.0"
+    },
+    {
+      "name": "kafka_kerberos",
+      "description": "Kafka Kerberos support (AMBARI-10984)",
+      "min_version": "2.3.0.0"
     }
   ]
-}
\ No newline at end of file
+}