You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2016/03/25 00:10:44 UTC
ambari git commit: AMBARI-15526: Stack Featurize Kafka service
(jluniya)
Repository: ambari
Updated Branches:
refs/heads/trunk 4a64727fb -> 7cbf3f4b4
AMBARI-15526: Stack Featurize Kafka service (jluniya)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7cbf3f4b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7cbf3f4b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7cbf3f4b
Branch: refs/heads/trunk
Commit: 7cbf3f4b44beaca76d95035567ca38b23a92a741
Parents: 4a64727
Author: Jayush Luniya <jl...@hortonworks.com>
Authored: Thu Mar 24 16:10:36 2016 -0700
Committer: Jayush Luniya <jl...@hortonworks.com>
Committed: Thu Mar 24 16:10:36 2016 -0700
----------------------------------------------------------------------
.../libraries/functions/constants.py | 3 +++
.../libraries/functions/stack_features.py | 18 ++++++++++++++-
.../KAFKA/0.8.1.2.2/package/scripts/kafka.py | 10 +++++---
.../0.8.1.2.2/package/scripts/kafka_broker.py | 10 +++++---
.../KAFKA/0.8.1.2.2/package/scripts/params.py | 24 ++++++++++++--------
.../KAFKA/0.8.1.2.2/package/scripts/upgrade.py | 4 ++--
.../HDP/2.0.6/properties/stack_features.json | 18 ++++++++++++++-
7 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/constants.py b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
index 9ecb55b..f766a82 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py
@@ -50,3 +50,6 @@ class StackFeature:
SPARK_THRIFTSERVER = "spark_thriftserver"
STORM_KERBEROS = "storm_kerberos"
STORM_AMS = "storm_ams"
+ CREATE_KAFKA_BROKER_ID = "create_kafka_broker_id"
+ KAFKA_LISTENERS = "kafka_listeners"
+ KAFKA_KERBEROS = "kafka_kerberos"
http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
index 31f9d25..2f0e6bf 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py
@@ -82,6 +82,22 @@ _DEFAULT_STACK_FEATURES = {
"name": "storm_ams",
"description": "Storm AMS integration (AMBARI-10710)",
"min_version": "2.2.0.0"
+ },
+ {
+ "name": "create_kafka_broker_id",
+ "description": "Ambari should create Kafka Broker Id (AMBARI-12678)",
+ "min_version": "2.2.0.0",
+ "max_version": "2.3.0.0"
+ },
+ {
+ "name": "kafka_listeners",
+ "description": "Kafka listeners (AMBARI-10984)",
+ "min_version": "2.3.0.0"
+ },
+ {
+ "name": "kafka_kerberos",
+ "description": "Kafka Kerberos support (AMBARI-10984)",
+ "min_version": "2.3.0.0"
}
]
}
@@ -111,4 +127,4 @@ def check_stack_feature(stack_feature, stack_version):
return False
return True
- return False
\ No newline at end of file
+ return False
http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
index 43b318c..33275f9 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
@@ -20,12 +20,14 @@ limitations under the License.
import collections
import os
-from resource_management.libraries.functions.version import format_stack_version, compare_versions
+from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.resources.properties_file import PropertiesFile
from resource_management.libraries.resources.template_config import TemplateConfig
from resource_management.core.resources.system import Directory, Execute, File, Link
from resource_management.core.source import StaticFile, Template, InlineTemplate
from resource_management.libraries.functions import format
+from resource_management.libraries.functions.stack_features import check_stack_feature
+from resource_management.libraries.functions import StackFeature
from resource_management.core.logger import Logger
@@ -42,14 +44,16 @@ def kafka(upgrade_type=None):
effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version)
Logger.info(format("Effective stack version: {effective_version}"))
- if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.2.0.0') >= 0 and compare_versions(effective_version, '2.3.0.0') < 0:
+ if effective_version is not None and effective_version != "" and \
+ check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version):
if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
kafka_server_config['broker.id'] = brokerid
Logger.info(format("Calculating broker.id as {brokerid}"))
# listeners and advertised.listeners are only added in 2.3.0.0 onwards.
- if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.3.0.0') >= 0:
+ if effective_version is not None and effective_version != "" and \
+ check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version):
listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
Logger.info(format("Kafka listeners: {listeners}"))
http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
index 314d702..2043cfa 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py
@@ -25,6 +25,8 @@ from resource_management.libraries.functions import Direction
from resource_management.libraries.functions.version import compare_versions, format_stack_version
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
from kafka import ensure_base_directories
import upgrade
@@ -34,7 +36,8 @@ from setup_ranger_kafka import setup_ranger_kafka
class KafkaBroker(Script):
def get_stack_to_component(self):
- return {"HDP": "kafka-broker"}
+ import params
+ return {params.stack_name : "kafka-broker"}
def install(self, env):
self.install_packages(env)
@@ -48,10 +51,10 @@ class KafkaBroker(Script):
import params
env.set_params(params)
- if params.version and compare_versions(format_stack_version(params.version), '2.2.0.0') >= 0:
+ if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
stack_select.select("kafka-broker", params.version)
- if params.version and compare_versions(format_stack_version(params.version), '2.3.0.0') >= 0:
+ if params.version and check_stack_feature(StackFeature.CONFIG_VERSIONING, params.version):
conf_select.select(params.stack_name, "kafka", params.version)
# This is extremely important since it should only be called if crossing the HDP 2.3.4.0 boundary.
@@ -65,6 +68,7 @@ class KafkaBroker(Script):
src_version = format_stack_version(params.version)
dst_version = format_stack_version(params.downgrade_from_version)
+ # TODO: How to handle the case of crossing stack version boundary in a stack agnostic way?
if compare_versions(src_version, '2.3.4.0') < 0 and compare_versions(dst_version, '2.3.4.0') >= 0:
# Calling the acl migration script requires the configs to be present.
self.configure(env, upgrade_type=upgrade_type)
http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index 4ac9401..270a5ac 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -17,9 +17,12 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+import os
from resource_management.libraries.functions import format
from resource_management.libraries.script.script import Script
-from resource_management.libraries.functions.version import format_stack_version, compare_versions
+from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions import StackFeature
+from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions.default import default
from utils import get_bare_principal
from resource_management.libraries.functions.get_stack_version import get_stack_version
@@ -35,6 +38,7 @@ from resource_management.libraries.functions.get_not_managed_resources import ge
# server configurations
config = Script.get_config()
tmp_dir = Script.get_tmp_dir()
+stack_root = Script.get_stack_root()
stack_name = default("/hostLevelParams/stack_name", None)
retryAble = default("/commandParams/command_retry_enabled", False)
@@ -57,7 +61,7 @@ downgrade_from_version = default("/commandParams/downgrade_from_version", None)
hostname = config['hostname']
# default kafka parameters
-kafka_home = '/usr/lib/kafka/'
+kafka_home = '/usr/lib/kafka'
kafka_bin = kafka_home+'/bin/kafka'
conf_dir = "/etc/kafka/conf"
limits_conf_dir = "/etc/security/limits.d"
@@ -69,11 +73,10 @@ kafka_user_nofile_limit = config['configurations']['kafka-env']['kafka_user_nofi
kafka_user_nproc_limit = config['configurations']['kafka-env']['kafka_user_nproc_limit']
# parameters for 2.2+
-if Script.is_stack_greater_or_equal("2.2"):
- kafka_home = '/usr/hdp/current/kafka-broker/'
- kafka_bin = kafka_home+'bin/kafka'
- conf_dir = "/usr/hdp/current/kafka-broker/config"
-
+if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
+ kafka_home = os.path.join(stack_root, "current", "kafka-broker")
+ kafka_bin = os.path.join(kafka_home, "bin", "kafka")
+ conf_dir = os.path.join(kafka_home, "config")
kafka_user = config['configurations']['kafka-env']['kafka_user']
kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
@@ -139,7 +142,8 @@ security_enabled = config['configurations']['cluster-env']['security_enabled']
kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and
config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL")
-if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] and compare_versions(stack_version_formatted, '2.3') >= 0:
+if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \
+ and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted):
_hostname_lowercase = config['hostname'].lower()
_kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
kafka_jaas_principal = _kafka_principal_name.replace('_HOST',_hostname_lowercase)
@@ -239,7 +243,7 @@ if has_ranger_admin and is_supported_kafka_ranger:
downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
driver_curl_source = format("{jdk_location}/{jdbc_symlink_name}")
- driver_curl_target = format("{kafka_home}libs/{jdbc_jar_name}")
+ driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}")
ranger_audit_solr_urls = config['configurations']['ranger-admin-site']['ranger.audit.solr.urls']
xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db'] if xml_configurations_supported else None
@@ -249,7 +253,7 @@ if has_ranger_admin and is_supported_kafka_ranger:
credential_file = format('/etc/ranger/{repo_name}/cred.jceks') if xml_configurations_supported else None
stack_version = get_stack_version('kafka-broker')
- setup_ranger_env_sh_source = format('/usr/hdp/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
+ setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")
#For SQLA explicitly disable audit to DB for Ranger
http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
index 457a10f..b6e4046 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py
@@ -56,10 +56,10 @@ def run_migration(env, upgrade_type):
kafka_acls_script = None
command_suffix = ""
if params.upgrade_direction == Direction.UPGRADE:
- kafka_acls_script = format("/usr/hdp/{version}/kafka/bin/kafka-acls.sh")
+ kafka_acls_script = format("{stack_root}/{version}/kafka/bin/kafka-acls.sh")
command_suffix = "--upgradeAcls"
elif params.upgrade_direction == Direction.DOWNGRADE:
- kafka_acls_script = format("/usr/hdp/{downgrade_from_version}/kafka/bin/kafka-acls.sh")
+ kafka_acls_script = format("{stack_root}/{downgrade_from_version}/kafka/bin/kafka-acls.sh")
command_suffix = "--downgradeAcls"
if kafka_acls_script is not None:
http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
index 5e5a23c..97bd19c 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json
@@ -56,6 +56,22 @@
"name": "storm_ams",
"description": "Storm AMS integration (AMBARI-10710)",
"min_version": "2.2.0.0"
+ },
+ {
+ "name": "create_kafka_broker_id",
+ "description": "Ambari should create Kafka Broker Id (AMBARI-12678)",
+ "min_version": "2.2.0.0",
+ "max_version": "2.3.0.0"
+ },
+ {
+ "name": "kafka_listeners",
+ "description": "Kafka listeners (AMBARI-10984)",
+ "min_version": "2.3.0.0"
+ },
+ {
+ "name": "kafka_kerberos",
+ "description": "Kafka Kerberos support (AMBARI-10984)",
+ "min_version": "2.3.0.0"
}
]
-}
\ No newline at end of file
+}