You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2017/01/17 01:48:23 UTC
[1/2] ambari git commit: AMBARI-19097. HDP 3.0 TP - create Service
Advisor for HDFS (alejandro)
Repository: ambari
Updated Branches:
refs/heads/trunk 5bdd6cf7e -> 326cc1b2a
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
index 4e4ef51..c7d9327 100644
--- a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
@@ -37,6 +37,7 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
self.modifyHeapSizeProperties()
self.modifyNotValuableComponents()
self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
def modifyMastersWithMultipleInstances(self):
"""
@@ -79,6 +80,32 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
"""
self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS'])
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+
+ The scheme dictionary basically maps the number of hosts to
+ host index where component should exist.
+
+ Must be overriden in child class.
+ """
+ self.componentLayoutSchemes = {
+ 'NAMENODE': {"else": 0},
+ 'SECONDARY_NAMENODE': {"else": 1},
+ 'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
+
+ 'HISTORYSERVER': {31: 1, "else": 2},
+ 'RESOURCEMANAGER': {31: 1, "else": 2},
+
+ 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
+
+ 'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
+ 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
+ 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
+ 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+ 'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+ }
+
def getComponentLayoutValidations(self, services, hosts):
"""Returns array of Validation objects about issues with hostnames components assigned to"""
items = []
@@ -561,26 +588,6 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
{"config-name": 'hbase_master_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_master_heapsize')}]
return self.toConfigurationValidationProblems(validationItems, "hbase-env")
-
- # TODO, move to Service Advisors.
- def getComponentLayoutSchemes(self):
- return {
- 'NAMENODE': {"else": 0},
- 'SECONDARY_NAMENODE': {"else": 1},
- 'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
-
- 'HISTORYSERVER': {31: 1, "else": 2},
- 'RESOURCEMANAGER': {31: 1, "else": 2},
-
- 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
-
- 'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
- 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
- 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
- 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
- 'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
- }
-
def getHostsWithComponent(self, serviceName, componentName, services, hosts):
if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
index 26292d9..b72f046 100644
--- a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
@@ -33,27 +33,6 @@ def getSiteProperties(configurations, siteName):
return None
return siteConfig.get("properties")
-def getPort(address):
- """
- Extracts port from the address like 0.0.0.0:1019
- """
- if address is None:
- return None
- m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
- if m is not None:
- return int(m.group(2))
- else:
- return None
-
-def isSecurePort(port):
- """
- Returns True if port is root-owned at *nix systems
- """
- if port is not None:
- return port < 1024
- else:
- return False
-
class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor):
def __init__(self):
@@ -822,15 +801,15 @@ class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor):
data_transfer_protection = 'dfs.data.transfer.protection'
try: # Params may be absent
- privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address]))
+ privileged_dfs_dn_port = self.isSecurePort(self.getPort(hdfs_site[dfs_datanode_address]))
except KeyError:
privileged_dfs_dn_port = False
try:
- privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address]))
+ privileged_dfs_http_port = self.isSecurePort(self.getPort(hdfs_site[datanode_http_address]))
except KeyError:
privileged_dfs_http_port = False
try:
- privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address]))
+ privileged_dfs_https_port = self.isSecurePort(self.getPort(hdfs_site[datanode_https_address]))
except KeyError:
privileged_dfs_https_port = False
try:
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py
index cc556c6..6a92934 100644
--- a/ambari-server/src/main/resources/stacks/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/stack_advisor.py
@@ -23,8 +23,11 @@ import os
import re
import socket
import traceback
+from math import ceil, floor
+from urlparse import urlparse
# Local imports
+from resource_management.libraries.functions.data_structure_utils import get_from_dict
from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
@@ -332,6 +335,7 @@ class DefaultStackAdvisor(StackAdvisor):
self.notValuableComponents = set()
self.notPreferableOnServerComponents = set()
self.cardinalitiesDict = {}
+ self.componentLayoutSchemes = {}
self.loaded_service_advisors = False
@@ -878,7 +882,99 @@ class DefaultStackAdvisor(StackAdvisor):
return items
def getConfigurationClusterSummary(self, servicesList, hosts, components, services):
- return {}
+ """
+ Copied from HDP 2.0.6 so that it could be used by Service Advisors.
+ :return: Dictionary of memory and CPU attributes in the cluster
+ """
+ hBaseInstalled = False
+ if 'HBASE' in servicesList:
+ hBaseInstalled = True
+
+ cluster = {
+ "cpu": 0,
+ "disk": 0,
+ "ram": 0,
+ "hBaseInstalled": hBaseInstalled,
+ "components": components
+ }
+
+ if len(hosts["items"]) > 0:
+ nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
+ # NodeManager host with least memory is generally used in calculations as it will work in larger hosts.
+ if nodeManagerHosts is not None and len(nodeManagerHosts) > 0:
+ nodeManagerHost = nodeManagerHosts[0];
+ for nmHost in nodeManagerHosts:
+ if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]:
+ nodeManagerHost = nmHost
+ host = nodeManagerHost["Hosts"]
+ cluster["referenceNodeManagerHost"] = host
+ else:
+ host = hosts["items"][0]["Hosts"]
+ cluster["referenceHost"] = host
+ cluster["cpu"] = host["cpu_count"]
+ cluster["disk"] = len(host["disk_info"])
+ cluster["ram"] = int(host["total_mem"] / (1024 * 1024))
+
+ ramRecommendations = [
+ {"os":1, "hbase":1},
+ {"os":2, "hbase":1},
+ {"os":2, "hbase":2},
+ {"os":4, "hbase":4},
+ {"os":6, "hbase":8},
+ {"os":8, "hbase":8},
+ {"os":8, "hbase":8},
+ {"os":12, "hbase":16},
+ {"os":24, "hbase":24},
+ {"os":32, "hbase":32},
+ {"os":64, "hbase":32}
+ ]
+ index = {
+ cluster["ram"] <= 4: 0,
+ 4 < cluster["ram"] <= 8: 1,
+ 8 < cluster["ram"] <= 16: 2,
+ 16 < cluster["ram"] <= 24: 3,
+ 24 < cluster["ram"] <= 48: 4,
+ 48 < cluster["ram"] <= 64: 5,
+ 64 < cluster["ram"] <= 72: 6,
+ 72 < cluster["ram"] <= 96: 7,
+ 96 < cluster["ram"] <= 128: 8,
+ 128 < cluster["ram"] <= 256: 9,
+ 256 < cluster["ram"]: 10
+ }[1]
+
+
+ cluster["reservedRam"] = ramRecommendations[index]["os"]
+ cluster["hbaseRam"] = ramRecommendations[index]["hbase"]
+
+
+ cluster["minContainerSize"] = {
+ cluster["ram"] <= 4: 256,
+ 4 < cluster["ram"] <= 8: 512,
+ 8 < cluster["ram"] <= 24: 1024,
+ 24 < cluster["ram"]: 2048
+ }[1]
+
+ totalAvailableRam = cluster["ram"] - cluster["reservedRam"]
+ if cluster["hBaseInstalled"]:
+ totalAvailableRam -= cluster["hbaseRam"]
+ cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024)
+ '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))'''
+ cluster["containers"] = round(max(3,
+ min(2 * cluster["cpu"],
+ min(ceil(1.8 * cluster["disk"]),
+ cluster["totalAvailableRam"] / cluster["minContainerSize"]))))
+
+ '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers'''
+ cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / cluster["containers"])
+ '''If greater than 1GB, value will be in multiples of 512.'''
+ if cluster["ramPerContainer"] > 1024:
+ cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512
+
+ cluster["mapMemory"] = int(cluster["ramPerContainer"])
+ cluster["reduceMemory"] = cluster["ramPerContainer"]
+ cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])
+
+ return cluster
def validateClusterConfigurations(self, configurations, services, hosts):
validationItems = []
@@ -886,6 +982,12 @@ class DefaultStackAdvisor(StackAdvisor):
return self.toConfigurationValidationProblems(validationItems, "")
def toConfigurationValidationProblems(self, validationProblems, siteName):
+ """
+ Encapsulate the validation item's fields of "level" and "message" for the given validation's config-name.
+ :param validationProblems: List of validation problems
+ :param siteName: Config type
+ :return: List of configuration validation problems that include additional fields like the log level.
+ """
result = []
for validationProblem in validationProblems:
validationItem = validationProblem.get("item", None)
@@ -947,7 +1049,31 @@ class DefaultStackAdvisor(StackAdvisor):
self.validateMinMax(items, recommendedDefaults, configurations)
return items
+ def validateListOfConfigUsingMethod(self, configurations, recommendedDefaults, services, hosts, validators):
+ """
+ Service Advisors can use this method to pass in a list of validators, each of which is a tuple of a
+ a config type (string) and a function (pointer). Each validator is then executed.
+ :param validators: List of tuples like [("hadoop-env", someFunctionPointer), ("hdfs-site", someFunctionPointer)]
+ :return: List of validation errors
+ """
+ items = []
+ for (configType, method) in validators:
+ if configType in recommendedDefaults:
+ siteProperties = self.getSiteProperties(configurations, configType)
+ if siteProperties is not None:
+ siteRecommendations = recommendedDefaults[configType]["properties"]
+ print("SiteName: %s, method: %s" % (configType, method.__name__))
+ print("Site properties: %s" % str(siteProperties))
+ print("Recommendations: %s" % str(siteRecommendations))
+ validationItems = method(siteProperties, siteRecommendations, configurations, services, hosts)
+ items.extend(validationItems)
+ return items
+
def validateConfigurationsForSite(self, configurations, recommendedDefaults, services, hosts, siteName, method):
+ """
+ Deprecated, please use validateListOfConfigUsingMethod
+ :return: List of validation errors by calling the corresponding method.
+ """
if siteName in recommendedDefaults:
siteProperties = self.getSiteProperties(configurations, siteName)
if siteProperties is not None:
@@ -1189,7 +1315,7 @@ class DefaultStackAdvisor(StackAdvisor):
The scheme dictionary basically maps the number of hosts to
host index where component should exist.
"""
- return {}
+ return self.componentLayoutSchemes
def getWarnItem(self, message):
"""
@@ -1672,3 +1798,524 @@ class DefaultStackAdvisor(StackAdvisor):
if recommendation:
put_f(name, ",".join(recommendation))
+
+ def getHostNamesWithComponent(self, serviceName, componentName, services):
+ """
+ Returns the list of hostnames on which service component is installed
+ """
+ if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
+ service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
+ components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
+ if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
+ componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
+ return componentHostnames
+ return []
+
+ def getHostsWithComponent(self, serviceName, componentName, services, hosts):
+ if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
+ service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
+ components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
+ if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
+ componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
+ componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames]
+ return componentHosts
+ return []
+
+ def getHostWithComponent(self, serviceName, componentName, services, hosts):
+ componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts)
+ if (len(componentHosts) > 0):
+ return componentHosts[0]
+ return None
+
+ def getHostComponentsByCategories(self, hostname, categories, services, hosts):
+ components = []
+ if services is not None and hosts is not None:
+ for service in services["services"]:
+ components.extend([componentEntry for componentEntry in service["components"]
+ if componentEntry["StackServiceComponents"]["component_category"] in categories
+ and hostname in componentEntry["StackServiceComponents"]["hostnames"]])
+ return components
+
+ def get_services_list(self, services):
+ """
+ Returns available services as list
+
+ :type services dict
+ :rtype list
+ """
+ if not services:
+ return []
+
+ return [service["StackServices"]["service_name"] for service in services["services"]]
+
+ def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations):
+ validationItems = []
+ users = self.getHadoopProxyUsers(services, hosts, configurations)
+ for user_name, user_properties in users.iteritems():
+ props = ["hadoop.proxyuser.{0}.hosts".format(user_name)]
+ if "propertyGroups" in user_properties:
+ props.append("hadoop.proxyuser.{0}.groups".format(user_name))
+
+ for prop in props:
+ validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)})
+
+ return validationItems
+
+ def getHadoopProxyUsers(self, services, hosts, configurations):
+ """
+ Gets Hadoop Proxy User recommendations based on the configuration that is provided by
+ getServiceHadoopProxyUsersConfigurationDict.
+
+ See getServiceHadoopProxyUsersConfigurationDict
+ """
+ servicesList = self.get_services_list(services)
+ users = {}
+
+ for serviceName, serviceUserComponents in self.getServiceHadoopProxyUsersConfigurationDict().iteritems():
+ users.update(self._getHadoopProxyUsersForService(serviceName, serviceUserComponents, services, hosts, configurations))
+
+ return users
+
+ def getServiceHadoopProxyUsersConfigurationDict(self):
+ """
+ Returns a map that is used by 'getHadoopProxyUsers' to determine service
+ user properties and related components and get proxyuser recommendations.
+ This method can be overridden in stackadvisors for the further stacks to
+ add additional services or change the previous logic.
+
+ Example of the map format:
+ {
+ "serviceName": [
+ ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", "propertyGroups": "exact string value"})
+ ("configTypeName2", "userPropertyName2", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}),
+ ("configTypeName3", "userPropertyName3", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction)
+ ],
+ "serviceName2": [
+ ...
+ }
+
+ If the third element of a tuple is map that maps proxy property to it's value.
+ The key could be either 'propertyHosts' or 'propertyGroups'. (Both are optional)
+ If the map value is a string, then this string will be used for the proxyuser
+ value (e.g. 'hadoop.proxyuser.{user}.hosts' = '*').
+ Otherwise map value should be alist or a tuple with component names.
+ All hosts with the provided components will be added
+ to the property (e.g. 'hadoop.proxyuser.{user}.hosts' = 'host1,host2,host3')
+
+ The forth element of the tuple is optional and if it's provided,
+ it should be a function that takes two arguments: services and hosts.
+ If it returns False, proxyusers for the tuple will not be added.
+ """
+ ALL_WILDCARD = "*"
+ HOSTS_PROPERTY = "propertyHosts"
+ GROUPS_PROPERTY = "propertyGroups"
+
+ return {
+ "HDFS": [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
+ "OOZIE": [("oozie-env", "oozie_user", {HOSTS_PROPERTY: ["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
+ "HIVE": [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", "HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}),
+ ("hive-env", "webhcat_user", {HOSTS_PROPERTY: ["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
+ "YARN": [("yarn-env", "yarn_user", {HOSTS_PROPERTY: ["RESOURCEMANAGER"]}, lambda services, hosts: len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 1)],
+ "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
+ "SPARK": [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})]
+ }
+
+ def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, services, hosts, configurations):
+ Logger.info("Calculating Hadoop Proxy User recommendations for {0} service.".format(serviceName))
+ servicesList = self.get_services_list(services)
+ resultUsers = {}
+
+ if serviceName in servicesList:
+ usersComponents = {}
+ for values in serviceUserComponents:
+
+ # Filter, if 4th argument is present in the tuple
+ filterFunction = values[3:]
+ if filterFunction and not filterFunction[0](services, hosts):
+ continue
+
+ userNameConfig, userNameProperty, hostSelectorMap = values[:3]
+ user = get_from_dict(services, ("configurations", userNameConfig, "properties", userNameProperty), None)
+ if user:
+ usersComponents[user] = (userNameConfig, userNameProperty, hostSelectorMap)
+
+ for user, (userNameConfig, userNameProperty, hostSelectorMap) in usersComponents.iteritems():
+ proxyUsers = {"config": userNameConfig, "propertyName": userNameProperty}
+ for proxyPropertyName, hostSelector in hostSelectorMap.iteritems():
+ componentHostNamesString = hostSelector if isinstance(hostSelector, basestring) else '*'
+ if isinstance(hostSelector, (list, tuple)):
+ _, componentHostNames = self.get_data_for_proxyuser(user, services, configurations) # preserve old values
+ for component in hostSelector:
+ componentHosts = self.getHostsWithComponent(serviceName, component, services, hosts)
+ if componentHosts is not None:
+ for componentHost in componentHosts:
+ componentHostName = componentHost["Hosts"]["host_name"]
+ componentHostNames.add(componentHostName)
+
+ componentHostNamesString = ",".join(sorted(componentHostNames))
+ Logger.info("Host List for [service='{0}'; user='{1}'; components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), componentHostNamesString))
+
+ if not proxyPropertyName in proxyUsers:
+ proxyUsers[proxyPropertyName] = componentHostNamesString
+
+ if not user in resultUsers:
+ resultUsers[user] = proxyUsers
+
+ return resultUsers
+
+ def recommendHadoopProxyUsers(self, configurations, services, hosts):
+ servicesList = self.get_services_list(services)
+
+ if 'forced-configurations' not in services:
+ services["forced-configurations"] = []
+
+ putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
+ putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")
+
+ users = self.getHadoopProxyUsers(services, hosts, configurations)
+
+ # Force dependencies for HIVE
+ if "HIVE" in servicesList:
+ hive_user = get_from_dict(services, ("configurations", "hive-env", "properties", "hive_user"), default_value=None)
+ if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), default_value=None):
+ services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(hive_user)})
+
+ for user_name, user_properties in users.iteritems():
+
+ # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users
+ self.put_proxyuser_value(user_name, user_properties["propertyHosts"], services=services, configurations=configurations, put_function=putCoreSiteProperty)
+ Logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(user_name, user_properties["propertyHosts"]))
+ if "propertyGroups" in user_properties:
+ self.put_proxyuser_value(user_name, user_properties["propertyGroups"], is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
+
+ # Remove old properties if user was renamed
+ userOldValue = self.getOldValue(services, user_properties["config"], user_properties["propertyName"])
+ if userOldValue is not None and userOldValue != user_name:
+ putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true')
+ services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)})
+ services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)})
+
+ if "propertyGroups" in user_properties:
+ putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true')
+ services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)})
+ services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)})
+
+ self.recommendAmbariProxyUsersForHDFS(services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute)
+
+ def recommendAmbariProxyUsersForHDFS(self, services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute):
+ if "HDFS" in servicesList:
+ ambari_user = self.getAmbariUser(services)
+ ambariHostName = socket.getfqdn()
+ self.put_proxyuser_value(ambari_user, ambariHostName, services=services, configurations=configurations, put_function=putCoreSiteProperty)
+ self.put_proxyuser_value(ambari_user, "*", is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
+ old_ambari_user = self.getOldAmbariUser(services)
+ if old_ambari_user is not None:
+ putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
+ putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
+
+ def getAmbariUser(self, services):
+ ambari_user = services['ambari-server-properties']['ambari-server.user']
+ if "cluster-env" in services["configurations"] \
+ and "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"] \
+ and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \
+ and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true":
+ ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
+ ambari_user = ambari_user.split('@')[0]
+ return ambari_user
+
+ def getOldAmbariUser(self, services):
+ ambari_user = None
+ if "cluster-env" in services["configurations"]:
+ if "security_enabled" in services["configurations"]["cluster-env"]["properties"] \
+ and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true":
+ ambari_user = services['ambari-server-properties']['ambari-server.user']
+ elif "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"]:
+ ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
+ ambari_user = ambari_user.split('@')[0]
+ return ambari_user
+
+ def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None):
+ is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups)
+ result_value = "*"
+ result_values_set = self.merge_proxyusers_values(current_value, value)
+ if len(result_values_set) > 0:
+ result_value = ",".join(sorted([val for val in result_values_set if val]))
+
+ if is_groups:
+ property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
+ else:
+ property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
+
+ put_function(property_name, result_value)
+
+ def get_data_for_proxyuser(self, user_name, services, configurations, groups=False):
+ """
+ Returns values of proxyuser properties for given user. Properties can be
+ hadoop.proxyuser.username.groups or hadoop.proxyuser.username.hosts
+ :param user_name:
+ :param services:
+ :param groups: if true, will return values for group property, not hosts
+ :return: tuple (wildcard_value, set[values]), where wildcard_value indicates if property value was *
+ """
+ if "core-site" in services["configurations"]:
+ coreSite = services["configurations"]["core-site"]['properties']
+ else:
+ coreSite = {}
+ if groups:
+ property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
+ else:
+ property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
+ if property_name in coreSite:
+ property_value = coreSite[property_name]
+ if property_value == "*":
+ return True, set()
+ else:
+ result_values = set()
+ if "core-site" in configurations:
+ if property_name in configurations["core-site"]['properties']:
+ result_values = result_values.union(configurations["core-site"]['properties'][property_name].split(","))
+ result_values = result_values.union(property_value.split(","))
+ return False, result_values
+ return False, set()
+
+ def merge_proxyusers_values(self, first, second):
+ result = set()
+ def append(data):
+ if isinstance(data, str) or isinstance(data, unicode):
+ if data != "*":
+ result.update(data.split(","))
+ else:
+ result.update(data)
+ append(first)
+ append(second)
+ return result
+
+ def getOldValue(self, services, configType, propertyName):
+ if services:
+ if 'changed-configurations' in services.keys():
+ changedConfigs = services["changed-configurations"]
+ for changedConfig in changedConfigs:
+ if changedConfig["type"] == configType and changedConfig["name"]== propertyName and "old_value" in changedConfig:
+ return changedConfig["old_value"]
+ return None
+
+ @classmethod
+ def isSecurePort(cls, port):
+ """
+ Returns True if port is root-owned at *nix systems
+ """
+ if port is not None:
+ return port < 1024
+ else:
+ return False
+
+ @classmethod
+ def getPort(cls, address):
+ """
+ Extracts port from the address like 0.0.0.0:1019
+ """
+ if address is None:
+ return None
+ m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
+ if m is not None:
+ return int(m.group(2))
+ else:
+ return None
+
+ def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName):
+ if propertyName not in recommendedDefaults:
+ # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the
+ # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary". In this case, ignore it.
+ return None
+
+ if not propertyName in properties:
+ return self.getErrorItem("Value should be set")
+ value = self.to_number(properties[propertyName])
+ if value is None:
+ return self.getErrorItem("Value should be integer")
+ defaultValue = self.to_number(recommendedDefaults[propertyName])
+ if defaultValue is None:
+ return None
+ if value < defaultValue:
+ return self.getWarnItem("Value is less than the recommended default of {0}".format(defaultValue))
+ return None
+
+ def validatorEqualsPropertyItem(self, properties1, propertyName1,
+ properties2, propertyName2,
+ emptyAllowed=False):
+ if not propertyName1 in properties1:
+ return self.getErrorItem("Value should be set for %s" % propertyName1)
+ if not propertyName2 in properties2:
+ return self.getErrorItem("Value should be set for %s" % propertyName2)
+ value1 = properties1.get(propertyName1)
+ if value1 is None and not emptyAllowed:
+ return self.getErrorItem("Empty value for %s" % propertyName1)
+ value2 = properties2.get(propertyName2)
+ if value2 is None and not emptyAllowed:
+ return self.getErrorItem("Empty value for %s" % propertyName2)
+ if value1 != value2:
+ return self.getWarnItem("It is recommended to set equal values "
+ "for properties {0} and {1}".format(propertyName1, propertyName2))
+
+ return None
+
+ def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults,
+ propertyName):
+ if not propertyName in properties:
+ return self.getErrorItem("Value should be set for %s" % propertyName)
+ value = properties.get(propertyName)
+ if not propertyName in recommendedDefaults:
+ return self.getErrorItem("Value should be recommended for %s" % propertyName)
+ recommendedValue = recommendedDefaults.get(propertyName)
+ if value != recommendedValue:
+ return self.getWarnItem("It is recommended to set value {0} "
+ "for property {1}".format(recommendedValue, propertyName))
+ return None
+
+ def validatorNotEmpty(self, properties, propertyName):
+ if not propertyName in properties:
+ return self.getErrorItem("Value should be set for {0}".format(propertyName))
+ value = properties.get(propertyName)
+ if not value:
+ return self.getWarnItem("Empty value for {0}".format(propertyName))
+ return None
+
+ def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo):
+ if not propertyName in properties:
+ return self.getErrorItem("Value should be set")
+ dir = properties[propertyName]
+ if not dir.startswith("file://") or dir == recommendedDefaults.get(propertyName):
+ return None
+
+ dir = re.sub("^file://", "", dir, count=1)
+ mountPoints = []
+ for mountPoint in hostInfo["disk_info"]:
+ mountPoints.append(mountPoint["mountpoint"])
+ mountPoint = DefaultStackAdvisor.getMountPointForDir(dir, mountPoints)
+
+ if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint:
+ return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName))
+
+ return None
+
+ def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace):
+ if not propertyName in properties:
+ return self.getErrorItem("Value should be set")
+ dir = properties[propertyName]
+ if not dir.startswith("file://"):
+ return None
+
+ dir = re.sub("^file://", "", dir, count=1)
+ mountPoints = {}
+ for mountPoint in hostInfo["disk_info"]:
+ mountPoints[mountPoint["mountpoint"]] = self.to_number(mountPoint["available"])
+ mountPoint = DefaultStackAdvisor.getMountPointForDir(dir, mountPoints.keys())
+
+ if not mountPoints:
+ return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"])
+
+ if mountPoints[mountPoint] < reqiuredDiskSpace:
+ msg = "Ambari Metrics disk space requirements not met. \n" \
+ "Recommended disk space for partition {0} is {1}G"
+ return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb
+ return None
+
+ @classmethod
+ def is_valid_host_port_authority(cls, target):
+ has_scheme = "://" in target
+ if not has_scheme:
+ target = "dummyscheme://"+target
+ try:
+ result = urlparse(target)
+ if result.hostname is not None and result.port is not None:
+ return True
+ except ValueError:
+ pass
+ return False
+
+ @classmethod
+ def getMountPointForDir(cls, dir, mountPoints):
+ """
+ :param dir: Directory to check, even if it doesn't exist.
+ :return: Returns the closest mount point as a string for the directory.
+ if the "dir" variable is None, will return None.
+ If the directory does not exist, will return "/".
+ """
+ bestMountFound = None
+ if dir:
+ dir = re.sub("^file://", "", dir, count=1).strip().lower()
+
+ # If the path is "/hadoop/hdfs/data", then possible matches for mounts could be
+ # "/", "/hadoop/hdfs", and "/hadoop/hdfs/data".
+ # So take the one with the greatest number of segments.
+ for mountPoint in mountPoints:
+ # Ensure that the mount path and the dir path ends with "/"
+ # The mount point "/hadoop" should not match with the path "/hadoop1"
+ if os.path.join(dir, "").startswith(os.path.join(mountPoint, "")):
+ if bestMountFound is None:
+ bestMountFound = mountPoint
+ elif os.path.join(bestMountFound, "").count(os.path.sep) < os.path.join(mountPoint, "").count(os.path.sep):
+ bestMountFound = mountPoint
+
+ return bestMountFound
+
+ def validateMinMemorySetting(self, properties, defaultValue, propertyName):
+ if not propertyName in properties:
+ return self.getErrorItem("Value should be set")
+ if defaultValue is None:
+ return self.getErrorItem("Config's default value can't be null or undefined")
+
+ value = properties[propertyName]
+ if value is None:
+ return self.getErrorItem("Value can't be null or undefined")
+ try:
+ valueInt = self.to_number(value)
+ # TODO: generify for other use cases
+ defaultValueInt = int(str(defaultValue).strip())
+ if valueInt < defaultValueInt:
+ return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue))
+ except:
+ return None
+
+ return None
+
+ @classmethod
+ def checkXmxValueFormat(cls, value):
+ p = re.compile('-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?')
+ matches = p.findall(value)
+ return len(matches) == 1
+
+ @classmethod
+ def getXmxSize(cls, value):
+ p = re.compile("-Xmx(\d+)(.?)")
+ result = p.findall(value)[0]
+ if len(result) > 1:
+ # result[1] - is a space or size formatter (b|k|m|g etc)
+ return result[0] + result[1].lower()
+ return result[0]
+
+ @classmethod
+ def formatXmxSizeToBytes(cls, value):
+ value = value.lower()
+ if len(value) == 0:
+ return 0
+ modifier = value[-1]
+
+ if modifier == ' ' or modifier in "0123456789":
+ modifier = 'b'
+ m = {
+ modifier == 'b': 1,
+ modifier == 'k': 1024,
+ modifier == 'm': 1024 * 1024,
+ modifier == 'g': 1024 * 1024 * 1024,
+ modifier == 't': 1024 * 1024 * 1024 * 1024,
+ modifier == 'p': 1024 * 1024 * 1024 * 1024 * 1024
+ }[1]
+ return cls.to_number(value) * m
+
+ @classmethod
+ def to_number(cls, s):
+ try:
+ return int(re.sub("\D", "", s))
+ except ValueError:
+ return None
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/TestStackAdvisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestStackAdvisor.py b/ambari-server/src/test/python/TestStackAdvisor.py
index 87d2d15..d48def3 100644
--- a/ambari-server/src/test/python/TestStackAdvisor.py
+++ b/ambari-server/src/test/python/TestStackAdvisor.py
@@ -270,8 +270,24 @@ class TestStackAdvisorInitialization(TestCase):
}
hosts= {
"items": [
- {"Hosts": {"host_name": "host1"}},
- {"Hosts": {"host_name": "host2"}}
+ {"Hosts": {"host_name": "host1",
+ "cpu_count": 1,
+ "total_mem": 2097152,
+ "disk_info": [{
+ "size": '80000000',
+ "mountpoint": "/"
+ }]
+ }
+ },
+ {"Hosts": {"host_name": "host2",
+ "cpu_count": 1,
+ "total_mem": 2097152,
+ "disk_info": [{
+ "size": '80000000',
+ "mountpoint": "/"
+ }]
+ }
+ }
]
}
actualValidateConfigResponse = default_stack_advisor.validateConfigurations(services, hosts)
@@ -343,8 +359,14 @@ class TestStackAdvisorInitialization(TestCase):
# Test with maintenance_state. One host is in maintenance mode.
hosts= {
"items": [
- {"Hosts": {"host_name": "host1", "maintenance_state":"OFF"}},
- {"Hosts": {"host_name": "host2", "maintenance_state":"ON"}}
+ {"Hosts": {"host_name": "host1",
+ "maintenance_state":"OFF",
+ "cpu_count": 1}
+ },
+ {"Hosts": {"host_name": "host2",
+ "maintenance_state":"ON",
+ "cpu_count": 1}
+ }
]
}
@@ -397,8 +419,26 @@ class TestStackAdvisorInitialization(TestCase):
# Test with maintenance_state. Both hosts are in maintenance mode.
hosts= {
"items": [
- {"Hosts": {"host_name": "host1", "maintenance_state":"ON"}},
- {"Hosts": {"host_name": "host2", "maintenance_state":"ON"}}
+ {"Hosts": {"host_name": "host1",
+ "maintenance_state":"ON",
+ "cpu_count": 1,
+ "total_mem": 2097152,
+ "disk_info": [{
+ "size": '80000000',
+ "mountpoint": "/"
+ }]
+ }
+ },
+ {"Hosts": {"host_name": "host2",
+ "maintenance_state":"ON",
+ "cpu_count": 1,
+ "total_mem": 2097152,
+ "disk_info": [{
+ "size": '80000000',
+ "mountpoint": "/"
+ }]
+ }
+ }
]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
index 1145154..a486fb3 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
@@ -3253,11 +3253,11 @@ class TestHDP206StackAdvisor(TestCase):
self.assertEquals(self.stack_advisor_impl.round_to_n(4097), 4096)
def test_getMountPointForDir(self):
- self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", ["/"]), "/")
- self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", ["/var", "/"]), "/var")
- self.assertEquals(self.stack_advisor_impl.getMountPointForDir("file:///var/log", ["/var", "/"]), "/var")
- self.assertEquals(self.stack_advisor_impl.getMountPointForDir("hdfs:///hdfs_path", ["/var", "/"]), None)
- self.assertEquals(self.stack_advisor_impl.getMountPointForDir("relative/path", ["/var", "/"]), None)
+ self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", ["/"]), "/")
+ self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", ["/var", "/"]), "/var")
+ self.assertEquals(self.stackAdvisor.getMountPointForDir("file:///var/log", ["/var", "/"]), "/var")
+ self.assertEquals(self.stackAdvisor.getMountPointForDir("hdfs:///hdfs_path", ["/var", "/"]), None)
+ self.assertEquals(self.stackAdvisor.getMountPointForDir("relative/path", ["/var", "/"]), None)
def test_parseCardinality(self):
self.assertEquals(self.stackAdvisor.parseCardinality("ALL", 5), (5, 5))
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
index f9fb1f5..e8bd5d0 100644
--- a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
@@ -168,11 +168,8 @@ class TestHDP21StackAdvisor(TestCase):
if len(components) > 0:
groups.append(components)
- def sort_nested_lists(list):
- result_list = []
- for sublist in list:
- result_list.append(sorted(sublist))
- return sorted(result_list)
+ def sort_nested_lists(l):
+ return sorted(reduce(lambda x,y: x+y, l))
self.assertEquals(sort_nested_lists(expected_layout), sort_nested_lists(groups))
[2/2] ambari git commit: AMBARI-19097. HDP 3.0 TP - create Service
Advisor for HDFS (alejandro)
Posted by al...@apache.org.
AMBARI-19097. HDP 3.0 TP - create Service Advisor for HDFS (alejandro)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/326cc1b2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/326cc1b2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/326cc1b2
Branch: refs/heads/trunk
Commit: 326cc1b2a1e05073050a38ea104d6d63ed76f373
Parents: 5bdd6cf
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Tue Jan 10 12:55:50 2017 -0800
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Mon Jan 16 17:47:16 2017 -0800
----------------------------------------------------------------------
.../HDFS/3.0.0.3.0/service_advisor.py | 602 +++++++++++++++++
.../ZOOKEEPER/3.4.9/service_advisor.py | 49 +-
.../stacks/BIGTOP/0.8/services/stack_advisor.py | 63 +-
.../stacks/HDP/2.0.6/services/stack_advisor.py | 553 ++--------------
.../stacks/HDP/2.1/services/stack_advisor.py | 31 +-
.../stacks/HDP/2.2/services/stack_advisor.py | 39 +-
.../stacks/HDPWIN/2.1/services/stack_advisor.py | 47 +-
.../stacks/HDPWIN/2.2/services/stack_advisor.py | 27 +-
.../src/main/resources/stacks/stack_advisor.py | 651 ++++++++++++++++++-
.../src/test/python/TestStackAdvisor.py | 52 +-
.../stacks/2.0.6/common/test_stack_advisor.py | 10 +-
.../stacks/2.1/common/test_stack_advisor.py | 7 +-
12 files changed, 1486 insertions(+), 645 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py
new file mode 100644
index 0000000..eb7f35c
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py
@@ -0,0 +1,602 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import os
+import traceback
+import inspect
+import re
+import socket
+from urlparse import urlparse
+
+# Local imports
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.data_structure_utils import get_from_dict
+from resource_management.libraries.functions.mounted_dirs_helper import get_mounts_with_multiple_data_dirs
+
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
+PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
+
+try:
+ with open(PARENT_FILE, 'rb') as fp:
+ service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
+except Exception as e:
+ traceback.print_exc()
+ print "Failed to load parent"
+
+
+class HDFSServiceAdvisor(service_advisor.ServiceAdvisor):
+
+ def __init__(self, *args, **kwargs):
+ self.as_super = super(HDFSServiceAdvisor, self)
+ self.as_super.__init__(*args, **kwargs)
+
+ # Always call these methods
+ self.modifyMastersWithMultipleInstances()
+ self.modifyCardinalitiesDict()
+ self.modifyHeapSizeProperties()
+ self.modifyNotValuableComponents()
+ self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
+
+ def modifyMastersWithMultipleInstances(self):
+ """
+ Modify the set of masters with multiple instances.
+ Must be overriden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyCardinalitiesDict(self):
+ """
+ Modify the dictionary of cardinalities.
+ Must be overriden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyHeapSizeProperties(self):
+ """
+ Modify the dictionary of heap size properties.
+ Must be overriden in child class.
+ """
+ self.heap_size_properties = {"NAMENODE":
+ [{"config-name": "hadoop-env",
+ "property": "namenode_heapsize",
+ "default": "1024m"}],
+ "SECONDARY_NAMENODE":
+ [{"config-name": "hadoop-env",
+ "property": "namenode_heapsize",
+ "default": "1024m"}],
+ "DATANODE":
+ [{"config-name": "hadoop-env",
+ "property": "dtnode_heapsize",
+ "default": "1024m"}]}
+
+ def modifyNotValuableComponents(self):
+ """
+ Modify the set of components whose host assignment is based on other services.
+ Must be overriden in child class.
+ """
+ self.notValuableComponents |= set(['JOURNALNODE', 'ZKFC'])
+
+ def modifyComponentsNotPreferableOnServer(self):
+ """
+ Modify the set of components that are not preferable on the server.
+ Must be overriden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+ The scheme dictionary basically maps the number of hosts to
+ host index where component should exist.
+ Must be overriden in child class.
+ """
+ self.componentLayoutSchemes.update({
+ 'NAMENODE': {"else": 0},
+ 'SECONDARY_NAMENODE': {"else": 1}
+ })
+
+ def getServiceComponentLayoutValidations(self, services, hosts):
+ """
+ Get a list of errors.
+ Must be overriden in child class.
+ """
+ Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ # HDFS allows NameNode and Secondary NameNode to be on the same host.
+ return self.as_super.getServiceComponentLayoutValidations(services, hosts)
+
+ def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+ """
+ Entry point.
+ Must be overriden in child class.
+ """
+ Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ # Due to the existing stack inheritance, make it clear where each calculation came from.
+ recommender = HDFSRecommender()
+ recommender.recommendConfigurationsFromHDP206(configurations, clusterData, services, hosts)
+ recommender.recommendConfigurationsFromHDP23(configurations, clusterData, services, hosts)
+
+ def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+ """
+ Entry point.
+ Validate configurations for the service. Return a list of errors.
+ The code for this function should be the same for each Service Advisor.
+ """
+ Logger.info("Class: %s, Method: %s. Validating Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ validator = HDFSValidator()
+ # Calls the methods of the validator using arguments,
+ # method(siteProperties, siteRecommendations, configurations, services, hosts)
+ return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
+
+class HDFSRecommender(service_advisor.ServiceAdvisor):
+ """
+ HDFS Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
+ """
+
+ def __init__(self, *args, **kwargs):
+ self.as_super = super(HDFSRecommender, self)
+ self.as_super.__init__(*args, **kwargs)
+
+ def recommendConfigurationsFromHDP206(self, configurations, clusterData, services, hosts):
+ """
+ Recommend configurations for this service based on HDP 2.0.6.
+ """
+ Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
+ putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services)
+ putHDFSSitePropertyAttributes = self.putPropertyAttribute(configurations, "hdfs-site")
+
+ totalAvailableRam = clusterData['totalAvailableRam']
+ Logger.info("Class: %s, Method: %s. Total Available Ram: %s" % (self.__class__.__name__, inspect.stack()[0][3], str(totalAvailableRam)))
+ putHDFSProperty('namenode_heapsize', max(int(totalAvailableRam / 2), 1024))
+ putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
+ putHDFSProperty('namenode_opt_newsize', max(int(totalAvailableRam / 8), 128))
+ putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
+ putHDFSProperty('namenode_opt_maxnewsize', max(int(totalAvailableRam / 8), 256))
+
+ # Check if NN HA is enabled and recommend removing dfs.namenode.rpc-address
+ hdfsSiteProperties = self.getServicesSiteProperties(services, "hdfs-site")
+ nameServices = None
+ if hdfsSiteProperties and 'dfs.internal.nameservices' in hdfsSiteProperties:
+ nameServices = hdfsSiteProperties['dfs.internal.nameservices']
+ if nameServices is None and hdfsSiteProperties and 'dfs.nameservices' in hdfsSiteProperties:
+ nameServices = hdfsSiteProperties['dfs.nameservices']
+ if nameServices and "dfs.ha.namenodes.%s" % nameServices in hdfsSiteProperties:
+ namenodes = hdfsSiteProperties["dfs.ha.namenodes.%s" % nameServices]
+ if len(namenodes.split(',')) > 1:
+ putHDFSSitePropertyAttributes("dfs.namenode.rpc-address", "delete", "true")
+
+ Logger.info("Class: %s, Method: %s. HDFS nameservices: %s" %
+ (self.__class__.__name__, inspect.stack()[0][3], str(nameServices)))
+
+ hdfs_mount_properties = [
+ ("dfs.datanode.data.dir", "DATANODE", "/hadoop/hdfs/data", "multi"),
+ ("dfs.namenode.name.dir", "DATANODE", "/hadoop/hdfs/namenode", "multi"),
+ ("dfs.namenode.checkpoint.dir", "SECONDARY_NAMENODE", "/hadoop/hdfs/namesecondary", "single")
+ ]
+
+ Logger.info("Class: %s, Method: %s. Updating HDFS mount properties." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+ self.updateMountProperties("hdfs-site", hdfs_mount_properties, configurations, services, hosts)
+
+ dataDirs = []
+ if configurations and "hdfs-site" in configurations and \
+ "dfs.datanode.data.dir" in configurations["hdfs-site"]["properties"] and \
+ configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"] is not None:
+ dataDirs = configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"].split(",")
+
+ elif hdfsSiteProperties and "dfs.datanode.data.dir" in hdfsSiteProperties and \
+ hdfsSiteProperties["dfs.datanode.data.dir"] is not None:
+ dataDirs = hdfsSiteProperties["dfs.datanode.data.dir"].split(",")
+
+ Logger.info("Class: %s, Method: %s. HDFS Data Dirs: %s" %
+ (self.__class__.__name__, inspect.stack()[0][3], str(dataDirs)))
+
+ # dfs.datanode.du.reserved should be set to 10-15% of volume size
+ # For each host selects maximum size of the volume. Then gets minimum for all hosts.
+ # This ensures that each host will have at least one data dir with available space.
+ reservedSizeRecommendation = 0l #kBytes
+ for host in hosts["items"]:
+ mountPoints = []
+ mountPointDiskAvailableSpace = [] #kBytes
+ for diskInfo in host["Hosts"]["disk_info"]:
+ mountPoints.append(diskInfo["mountpoint"])
+ mountPointDiskAvailableSpace.append(long(diskInfo["size"]))
+
+ maxFreeVolumeSizeForHost = 0l #kBytes
+ for dataDir in dataDirs:
+ mp = HDFSRecommender.getMountPointForDir(dataDir, mountPoints)
+ for i in range(len(mountPoints)):
+ if mp == mountPoints[i]:
+ if mountPointDiskAvailableSpace[i] > maxFreeVolumeSizeForHost:
+ maxFreeVolumeSizeForHost = mountPointDiskAvailableSpace[i]
+
+ if (not reservedSizeRecommendation) or (maxFreeVolumeSizeForHost and maxFreeVolumeSizeForHost < reservedSizeRecommendation):
+ reservedSizeRecommendation = maxFreeVolumeSizeForHost
+
+ Logger.info("Class: %s, Method: %s. HDFS Datanode recommended reserved size: %d" %
+ (self.__class__.__name__, inspect.stack()[0][3], reservedSizeRecommendation))
+
+ if reservedSizeRecommendation:
+ reservedSizeRecommendation = max(reservedSizeRecommendation * 1024 / 8, 1073741824) # At least 1Gb is reserved
+ putHDFSSiteProperty('dfs.datanode.du.reserved', reservedSizeRecommendation) #Bytes
+
+ # recommendations for "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" properties in core-site
+ self.recommendHadoopProxyUsers(configurations, services, hosts)
+
+ def recommendConfigurationsFromHDP23(self, configurations, clusterData, services, hosts):
+ """
+ Recommend configurations for this service based on HDP 2.3.
+ """
+ putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services)
+ putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site")
+
+ if ('ranger-hdfs-plugin-properties' in services['configurations']) and ('ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']):
+ rangerPluginEnabled = ''
+ if 'ranger-hdfs-plugin-properties' in configurations and 'ranger-hdfs-plugin-enabled' in configurations['ranger-hdfs-plugin-properties']['properties']:
+ rangerPluginEnabled = configurations['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled']
+ elif 'ranger-hdfs-plugin-properties' in services['configurations'] and 'ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']:
+ rangerPluginEnabled = services['configurations']['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled']
+
+ if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()):
+ putHdfsSiteProperty("dfs.namenode.inode.attributes.provider.class",'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer')
+ else:
+ putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true')
+ else:
+ putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true')
+
+
+class HDFSValidator(service_advisor.ServiceAdvisor):
+ """
+ HDFS Validator checks the correctness of properties whenever the service is first added or the user attempts to
+ change configs via the UI.
+ """
+
+ def __init__(self, *args, **kwargs):
+ self.as_super = super(HDFSValidator, self)
+ self.as_super.__init__(*args, **kwargs)
+
+ self.validators = [("hdfs-site", self.validateHDFSConfigurationsFromHDP206),
+ ("hadoop-env", self.validateHadoopEnvConfigurationsFromHDP206),
+ ("core-site", self.validateHDFSCoreSiteFromHDP206),
+ ("hdfs-site", self.validateHDFSConfigurationsFromHDP22),
+ ("hadoop-env", self.validateHadoopEnvConfigurationsFromHDP22),
+ ("ranger-hdfs-plugin-properties", self.validateHDFSRangerPluginConfigurationsFromHDP22),
+ ("hdfs-site", self.validateRangerAuthorizerFromHDP23)]
+
+ # **********************************************************
+ # Example of how to add a function that validates a certain config type.
+ # If the same config type has multiple functions, can keep adding tuples to self.validators
+ #self.validators.append(("hadoop-env", self.sampleValidator))
+
+ def sampleValidator(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ Example of a validator function other other Service Advisors to emulate.
+ :return: A list of configuration validation problems.
+ """
+ validationItems = []
+
+ '''
+ Item is a simple dictionary.
+ Two functions can be used to construct it depending on the log level: WARN|ERROR
+ E.g.,
+ self.getErrorItem(message) or self.getWarnItem(message)
+
+ item = {"level": "ERROR|WARN", "message": "value"}
+ '''
+ validationItems.append({"config-name": "my_config_property_name",
+ "item": self.getErrorItem("My custom message in method %s" % inspect.stack()[0][3])})
+ return self.toConfigurationValidationProblems(validationItems, "hadoop-env")
+
+ def validateHDFSConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ This was copied from HDP 2.0.6; validate hdfs-site
+ :return: A list of configuration validation problems.
+ """
+ clusterEnv = self.getSiteProperties(configurations, "cluster-env")
+ validationItems = [{"config-name": 'dfs.datanode.du.reserved', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'dfs.datanode.du.reserved')},
+ {"config-name": 'dfs.datanode.data.dir', "item": self.validatorOneDataDirPerPartition(properties, 'dfs.datanode.data.dir', services, hosts, clusterEnv)}]
+ return self.toConfigurationValidationProblems(validationItems, "hdfs-site")
+
+ def validatorOneDataDirPerPartition(self, properties, propertyName, services, hosts, clusterEnv):
+ if not propertyName in properties:
+ return self.getErrorItem("Value should be set")
+ dirs = properties[propertyName]
+
+ if not (clusterEnv and "one_dir_per_partition" in clusterEnv and clusterEnv["one_dir_per_partition"].lower() == "true"):
+ return None
+
+ dataNodeHosts = self.getDataNodeHosts(services, hosts)
+
+ warnings = set()
+ for host in dataNodeHosts:
+ hostName = host["Hosts"]["host_name"]
+
+ mountPoints = []
+ for diskInfo in host["Hosts"]["disk_info"]:
+ mountPoints.append(diskInfo["mountpoint"])
+
+ if get_mounts_with_multiple_data_dirs(mountPoints, dirs):
+ # A detailed message can be too long on large clusters:
+ # warnings.append("Host: " + hostName + "; Mount: " + mountPoint + "; Data directories: " + ", ".join(dirList))
+ warnings.add(hostName)
+ break
+
+ if len(warnings) > 0:
+ return self.getWarnItem("cluster-env/one_dir_per_partition is enabled but there are multiple data directories on the same mount. Affected hosts: {0}".format(", ".join(sorted(warnings))))
+
+ return None
+
+ def validateHadoopEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ This was copied from HDP 2.0.6; validate hadoop-env
+ :return: A list of configuration validation problems.
+ """
+ validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')},
+ {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')},
+ {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}]
+ return self.toConfigurationValidationProblems(validationItems, "hadoop-env")
+
+ def validateHDFSCoreSiteFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ This was copied from HDP 2.0.6; validate core-site
+ :return: A list of configuration validation problems.
+ """
+ validationItems = []
+ validationItems.extend(self.getHadoopProxyUsersValidationItems(properties, services, hosts, configurations))
+ validationItems.extend(self.getAmbariProxyUsersForHDFSValidationItems(properties, services))
+ return self.toConfigurationValidationProblems(validationItems, "core-site")
+
+ def getAmbariProxyUsersForHDFSValidationItems(self, properties, services):
+ validationItems = []
+ servicesList = self.get_services_list(services)
+
+ if "HDFS" in servicesList:
+ ambari_user = self.getAmbariUser(services)
+ props = (
+ "hadoop.proxyuser.{0}.hosts".format(ambari_user),
+ "hadoop.proxyuser.{0}.groups".format(ambari_user)
+ )
+ for prop in props:
+ validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)})
+
+ return validationItems
+
+ def validateHDFSConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ This was copied from HDP 2.2; validate hdfs-site
+ :return: A list of configuration validation problems.
+ """
+ # We can not access property hadoop.security.authentication from the
+ # other config (core-site). That's why we are using another heuristic here
+ hdfs_site = properties
+ core_site = self.getSiteProperties(configurations, "core-site")
+
+ dfs_encrypt_data_transfer = 'dfs.encrypt.data.transfer' # Hadoop Wire encryption
+ wire_encryption_enabled = False
+ try:
+ wire_encryption_enabled = hdfs_site[dfs_encrypt_data_transfer] == "true"
+ except KeyError:
+ pass
+
+ HTTP_ONLY = 'HTTP_ONLY'
+ HTTPS_ONLY = 'HTTPS_ONLY'
+ HTTP_AND_HTTPS = 'HTTP_AND_HTTPS'
+
+ VALID_HTTP_POLICY_VALUES = [HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS]
+ VALID_TRANSFER_PROTECTION_VALUES = ['authentication', 'integrity', 'privacy']
+
+ validationItems = []
+ address_properties = [
+ # "dfs.datanode.address",
+ # "dfs.datanode.http.address",
+ # "dfs.datanode.https.address",
+ # "dfs.datanode.ipc.address",
+ # "dfs.journalnode.http-address",
+ # "dfs.journalnode.https-address",
+ # "dfs.namenode.rpc-address",
+ # "dfs.namenode.secondary.http-address",
+ "dfs.namenode.http-address",
+ "dfs.namenode.https-address",
+ ]
+ #Validating *address properties for correct values
+
+ for address_property in address_properties:
+ if address_property in hdfs_site:
+ value = hdfs_site[address_property]
+ if not HDFSValidator.is_valid_host_port_authority(value):
+ validationItems.append({"config-name" : address_property, "item" :
+ self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)})
+
+ #Adding Ranger Plugin logic here
+ ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+ ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No'
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+ if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
+ if 'dfs.permissions.enabled' in hdfs_site and \
+ hdfs_site['dfs.permissions.enabled'] != 'true':
+ validationItems.append({"config-name": 'dfs.permissions.enabled',
+ "item": self.getWarnItem("dfs.permissions.enabled needs to be set to true if Ranger HDFS Plugin is enabled.")})
+
+ if (not wire_encryption_enabled and # If wire encryption is enabled at Hadoop, it disables all our checks
+ 'hadoop.security.authentication' in core_site and
+ core_site['hadoop.security.authentication'] == 'kerberos' and
+ 'hadoop.security.authorization' in core_site and
+ core_site['hadoop.security.authorization'] == 'true'):
+ # security is enabled
+
+ dfs_http_policy = 'dfs.http.policy'
+ dfs_datanode_address = 'dfs.datanode.address'
+ datanode_http_address = 'dfs.datanode.http.address'
+ datanode_https_address = 'dfs.datanode.https.address'
+ data_transfer_protection = 'dfs.data.transfer.protection'
+
+ try: # Params may be absent
+ privileged_dfs_dn_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[dfs_datanode_address]))
+ except KeyError:
+ privileged_dfs_dn_port = False
+ try:
+ privileged_dfs_http_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[datanode_http_address]))
+ except KeyError:
+ privileged_dfs_http_port = False
+ try:
+ privileged_dfs_https_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[datanode_https_address]))
+ except KeyError:
+ privileged_dfs_https_port = False
+ try:
+ dfs_http_policy_value = hdfs_site[dfs_http_policy]
+ except KeyError:
+ dfs_http_policy_value = HTTP_ONLY # Default
+ try:
+ data_transfer_protection_value = hdfs_site[data_transfer_protection]
+ except KeyError:
+ data_transfer_protection_value = None
+
+ if dfs_http_policy_value not in VALID_HTTP_POLICY_VALUES:
+ validationItems.append({"config-name": dfs_http_policy,
+ "item": self.getWarnItem(
+ "Invalid property value: {0}. Valid values are {1}".format(
+ dfs_http_policy_value, VALID_HTTP_POLICY_VALUES))})
+
+ # determine whether we use secure ports
+ address_properties_with_warnings = []
+ if dfs_http_policy_value == HTTPS_ONLY:
+ if not privileged_dfs_dn_port and (privileged_dfs_https_port or datanode_https_address not in hdfs_site):
+ important_properties = [dfs_datanode_address, datanode_https_address]
+ message = "You set up datanode to use some non-secure ports. " \
+ "If you want to run Datanode under non-root user in a secure cluster, " \
+ "you should set all these properties {2} " \
+ "to use non-secure ports (if property {3} does not exist, " \
+ "just add it). You may also set up property {4} ('{5}' is a good default value). " \
+ "Also, set up WebHDFS with SSL as " \
+ "described in manual in order to be able to " \
+ "use HTTPS.".format(dfs_http_policy, dfs_http_policy_value, important_properties,
+ datanode_https_address, data_transfer_protection,
+ VALID_TRANSFER_PROTECTION_VALUES[0])
+ address_properties_with_warnings.extend(important_properties)
+ else: # dfs_http_policy_value == HTTP_AND_HTTPS or HTTP_ONLY
+ # We don't enforce datanode_https_address to use privileged ports here
+ any_nonprivileged_ports_are_in_use = not privileged_dfs_dn_port or not privileged_dfs_http_port
+ if any_nonprivileged_ports_are_in_use:
+ important_properties = [dfs_datanode_address, datanode_http_address]
+ message = "You have set up datanode to use some non-secure ports, but {0} is set to {1}. " \
+ "In a secure cluster, Datanode forbids using non-secure ports " \
+ "if {0} is not set to {3}. " \
+ "Please make sure that properties {2} use secure ports.".format(
+ dfs_http_policy, dfs_http_policy_value, important_properties, HTTPS_ONLY)
+ address_properties_with_warnings.extend(important_properties)
+
+ # Generate port-related warnings if any
+ for prop in address_properties_with_warnings:
+ validationItems.append({"config-name": prop,
+ "item": self.getWarnItem(message)})
+
+ # Check if it is appropriate to use dfs.data.transfer.protection
+ if data_transfer_protection_value is not None:
+ if dfs_http_policy_value in [HTTP_ONLY, HTTP_AND_HTTPS]:
+ validationItems.append({"config-name": data_transfer_protection,
+ "item": self.getWarnItem(
+ "{0} property can not be used when {1} is set to any "
+ "value other then {2}. Tip: When {1} property is not defined, it defaults to {3}".format(
+ data_transfer_protection, dfs_http_policy, HTTPS_ONLY, HTTP_ONLY))})
+ elif not data_transfer_protection_value in VALID_TRANSFER_PROTECTION_VALUES:
+ validationItems.append({"config-name": data_transfer_protection,
+ "item": self.getWarnItem(
+ "Invalid property value: {0}. Valid values are {1}.".format(
+ data_transfer_protection_value, VALID_TRANSFER_PROTECTION_VALUES))})
+ return self.toConfigurationValidationProblems(validationItems, "hdfs-site")
+
+ def validateHadoopEnvConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ This was copied from HDP 2.2; validate hadoop-env
+ :return: A list of configuration validation problems.
+ """
+ validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')},
+ {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')},
+ {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')}]
+ return self.toConfigurationValidationProblems(validationItems, "hadoop-env")
+
+ def validateHDFSRangerPluginConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ This was copied from HDP 2.2; validate ranger-hdfs-plugin-properties
+ :return: A list of configuration validation problems.
+ """
+ validationItems = []
+ ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+ ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No'
+ if ranger_plugin_enabled.lower() == 'yes':
+ # ranger-hdfs-plugin must be enabled in ranger-env
+ ranger_env = self.getServicesSiteProperties(services, 'ranger-env')
+ if not ranger_env or not 'ranger-hdfs-plugin-enabled' in ranger_env or ranger_env['ranger-hdfs-plugin-enabled'].lower() != 'yes':
+ validationItems.append({"config-name": 'ranger-hdfs-plugin-enabled',
+ "item": self.getWarnItem(
+ "ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled must correspond ranger-env/ranger-hdfs-plugin-enabled")})
+ return self.toConfigurationValidationProblems(validationItems, "ranger-hdfs-plugin-properties")
+
+ def validateRangerAuthorizerFromHDP23(self, properties, recommendedDefaults, configurations, services, hosts):
+ """
+ This was copied from HDP 2.3
+ If Ranger service is present and the ranger plugin is enabled, check that the provider class is correctly set.
+ :return: A list of configuration validation problems.
+ """
+ Logger.info("Class: %s, Method: %s. Checking if Ranger service is present and if the provider class is using the Ranger Authorizer." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+ # We can not access property hadoop.security.authentication from the
+ # other config (core-site). That's why we are using another heuristics here
+ hdfs_site = properties
+ validationItems = [] #Adding Ranger Plugin logic here
+ ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+ ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No'
+ servicesList = self.getServiceNames(services)
+ if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'yes'):
+
+ try:
+ if hdfs_site['dfs.namenode.inode.attributes.provider.class'].lower() != 'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer'.lower():
+ raise ValueError()
+ except (KeyError, ValueError), e:
+ message = "dfs.namenode.inode.attributes.provider.class needs to be set to 'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer' if Ranger HDFS Plugin is enabled."
+ validationItems.append({"config-name": 'dfs.namenode.inode.attributes.provider.class',
+ "item": self.getWarnItem(message)})
+
+ return self.toConfigurationValidationProblems(validationItems, "hdfs-site")
+
+ def getDataNodeHosts(self, services, hosts):
+ """
+ Returns the list of Data Node hosts. If none, return an empty list.
+ """
+ if len(hosts["items"]) > 0:
+ dataNodeHosts = self.getHostsWithComponent("HDFS", "DATANODE", services, hosts)
+ if dataNodeHosts is not None:
+ return dataNodeHosts
+ return []
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
index 82316f4..4174b9c 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
@@ -48,6 +48,9 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
self.modifyMastersWithMultipleInstances()
self.modifyCardinalitiesDict()
self.modifyHeapSizeProperties()
+ self.modifyNotValuableComponents()
+ self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
def modifyMastersWithMultipleInstances(self):
"""
@@ -72,22 +75,46 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
"property": "zk_server_heapsize",
"default": "1024m"}]}
+ def modifyNotValuableComponents(self):
+ """
+ Modify the set of components whose host assignment is based on other services.
+ Must be overriden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyComponentsNotPreferableOnServer(self):
+ """
+ Modify the set of components that are not preferable on the server.
+ Must be overriden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+ The scheme dictionary basically maps the number of hosts to
+ host index where component should exist.
+ Must be overriden in child class.
+ """
+ # Nothing to do
+ pass
+
def getServiceComponentLayoutValidations(self, services, hosts):
"""
Get a list of errors. Zookeeper does not have any validations in this version.
"""
- service_name = services["services"][0]["StackServices"]["service_name"]
- Logger.info("Class: %s, Method: %s. Validating Service Component Layout for Service: %s." %
- (self.__class__.__name__, inspect.stack()[0][3], service_name))
+ Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
return self.as_super.getServiceComponentLayoutValidations(services, hosts)
def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
"""
Recommend configurations to set. Zookeeper does not have any recommendations in this version.
"""
- service_name = services["services"][0]["StackServices"]["service_name"]
- Logger.info("Class: %s, Method: %s. Recommending Service Configurations for Service: %s." %
- (self.__class__.__name__, inspect.stack()[0][3], service_name))
+ Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
self.recommendConfigurations(configurations, clusterData, services, hosts)
@@ -95,9 +122,8 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
"""
Recommend configurations for this service.
"""
- service_name = services["services"][0]["StackServices"]["service_name"]
- Logger.info("Class: %s, Method: %s. Recommending Service Configurations for Service: %s." %
- (self.__class__.__name__, inspect.stack()[0][3], service_name))
+ Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
Logger.info("Setting zoo.cfg to default dataDir to /hadoop/zookeeper on the best matching mount")
@@ -110,9 +136,8 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
"""
Validate configurations for the service. Return a list of errors.
"""
- service_name = services["services"][0]["StackServices"]["service_name"]
- Logger.info("Class: %s, Method: %s. Validating Configurations for Service: %s." %
- (self.__class__.__name__, inspect.stack()[0][3], service_name))
+ Logger.info("Class: %s, Method: %s. Validating Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
items = []
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
index 5172042..6ef74d2 100644
--- a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
@@ -37,6 +37,7 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor):
self.modifyHeapSizeProperties()
self.modifyNotValuableComponents()
self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
def modifyMastersWithMultipleInstances(self):
"""
@@ -79,6 +80,28 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor):
"""
self.notPreferableOnServerComponents |= set(['GANGLIA_SERVER'])
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+ The scheme dictionary basically maps the number of hosts to
+ host index where component should exist.
+ Must be overriden in child class.
+ """
+ self.componentLayoutSchemes = {
+ 'NAMENODE': {"else": 0},
+ 'SECONDARY_NAMENODE': {"else": 1},
+ 'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
+
+ 'HISTORYSERVER': {31: 1, "else": 2},
+ 'RESOURCEMANAGER': {31: 1, "else": 2},
+
+ 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
+
+ 'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
+ 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
+ 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
+ }
+
def getComponentLayoutValidations(self, services, hosts):
"""Returns array of Validation objects about issues with hostnames components assigned to"""
items = []
@@ -330,23 +353,6 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor):
{"config-name": 'yarn.scheduler.maximum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.maximum-allocation-mb')} ]
return self.toConfigurationValidationProblems(validationItems, "yarn-site")
- # TODO, move to Service Advisors.
- def getComponentLayoutSchemes(self):
- return {
- 'NAMENODE': {"else": 0},
- 'SECONDARY_NAMENODE': {"else": 1},
- 'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
-
- 'HISTORYSERVER': {31: 1, "else": 2},
- 'RESOURCEMANAGER': {31: 1, "else": 2},
-
- 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
-
- 'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
- 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
- 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
- }
-
class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
def __init__(self):
@@ -357,6 +363,7 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
self.modifyHeapSizeProperties()
self.modifyNotValuableComponents()
self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
def modifyMastersWithMultipleInstances(self):
"""
@@ -396,6 +403,18 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
"""
self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS'])
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+ The scheme dictionary basically maps the number of hosts to
+ host index where component should exist.
+ Must be overriden in child class.
+ """
+ self.componentLayoutSchemes.update({
+ 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+ 'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+ })
+
def getServiceConfigurationRecommenderDict(self):
parentRecommendConfDict = super(BIGTOP08StackAdvisor, self).getServiceConfigurationRecommenderDict()
childRecommendConfDict = {
@@ -430,16 +449,6 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
"-server -Xmx" + str(int(0.8 * clusterData["amMemory"]))
+ "m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC")
- # TODO, move to Service Advisors.
- def getComponentLayoutSchemes(self):
- parentSchemes = super(BIGTOP08StackAdvisor, self).getComponentLayoutSchemes()
- childSchemes = {
- 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
- 'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
- }
- parentSchemes.update(childSchemes)
- return parentSchemes
-
def getServiceConfigurationValidators(self):
parentValidators = super(BIGTOP08StackAdvisor, self).getServiceConfigurationValidators()
childValidators = {
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index 9816702..3596798 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -42,6 +42,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
self.modifyHeapSizeProperties()
self.modifyNotValuableComponents()
self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
def modifyMastersWithMultipleInstances(self):
"""
@@ -84,6 +85,29 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
"""
self.notPreferableOnServerComponents |= set(['GANGLIA_SERVER', 'METRICS_COLLECTOR'])
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+ The scheme dictionary basically maps the number of hosts to
+ host index where component should exist.
+ Must be overriden in child class.
+ """
+ self.componentLayoutSchemes.update({
+ 'NAMENODE': {"else": 0},
+ 'SECONDARY_NAMENODE': {"else": 1},
+ 'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
+
+ 'HISTORYSERVER': {31: 1, "else": 2},
+ 'RESOURCEMANAGER': {31: 1, "else": 2},
+
+ 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
+
+ 'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
+ 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
+ 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
+ 'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5},
+ })
+
def getComponentLayoutValidations(self, services, hosts):
"""Returns array of Validation objects about issues with hostnames components assigned to"""
items = super(HDP206StackAdvisor, self).getComponentLayoutValidations(services, hosts)
@@ -248,17 +272,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
ambari_user = ambari_user.split('@')[0]
return ambari_user
- def recommendAmbariProxyUsersForHDFS(self, services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute):
- if "HDFS" in servicesList:
- ambari_user = self.getAmbariUser(services)
- ambariHostName = socket.getfqdn()
- self.put_proxyuser_value(ambari_user, ambariHostName, services=services, configurations=configurations, put_function=putCoreSiteProperty)
- self.put_proxyuser_value(ambari_user, "*", is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
- old_ambari_user = self.getOldAmbariUser(services)
- if old_ambari_user is not None:
- putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
- putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
-
def getAmbariProxyUsersForHDFSValidationItems(self, properties, services):
validationItems = []
servicesList = self.get_services_list(services)
@@ -274,216 +287,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
return validationItems
- def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, services, hosts, configurations):
- Logger.info("Calculating Hadoop Proxy User recommendations for {0} service.".format(serviceName))
- servicesList = self.get_services_list(services)
- resultUsers = {}
-
- if serviceName in servicesList:
- usersComponents = {}
- for values in serviceUserComponents:
-
- # Filter, if 4th argument is present in the tuple
- filterFunction = values[3:]
- if filterFunction and not filterFunction[0](services, hosts):
- continue
-
- userNameConfig, userNameProperty, hostSelectorMap = values[:3]
- user = get_from_dict(services, ("configurations", userNameConfig, "properties", userNameProperty), None)
- if user:
- usersComponents[user] = (userNameConfig, userNameProperty, hostSelectorMap)
-
- for user, (userNameConfig, userNameProperty, hostSelectorMap) in usersComponents.iteritems():
- proxyUsers = {"config": userNameConfig, "propertyName": userNameProperty}
- for proxyPropertyName, hostSelector in hostSelectorMap.iteritems():
- componentHostNamesString = hostSelector if isinstance(hostSelector, basestring) else '*'
- if isinstance(hostSelector, (list, tuple)):
- _, componentHostNames = self.get_data_for_proxyuser(user, services, configurations) # preserve old values
- for component in hostSelector:
- componentHosts = self.getHostsWithComponent(serviceName, component, services, hosts)
- if componentHosts is not None:
- for componentHost in componentHosts:
- componentHostName = componentHost["Hosts"]["host_name"]
- componentHostNames.add(componentHostName)
-
- componentHostNamesString = ",".join(sorted(componentHostNames))
- Logger.info("Host List for [service='{0}'; user='{1}'; components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), componentHostNamesString))
-
- if not proxyPropertyName in proxyUsers:
- proxyUsers[proxyPropertyName] = componentHostNamesString
-
- if not user in resultUsers:
- resultUsers[user] = proxyUsers
-
- return resultUsers
-
- def getHadoopProxyUsers(self, services, hosts, configurations):
- """
- Gets Hadoop Proxy User recommendations based on the configuration that is provided by
- getServiceHadoopProxyUsersConfigurationDict.
-
- See getServiceHadoopProxyUsersConfigurationDict
- """
- servicesList = self.get_services_list(services)
- users = {}
-
- for serviceName, serviceUserComponents in self.getServiceHadoopProxyUsersConfigurationDict().iteritems():
- users.update(self._getHadoopProxyUsersForService(serviceName, serviceUserComponents, services, hosts, configurations))
-
- return users
-
- def get_data_for_proxyuser(self, user_name, services, configurations, groups=False):
- """
- Returns values of proxyuser properties for given user. Properties can be
- hadoop.proxyuser.username.groups or hadoop.proxyuser.username.hosts
- :param user_name:
- :param services:
- :param groups: if true, will return values for group property, not hosts
- :return: tuple (wildcard_value, set[values]), where wildcard_value indicates if property value was *
- """
- if "core-site" in services["configurations"]:
- coreSite = services["configurations"]["core-site"]['properties']
- else:
- coreSite = {}
- if groups:
- property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
- else:
- property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
- if property_name in coreSite:
- property_value = coreSite[property_name]
- if property_value == "*":
- return True, set()
- else:
- result_values = set()
- if "core-site" in configurations:
- if property_name in configurations["core-site"]['properties']:
- result_values = result_values.union(configurations["core-site"]['properties'][property_name].split(","))
- result_values = result_values.union(property_value.split(","))
- return False, result_values
- return False, set()
-
- def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None):
- is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups)
- result_value = "*"
- result_values_set = self.merge_proxyusers_values(current_value, value)
- if len(result_values_set) > 0:
- result_value = ",".join(sorted([val for val in result_values_set if val]))
-
- if is_groups:
- property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
- else:
- property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
-
- put_function(property_name, result_value)
-
- def merge_proxyusers_values(self, first, second):
- result = set()
- def append(data):
- if isinstance(data, str) or isinstance(data, unicode):
- if data != "*":
- result.update(data.split(","))
- else:
- result.update(data)
- append(first)
- append(second)
- return result
-
- def getServiceHadoopProxyUsersConfigurationDict(self):
- """
- Returns a map that is used by 'getHadoopProxyUsers' to determine service
- user properties and related components and get proxyuser recommendations.
- This method can be overridden in stackadvisors for the further stacks to
- add additional services or change the previous logic.
-
- Example of the map format:
- {
- "serviceName": [
- ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", "propertyGroups": "exact string value"})
- ("configTypeName2", "userPropertyName2", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}),
- ("configTypeName3", "userPropertyName3", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction)
- ],
- "serviceName2": [
- ...
- }
-
- If the third element of a tuple is map that maps proxy property to it's value.
- The key could be either 'propertyHosts' or 'propertyGroups'. (Both are optional)
- If the map value is a string, then this string will be used for the proxyuser
- value (e.g. 'hadoop.proxyuser.{user}.hosts' = '*').
- Otherwise map value should be alist or a tuple with component names.
- All hosts with the provided components will be added
- to the property (e.g. 'hadoop.proxyuser.{user}.hosts' = 'host1,host2,host3')
-
- The forth element of the tuple is optional and if it's provided,
- it should be a function that takes two arguments: services and hosts.
- If it returns False, proxyusers for the tuple will not be added.
- """
- ALL_WILDCARD = "*"
- HOSTS_PROPERTY = "propertyHosts"
- GROUPS_PROPERTY = "propertyGroups"
-
- return {
- "HDFS": [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
- "OOZIE": [("oozie-env", "oozie_user", {HOSTS_PROPERTY: ["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
- "HIVE": [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", "HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}),
- ("hive-env", "webhcat_user", {HOSTS_PROPERTY: ["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
- "YARN": [("yarn-env", "yarn_user", {HOSTS_PROPERTY: ["RESOURCEMANAGER"]}, lambda services, hosts: len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 1)],
- "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
- "SPARK": [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})]
- }
-
- def recommendHadoopProxyUsers(self, configurations, services, hosts):
- servicesList = self.get_services_list(services)
-
- if 'forced-configurations' not in services:
- services["forced-configurations"] = []
-
- putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
- putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")
-
- users = self.getHadoopProxyUsers(services, hosts, configurations)
-
- # Force dependencies for HIVE
- if "HIVE" in servicesList:
- hive_user = get_from_dict(services, ("configurations", "hive-env", "properties", "hive_user"), default_value=None)
- if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), default_value=None):
- services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(hive_user)})
-
- for user_name, user_properties in users.iteritems():
-
- # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users
- self.put_proxyuser_value(user_name, user_properties["propertyHosts"], services=services, configurations=configurations, put_function=putCoreSiteProperty)
- Logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(user_name, user_properties["propertyHosts"]))
- if "propertyGroups" in user_properties:
- self.put_proxyuser_value(user_name, user_properties["propertyGroups"], is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
-
- # Remove old properties if user was renamed
- userOldValue = getOldValue(self, services, user_properties["config"], user_properties["propertyName"])
- if userOldValue is not None and userOldValue != user_name:
- putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true')
- services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)})
- services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)})
-
- if "propertyGroups" in user_properties:
- putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true')
- services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)})
- services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)})
-
- self.recommendAmbariProxyUsersForHDFS(services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute)
-
- def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations):
- validationItems = []
- users = self.getHadoopProxyUsers(services, hosts, configurations)
- for user_name, user_properties in users.iteritems():
- props = ["hadoop.proxyuser.{0}.hosts".format(user_name)]
- if "propertyGroups" in user_properties:
- props.append("hadoop.proxyuser.{0}.groups".format(user_name))
-
- for prop in props:
- validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)})
-
- return validationItems
-
def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts):
putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services)
@@ -536,7 +339,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
maxFreeVolumeSizeForHost = 0l #kBytes
for dataDir in dataDirs:
- mp = getMountPointForDir(dataDir, mountPoints)
+ mp = self.getMountPointForDir(dataDir, mountPoints)
for i in range(len(mountPoints)):
if mp == mountPoints[i]:
if mountPointDiskAvailableSpace[i] > maxFreeVolumeSizeForHost:
@@ -974,43 +777,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
pass
- def getHostNamesWithComponent(self, serviceName, componentName, services):
- """
- Returns the list of hostnames on which service component is installed
- """
- if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
- service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
- components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
- if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
- componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
- return componentHostnames
- return []
-
- def getHostsWithComponent(self, serviceName, componentName, services, hosts):
- if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
- service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
- components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
- if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
- componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
- componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames]
- return componentHosts
- return []
-
- def getHostWithComponent(self, serviceName, componentName, services, hosts):
- componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts)
- if (len(componentHosts) > 0):
- return componentHosts[0]
- return None
-
- def getHostComponentsByCategories(self, hostname, categories, services, hosts):
- components = []
- if services is not None and hosts is not None:
- for service in services["services"]:
- components.extend([componentEntry for componentEntry in service["components"]
- if componentEntry["StackServiceComponents"]["component_category"] in categories
- and hostname in componentEntry["StackServiceComponents"]["hostnames"]])
- return components
-
# TODO, move this to a generic stack advisor.
def getZKHostPortString(self, services, include_port=True):
"""
@@ -1312,8 +1078,8 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
mountPoints = []
for mountPoint in host["Hosts"]["disk_info"]:
mountPoints.append(mountPoint["mountpoint"])
- hbase_rootdir_mountpoint = getMountPointForDir(hbase_rootdir, mountPoints)
- hbase_tmpdir_mountpoint = getMountPointForDir(hbase_tmpdir, mountPoints)
+ hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints)
+ hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints)
preferred_mountpoints = self.getPreferredMountPoints(host['Hosts'])
# hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition
# if multiple preferred_mountpoints exist
@@ -1331,7 +1097,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
if dn_hosts and collectorHostName in dn_hosts and ams_site and \
dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs):
for dfs_datadir in dfs_datadirs:
- dfs_datadir_mountpoint = getMountPointForDir(dfs_datadir, mountPoints)
+ dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints)
if dfs_datadir_mountpoint == hbase_rootdir_mountpoint:
item = self.getWarnItem("Consider not using {0} partition for storing metrics data. "
"{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint))
@@ -1383,10 +1149,10 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
if logDirItem:
validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}])
- hbase_master_heapsize = to_number(properties["hbase_master_heapsize"])
- hbase_master_xmn_size = to_number(properties["hbase_master_xmn_size"])
- hbase_regionserver_heapsize = to_number(properties["hbase_regionserver_heapsize"])
- hbase_regionserver_xmn_size = to_number(properties["regionserver_xmn_size"])
+ hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"])
+ hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"])
+ hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"])
+ hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"])
# Validate Xmn settings.
masterXmnItem = None
@@ -1476,14 +1242,14 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
heapPropertyToIncrease = "hbase_regionserver_heapsize" if is_hbase_distributed else "hbase_master_heapsize"
xmnPropertyToIncrease = "regionserver_xmn_size" if is_hbase_distributed else "hbase_master_xmn_size"
- hbase_needs_increase = to_number(properties[heapPropertyToIncrease]) * mb < 32 * gb
+ hbase_needs_increase = self.to_number(properties[heapPropertyToIncrease]) * mb < 32 * gb
if unusedMemory > 4*gb and hbase_needs_increase: # warn user, if more than 4GB RAM is unused
- recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + to_number(properties.get(heapPropertyToIncrease))*mb
+ recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + self.to_number(properties.get(heapPropertyToIncrease))*mb
recommended_hbase_heapsize = min(32*gb, recommended_hbase_heapsize) #Make sure heapsize <= 32GB
recommended_hbase_heapsize = round_to_n(recommended_hbase_heapsize/mb,128) # Round to 128m multiple
- if to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize:
+ if self.to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize:
hbaseHeapsizeItem = self.getWarnItem("Consider allocating {0} MB to {1} in ams-hbase-env to use up some "
"unused memory on host"
.format(recommended_hbase_heapsize,
@@ -1491,7 +1257,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
validationItems.extend([{"config-name": heapPropertyToIncrease, "item": hbaseHeapsizeItem}])
recommended_xmn_size = round_to_n(0.15*recommended_hbase_heapsize,128)
- if to_number(properties[xmnPropertyToIncrease]) < recommended_xmn_size:
+ if self.to_number(properties[xmnPropertyToIncrease]) < recommended_xmn_size:
xmnPropertyToIncreaseItem = self.getWarnItem("Consider allocating {0} MB to use up some unused memory "
"on host".format(recommended_xmn_size))
validationItems.extend([{"config-name": xmnPropertyToIncrease, "item": xmnPropertyToIncreaseItem}])
@@ -1505,7 +1271,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
mb = 1024 * 1024
gb = 1024 * mb
validationItems = []
- collector_heapsize = to_number(ams_env.get("metrics_collector_heapsize"))
+ collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize"))
amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
for collectorHostName in amsCollectorHosts:
for host in hosts["items"]:
@@ -1550,13 +1316,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
if len(heapsize) > 1 and heapsize[-1] in '0123456789':
heapsize = str(heapsize) + "m"
- totalMemoryRequired += formatXmxSizeToBytes(heapsize)
+ totalMemoryRequired += self.formatXmxSizeToBytes(heapsize)
else:
if component == "METRICS_MONITOR" or "CLIENT" in component:
heapsize = '512m'
else:
heapsize = '1024m'
- totalMemoryRequired += formatXmxSizeToBytes(heapsize)
+ totalMemoryRequired += self.formatXmxSizeToBytes(heapsize)
return totalMemoryRequired
def getPreferredMountPoints(self, hostInfo):
@@ -1573,130 +1339,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or
mountpoint["type"] in undesirableFsTypes or
mountpoint["available"] == str(0)):
- mountPointsDict[mountpoint["mountpoint"]] = to_number(mountpoint["available"])
+ mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"])
if mountPointsDict:
mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True)
mountPoints.append("/")
return mountPoints
- def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo):
- if not propertyName in properties:
- return self.getErrorItem("Value should be set")
- dir = properties[propertyName]
- if not dir.startswith("file://") or dir == recommendedDefaults.get(propertyName):
- return None
-
- dir = re.sub("^file://", "", dir, count=1)
- mountPoints = []
- for mountPoint in hostInfo["disk_info"]:
- mountPoints.append(mountPoint["mountpoint"])
- mountPoint = getMountPointForDir(dir, mountPoints)
-
- if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint:
- return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName))
-
- return None
-
- def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace):
- if not propertyName in properties:
- return self.getErrorItem("Value should be set")
- dir = properties[propertyName]
- if not dir.startswith("file://"):
- return None
-
- dir = re.sub("^file://", "", dir, count=1)
- mountPoints = {}
- for mountPoint in hostInfo["disk_info"]:
- mountPoints[mountPoint["mountpoint"]] = to_number(mountPoint["available"])
- mountPoint = getMountPointForDir(dir, mountPoints.keys())
-
- if not mountPoints:
- return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"])
-
- if mountPoints[mountPoint] < reqiuredDiskSpace:
- msg = "Ambari Metrics disk space requirements not met. \n" \
- "Recommended disk space for partition {0} is {1}G"
- return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb
- return None
-
- def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName):
- if propertyName not in recommendedDefaults:
- # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the
- # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary". In this case, ignore it.
- return None
-
- if not propertyName in properties:
- return self.getErrorItem("Value should be set")
- value = to_number(properties[propertyName])
- if value is None:
- return self.getErrorItem("Value should be integer")
- defaultValue = to_number(recommendedDefaults[propertyName])
- if defaultValue is None:
- return None
- if value < defaultValue:
- return self.getWarnItem("Value is less than the recommended default of {0}".format(defaultValue))
- return None
-
- def validatorEqualsPropertyItem(self, properties1, propertyName1,
- properties2, propertyName2,
- emptyAllowed=False):
- if not propertyName1 in properties1:
- return self.getErrorItem("Value should be set for %s" % propertyName1)
- if not propertyName2 in properties2:
- return self.getErrorItem("Value should be set for %s" % propertyName2)
- value1 = properties1.get(propertyName1)
- if value1 is None and not emptyAllowed:
- return self.getErrorItem("Empty value for %s" % propertyName1)
- value2 = properties2.get(propertyName2)
- if value2 is None and not emptyAllowed:
- return self.getErrorItem("Empty value for %s" % propertyName2)
- if value1 != value2:
- return self.getWarnItem("It is recommended to set equal values "
- "for properties {0} and {1}".format(propertyName1, propertyName2))
-
- return None
-
- def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults,
- propertyName):
- if not propertyName in properties:
- return self.getErrorItem("Value should be set for %s" % propertyName)
- value = properties.get(propertyName)
- if not propertyName in recommendedDefaults:
- return self.getErrorItem("Value should be recommended for %s" % propertyName)
- recommendedValue = recommendedDefaults.get(propertyName)
- if value != recommendedValue:
- return self.getWarnItem("It is recommended to set value {0} "
- "for property {1}".format(recommendedValue, propertyName))
- return None
-
- def validatorNotEmpty(self, properties, propertyName):
- if not propertyName in properties:
- return self.getErrorItem("Value should be set for {0}".format(propertyName))
- value = properties.get(propertyName)
- if not value:
- return self.getWarnItem("Empty value for {0}".format(propertyName))
- return None
-
- def validateMinMemorySetting(self, properties, defaultValue, propertyName):
- if not propertyName in properties:
- return self.getErrorItem("Value should be set")
- if defaultValue is None:
- return self.getErrorItem("Config's default value can't be null or undefined")
-
- value = properties[propertyName]
- if value is None:
- return self.getErrorItem("Value can't be null or undefined")
- try:
- valueInt = to_number(value)
- # TODO: generify for other use cases
- defaultValueInt = int(str(defaultValue).strip())
- if valueInt < defaultValueInt:
- return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue))
- except:
- return None
-
- return None
-
+ # TODO, move to YARN Service Advisor
def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services):
if propertyName not in properties:
return self.getErrorItem("Value should be set")
@@ -1712,6 +1361,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
return None
+ # TODO, move to YARN Service Advisor
def recommendYarnQueue(self, services, catalog_name=None, queue_property=None):
old_queue_name = None
@@ -1737,15 +1387,15 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
defaultValue = recommendedDefaults[propertyName]
if defaultValue is None:
return self.getErrorItem("Config's default value can't be null or undefined")
- if not checkXmxValueFormat(value) and checkXmxValueFormat(defaultValue):
+ if not self.checkXmxValueFormat(value) and self.checkXmxValueFormat(defaultValue):
# Xmx is in the default-value but not the value, should be an error
return self.getErrorItem('Invalid value format')
- if not checkXmxValueFormat(defaultValue):
+ if not self.checkXmxValueFormat(defaultValue):
# if default value does not contain Xmx, then there is no point in validating existing value
return None
- valueInt = formatXmxSizeToBytes(getXmxSize(value))
- defaultValueXmx = getXmxSize(defaultValue)
- defaultValueInt = formatXmxSizeToBytes(defaultValueXmx)
+ valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value))
+ defaultValueXmx = self.getXmxSize(defaultValue)
+ defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx)
if valueInt < defaultValueInt:
return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx)
return None
@@ -1837,24 +1487,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
return dataNodeHosts
return []
- # TODO, move to Service Advisors.
- def getComponentLayoutSchemes(self):
- return {
- 'NAMENODE': {"else": 0},
- 'SECONDARY_NAMENODE': {"else": 1},
- 'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
-
- 'HISTORYSERVER': {31: 1, "else": 2},
- 'RESOURCEMANAGER': {31: 1, "else": 2},
-
- 'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
-
- 'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
- 'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
- 'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
- 'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5},
- }
-
def get_system_min_uid(self):
login_defs = '/etc/login.defs'
uid_min_tag = 'UID_MIN'
@@ -2038,17 +1670,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
"security_enabled" in services["configurations"]["cluster-env"]["properties"] and\
services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true"
- def get_services_list(self, services):
- """
- Returns available services as list
-
- :type services dict
- :rtype list
- """
- if not services:
- return []
- return [service["StackServices"]["service_name"] for service in services["services"]]
def get_components_list(self, service, services):
"""
@@ -2103,88 +1725,7 @@ def getServicesSiteProperties(services, siteName):
return None
return siteConfig.get("properties")
-def to_number(s):
- try:
- return int(re.sub("\D", "", s))
- except ValueError:
- return None
-def checkXmxValueFormat(value):
- p = re.compile('-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?')
- matches = p.findall(value)
- return len(matches) == 1
-
-def getXmxSize(value):
- p = re.compile("-Xmx(\d+)(.?)")
- result = p.findall(value)[0]
- if len(result) > 1:
- # result[1] - is a space or size formatter (b|k|m|g etc)
- return result[0] + result[1].lower()
- return result[0]
-
-def formatXmxSizeToBytes(value):
- value = value.lower()
- if len(value) == 0:
- return 0
- modifier = value[-1]
-
- if modifier == ' ' or modifier in "0123456789":
- modifier = 'b'
- m = {
- modifier == 'b': 1,
- modifier == 'k': 1024,
- modifier == 'm': 1024 * 1024,
- modifier == 'g': 1024 * 1024 * 1024,
- modifier == 't': 1024 * 1024 * 1024 * 1024,
- modifier == 'p': 1024 * 1024 * 1024 * 1024 * 1024
- }[1]
- return to_number(value) * m
-
-def getPort(address):
- """
- Extracts port from the address like 0.0.0.0:1019
- """
- if address is None:
- return None
- m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
- if m is not None:
- return int(m.group(2))
- else:
- return None
-
-def isSecurePort(port):
- """
- Returns True if port is root-owned at *nix systems
- """
- if port is not None:
- return port < 1024
- else:
- return False
-
-def getMountPointForDir(dir, mountPoints):
- """
- :param dir: Directory to check, even if it doesn't exist.
- :return: Returns the closest mount point as a string for the directory.
- if the "dir" variable is None, will return None.
- If the directory does not exist, will return "/".
- """
- bestMountFound = None
- if dir:
- dir = re.sub("^file://", "", dir, count=1).strip().lower()
-
- # If the path is "/hadoop/hdfs/data", then possible matches for mounts could be
- # "/", "/hadoop/hdfs", and "/hadoop/hdfs/data".
- # So take the one with the greatest number of segments.
- for mountPoint in mountPoints:
- # Ensure that the mount path and the dir path ends with "/"
- # The mount point "/hadoop" should not match with the path "/hadoop1"
- if os.path.join(dir, "").startswith(os.path.join(mountPoint, "")):
- if bestMountFound is None:
- bestMountFound = mountPoint
- elif os.path.join(bestMountFound, "").count(os.path.sep) < os.path.join(mountPoint, "").count(os.path.sep):
- bestMountFound = mountPoint
-
- return bestMountFound
def round_to_n(mem_size, n=128):
return int(round(mem_size / float(n))) * int(n)
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
index b275b00..4822732 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
@@ -32,6 +32,7 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
self.modifyHeapSizeProperties()
self.modifyNotValuableComponents()
self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
def modifyMastersWithMultipleInstances(self):
"""
@@ -74,6 +75,20 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
"""
self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS', 'GANGLIA_SERVER', 'METRICS_COLLECTOR'])
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+
+ The scheme dictionary basically maps the number of hosts to
+ host index where component should exist.
+
+ Must be overriden in child class.
+ """
+ self.componentLayoutSchemes.update({
+ 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+ 'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+ })
+
def getServiceConfigurationRecommenderDict(self):
parentRecommendConfDict = super(HDP21StackAdvisor, self).getServiceConfigurationRecommenderDict()
childRecommendConfDict = {
@@ -210,16 +225,6 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
if recommended_tez_queue is not None:
putTezProperty("tez.queue.name", recommended_tez_queue)
- # TODO, move to Service Advisors.
- def getComponentLayoutSchemes(self):
- parentSchemes = super(HDP21StackAdvisor, self).getComponentLayoutSchemes()
- childSchemes = {
- 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
- 'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
- }
- parentSchemes.update(childSchemes)
- return parentSchemes
-
def getServiceConfigurationValidators(self):
parentValidators = super(HDP21StackAdvisor, self).getServiceConfigurationValidators()
childValidators = {
@@ -233,10 +238,10 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
validationItems = [ {"config-name": 'hive.tez.container.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.tez.container.size')},
{"config-name": 'hive.tez.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'hive.tez.java.opts')},
{"config-name": 'hive.auto.convert.join.noconditionaltask.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.auto.convert.join.noconditionaltask.size')} ]
- yarnSiteProperties = getSiteProperties(configurations, "yarn-site")
+ yarnSiteProperties = self.getSiteProperties(configurations, "yarn-site")
if yarnSiteProperties:
- yarnSchedulerMaximumAllocationMb = to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"])
- hiveTezContainerSize = to_number(properties['hive.tez.container.size'])
+ yarnSchedulerMaximumAllocationMb = self.to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"])
+ hiveTezContainerSize = self.to_number(properties['hive.tez.container.size'])
if hiveTezContainerSize is not None and yarnSchedulerMaximumAllocationMb is not None and hiveTezContainerSize > yarnSchedulerMaximumAllocationMb:
validationItems.append({"config-name": 'hive.tez.container.size', "item": self.getWarnItem("hive.tez.container.size is greater than the maximum container size specified in yarn.scheduler.maximum-allocation-mb")})
return self.toConfigurationValidationProblems(validationItems, "hive-site")
http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
index 8980398..f108622 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
@@ -30,6 +30,7 @@ import xml.etree.ElementTree as ET
# Local Imports
from resource_management.core.logger import Logger
+
class HDP22StackAdvisor(HDP21StackAdvisor):
def __init__(self):
@@ -1202,23 +1203,23 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
{"config-name": 'mapreduce.job.queuename', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'mapreduce.job.queuename', services)} ]
if 'mapreduce.map.java.opts' in properties and \
- checkXmxValueFormat(properties['mapreduce.map.java.opts']):
- mapreduceMapJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024)
- mapreduceMapMemoryMb = to_number(properties['mapreduce.map.memory.mb'])
+ self.checkXmxValueFormat(properties['mapreduce.map.java.opts']):
+ mapreduceMapJavaOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024)
+ mapreduceMapMemoryMb = self.to_number(properties['mapreduce.map.memory.mb'])
if mapreduceMapJavaOpts > mapreduceMapMemoryMb:
validationItems.append({"config-name": 'mapreduce.map.java.opts', "item": self.getWarnItem("mapreduce.map.java.opts Xmx should be less than mapreduce.map.memory.mb ({0})".format(mapreduceMapMemoryMb))})
if 'mapreduce.reduce.java.opts' in properties and \
- checkXmxValueFormat(properties['mapreduce.reduce.java.opts']):
- mapreduceReduceJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024)
- mapreduceReduceMemoryMb = to_number(properties['mapreduce.reduce.memory.mb'])
+ self.checkXmxValueFormat(properties['mapreduce.reduce.java.opts']):
+ mapreduceReduceJavaOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024)
+ mapreduceReduceMemoryMb = self.to_number(properties['mapreduce.reduce.memory.mb'])
if mapreduceReduceJavaOpts > mapreduceReduceMemoryMb:
validationItems.append({"config-name": 'mapreduce.reduce.java.opts', "item": self.getWarnItem("mapreduce.reduce.java.opts Xmx should be less than mapreduce.reduce.memory.mb ({0})".format(mapreduceReduceMemoryMb))})
if 'yarn.app.mapreduce.am.command-opts' in properties and \
- checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']):
- yarnAppMapreduceAmCommandOpts = formatXmxSizeToBytes(getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024)
- yarnAppMapreduceAmResourceMb = to_number(properties['yarn.app.mapreduce.am.resource.mb'])
+ self.checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']):
+ yarnAppMapreduceAmCommandOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024)
+ yarnAppMapreduceAmResourceMb = self.to_number(properties['yarn.app.mapreduce.am.resource.mb'])
if yarnAppMapreduceAmCommandOpts > yarnAppMapreduceAmResourceMb:
validationItems.append({"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.getWarnItem("yarn.app.mapreduce.am.command-opts Xmx should be less than yarn.app.mapreduce.am.resource.mb ({0})".format(yarnAppMapreduceAmResourceMb))})
@@ -1283,7 +1284,7 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
for address_property in address_properties:
if address_property in hdfs_site:
value = hdfs_site[address_property]
- if not is_valid_host_port_authority(value):
+ if not self.is_valid_host_port_authority(value):
validationItems.append({"config-name" : address_property, "item" :
self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)})
@@ -1312,15 +1313,15 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
data_transfer_protection = 'dfs.data.transfer.protection'
try: # Params may be absent
- privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address]))
+ privileged_dfs_dn_port = self.isSecurePort(self.getPort(hdfs_site[dfs_datanode_address]))
except KeyError:
privileged_dfs_dn_port = False
try:
- privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address]))
+ privileged_dfs_http_port = self.isSecurePort(self.getPort(hdfs_site[datanode_http_address]))
except KeyError:
privileged_dfs_http_port = False
try:
- privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address]))
+ privileged_dfs_https_port = self.isSecurePort(self.getPort(hdfs_site[datanode_https_address]))
except KeyError:
privileged_dfs_https_port = False
try:
@@ -1770,15 +1771,3 @@ def is_number(s):
pass
return False
-
-def is_valid_host_port_authority(target):
- has_scheme = "://" in target
- if not has_scheme:
- target = "dummyscheme://"+target
- try:
- result = urlparse(target)
- if result.hostname is not None and result.port is not None:
- return True
- except ValueError:
- pass
- return False