You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2017/01/17 01:48:23 UTC

[1/2] ambari git commit: AMBARI-19097. HDP 3.0 TP - create Service Advisor for HDFS (alejandro)

Repository: ambari
Updated Branches:
  refs/heads/trunk 5bdd6cf7e -> 326cc1b2a


http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
index 4e4ef51..c7d9327 100644
--- a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
@@ -37,6 +37,7 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
     self.modifyHeapSizeProperties()
     self.modifyNotValuableComponents()
     self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
 
   def modifyMastersWithMultipleInstances(self):
     """
@@ -79,6 +80,32 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
     """
     self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS'])
 
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+
+    Must be overridden in child class.
+    """
+    self.componentLayoutSchemes = {
+      'NAMENODE': {"else": 0},
+      'SECONDARY_NAMENODE': {"else": 1},
+      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
+
+      'HISTORYSERVER': {31: 1, "else": 2},
+      'RESOURCEMANAGER': {31: 1, "else": 2},
+
+      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
+
+      'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
+      'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
+      'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
+      'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+      'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+    }
+
   def getComponentLayoutValidations(self, services, hosts):
     """Returns array of Validation objects about issues with hostnames components assigned to"""
     items = []
@@ -561,26 +588,6 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
                         {"config-name": 'hbase_master_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hbase_master_heapsize')}]
     return self.toConfigurationValidationProblems(validationItems, "hbase-env")
 
-
-  # TODO, move to Service Advisors.
-  def getComponentLayoutSchemes(self):
-    return {
-      'NAMENODE': {"else": 0},
-      'SECONDARY_NAMENODE': {"else": 1},
-      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
-
-      'HISTORYSERVER': {31: 1, "else": 2},
-      'RESOURCEMANAGER': {31: 1, "else": 2},
-
-      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
-
-      'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
-      'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
-      'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
-      'APP_TIMELINE_SERVER': {31: 1, "else": 2},
-      'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
-      }
-
   def getHostsWithComponent(self, serviceName, componentName, services, hosts):
     if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
       service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
index 26292d9..b72f046 100644
--- a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
@@ -33,27 +33,6 @@ def getSiteProperties(configurations, siteName):
     return None
   return siteConfig.get("properties")
 
-def getPort(address):
-  """
-  Extracts port from the address like 0.0.0.0:1019
-  """
-  if address is None:
-    return None
-  m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
-  if m is not None:
-    return int(m.group(2))
-  else:
-    return None
-
-def isSecurePort(port):
-  """
-  Returns True if port is root-owned at *nix systems
-  """
-  if port is not None:
-    return port < 1024
-  else:
-    return False
-
 class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor):
 
   def __init__(self):
@@ -822,15 +801,15 @@ class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor):
       data_transfer_protection = 'dfs.data.transfer.protection'
 
       try: # Params may be absent
-        privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address]))
+        privileged_dfs_dn_port = self.isSecurePort(self.getPort(hdfs_site[dfs_datanode_address]))
       except KeyError:
         privileged_dfs_dn_port = False
       try:
-        privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address]))
+        privileged_dfs_http_port = self.isSecurePort(self.getPort(hdfs_site[datanode_http_address]))
       except KeyError:
         privileged_dfs_http_port = False
       try:
-        privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address]))
+        privileged_dfs_https_port = self.isSecurePort(self.getPort(hdfs_site[datanode_https_address]))
       except KeyError:
         privileged_dfs_https_port = False
       try:

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py
index cc556c6..6a92934 100644
--- a/ambari-server/src/main/resources/stacks/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/stack_advisor.py
@@ -23,8 +23,11 @@ import os
 import re
 import socket
 import traceback
+from math import ceil, floor
+from urlparse import urlparse
 
 # Local imports
+from resource_management.libraries.functions.data_structure_utils import get_from_dict
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 
@@ -332,6 +335,7 @@ class DefaultStackAdvisor(StackAdvisor):
     self.notValuableComponents = set()
     self.notPreferableOnServerComponents = set()
     self.cardinalitiesDict = {}
+    self.componentLayoutSchemes = {}
     self.loaded_service_advisors = False
 
 
@@ -878,7 +882,99 @@ class DefaultStackAdvisor(StackAdvisor):
     return items
 
   def getConfigurationClusterSummary(self, servicesList, hosts, components, services):
-    return {}
+    """
+    Copied from HDP 2.0.6 so that it could be used by Service Advisors.
+    :return: Dictionary of memory and CPU attributes in the cluster
+    """
+    hBaseInstalled = False
+    if 'HBASE' in servicesList:
+      hBaseInstalled = True
+
+    cluster = {
+      "cpu": 0,
+      "disk": 0,
+      "ram": 0,
+      "hBaseInstalled": hBaseInstalled,
+      "components": components
+    }
+
+    if len(hosts["items"]) > 0:
+      nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
+      # NodeManager host with least memory is generally used in calculations as it will work in larger hosts.
+      if nodeManagerHosts is not None and len(nodeManagerHosts) > 0:
+        nodeManagerHost = nodeManagerHosts[0];
+        for nmHost in nodeManagerHosts:
+          if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]:
+            nodeManagerHost = nmHost
+        host = nodeManagerHost["Hosts"]
+        cluster["referenceNodeManagerHost"] = host
+      else:
+        host = hosts["items"][0]["Hosts"]
+      cluster["referenceHost"] = host
+      cluster["cpu"] = host["cpu_count"]
+      cluster["disk"] = len(host["disk_info"])
+      cluster["ram"] = int(host["total_mem"] / (1024 * 1024))
+
+    ramRecommendations = [
+      {"os":1, "hbase":1},
+      {"os":2, "hbase":1},
+      {"os":2, "hbase":2},
+      {"os":4, "hbase":4},
+      {"os":6, "hbase":8},
+      {"os":8, "hbase":8},
+      {"os":8, "hbase":8},
+      {"os":12, "hbase":16},
+      {"os":24, "hbase":24},
+      {"os":32, "hbase":32},
+      {"os":64, "hbase":32}
+    ]
+    index = {
+      cluster["ram"] <= 4: 0,
+      4 < cluster["ram"] <= 8: 1,
+      8 < cluster["ram"] <= 16: 2,
+      16 < cluster["ram"] <= 24: 3,
+      24 < cluster["ram"] <= 48: 4,
+      48 < cluster["ram"] <= 64: 5,
+      64 < cluster["ram"] <= 72: 6,
+      72 < cluster["ram"] <= 96: 7,
+      96 < cluster["ram"] <= 128: 8,
+      128 < cluster["ram"] <= 256: 9,
+      256 < cluster["ram"]: 10
+    }[1]
+
+
+    cluster["reservedRam"] = ramRecommendations[index]["os"]
+    cluster["hbaseRam"] = ramRecommendations[index]["hbase"]
+
+
+    cluster["minContainerSize"] = {
+      cluster["ram"] <= 4: 256,
+      4 < cluster["ram"] <= 8: 512,
+      8 < cluster["ram"] <= 24: 1024,
+      24 < cluster["ram"]: 2048
+    }[1]
+
+    totalAvailableRam = cluster["ram"] - cluster["reservedRam"]
+    if cluster["hBaseInstalled"]:
+      totalAvailableRam -= cluster["hbaseRam"]
+    cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024)
+    '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))'''
+    cluster["containers"] = round(max(3,
+                                      min(2 * cluster["cpu"],
+                                          min(ceil(1.8 * cluster["disk"]),
+                                              cluster["totalAvailableRam"] / cluster["minContainerSize"]))))
+
+    '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers'''
+    cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / cluster["containers"])
+    '''If greater than 1GB, value will be in multiples of 512.'''
+    if cluster["ramPerContainer"] > 1024:
+      cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512
+
+    cluster["mapMemory"] = int(cluster["ramPerContainer"])
+    cluster["reduceMemory"] = cluster["ramPerContainer"]
+    cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])
+
+    return cluster
 
   def validateClusterConfigurations(self, configurations, services, hosts):
     validationItems = []
@@ -886,6 +982,12 @@ class DefaultStackAdvisor(StackAdvisor):
     return self.toConfigurationValidationProblems(validationItems, "")
 
   def toConfigurationValidationProblems(self, validationProblems, siteName):
+    """
+    Encapsulate the validation item's fields of "level" and "message" for the given validation's config-name.
+    :param validationProblems: List of validation problems
+    :param siteName: Config type
+    :return: List of configuration validation problems that include additional fields like the log level.
+    """
     result = []
     for validationProblem in validationProblems:
       validationItem = validationProblem.get("item", None)
@@ -947,7 +1049,31 @@ class DefaultStackAdvisor(StackAdvisor):
     self.validateMinMax(items, recommendedDefaults, configurations)
     return items
 
+  def validateListOfConfigUsingMethod(self, configurations, recommendedDefaults, services, hosts, validators):
+    """
+    Service Advisors can use this method to pass in a list of validators, each of which is a tuple of
+    a config type (string) and a function (pointer). Each validator is then executed.
+    :param validators: List of tuples like [("hadoop-env", someFunctionPointer), ("hdfs-site", someFunctionPointer)]
+    :return: List of validation errors
+    """
+    items = []
+    for (configType, method) in validators:
+      if configType in recommendedDefaults:
+        siteProperties = self.getSiteProperties(configurations, configType)
+        if siteProperties is not None:
+          siteRecommendations = recommendedDefaults[configType]["properties"]
+          print("SiteName: %s, method: %s" % (configType, method.__name__))
+          print("Site properties: %s" % str(siteProperties))
+          print("Recommendations: %s" % str(siteRecommendations))
+          validationItems = method(siteProperties, siteRecommendations, configurations, services, hosts)
+          items.extend(validationItems)
+    return items
+
   def validateConfigurationsForSite(self, configurations, recommendedDefaults, services, hosts, siteName, method):
+    """
+    Deprecated, please use validateListOfConfigUsingMethod
+    :return: List of validation errors by calling the corresponding method.
+    """
     if siteName in recommendedDefaults:
       siteProperties = self.getSiteProperties(configurations, siteName)
       if siteProperties is not None:
@@ -1189,7 +1315,7 @@ class DefaultStackAdvisor(StackAdvisor):
     The scheme dictionary basically maps the number of hosts to
     host index where component should exist.
     """
-    return {}
+    return self.componentLayoutSchemes
 
   def getWarnItem(self, message):
     """
@@ -1672,3 +1798,524 @@ class DefaultStackAdvisor(StackAdvisor):
 
       if recommendation:
         put_f(name, ",".join(recommendation))
+
+  def getHostNamesWithComponent(self, serviceName, componentName, services):
+    """
+    Returns the list of hostnames on which service component is installed
+    """
+    if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
+      service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
+      components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
+      if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
+        componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
+        return componentHostnames
+    return []
+
+  def getHostsWithComponent(self, serviceName, componentName, services, hosts):
+    if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
+      service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
+      components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
+      if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
+        componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
+        componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames]
+        return componentHosts
+    return []
+
+  def getHostWithComponent(self, serviceName, componentName, services, hosts):
+    componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts)
+    if (len(componentHosts) > 0):
+      return componentHosts[0]
+    return None
+
+  def getHostComponentsByCategories(self, hostname, categories, services, hosts):
+    components = []
+    if services is not None and hosts is not None:
+      for service in services["services"]:
+        components.extend([componentEntry for componentEntry in service["components"]
+                           if componentEntry["StackServiceComponents"]["component_category"] in categories
+                           and hostname in componentEntry["StackServiceComponents"]["hostnames"]])
+    return components
+
+  def get_services_list(self, services):
+    """
+    Returns available services as list
+
+    :type services dict
+    :rtype list
+    """
+    if not services:
+      return []
+
+    return [service["StackServices"]["service_name"] for service in services["services"]]
+
+  def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations):
+    validationItems = []
+    users = self.getHadoopProxyUsers(services, hosts, configurations)
+    for user_name, user_properties in users.iteritems():
+      props = ["hadoop.proxyuser.{0}.hosts".format(user_name)]
+      if "propertyGroups" in user_properties:
+        props.append("hadoop.proxyuser.{0}.groups".format(user_name))
+
+      for prop in props:
+        validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)})
+
+    return validationItems
+
+  def getHadoopProxyUsers(self, services, hosts, configurations):
+    """
+    Gets Hadoop Proxy User recommendations based on the configuration that is provided by
+    getServiceHadoopProxyUsersConfigurationDict.
+
+    See getServiceHadoopProxyUsersConfigurationDict
+    """
+    servicesList = self.get_services_list(services)
+    users = {}
+
+    for serviceName, serviceUserComponents in self.getServiceHadoopProxyUsersConfigurationDict().iteritems():
+      users.update(self._getHadoopProxyUsersForService(serviceName, serviceUserComponents, services, hosts, configurations))
+
+    return users
+
+  def getServiceHadoopProxyUsersConfigurationDict(self):
+    """
+    Returns a map that is used by 'getHadoopProxyUsers' to determine service
+    user properties and related components and get proxyuser recommendations.
+    This method can be overridden in stackadvisors for the further stacks to
+    add additional services or change the previous logic.
+
+    Example of the map format:
+    {
+      "serviceName": [
+        ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", "propertyGroups": "exact string value"}),
+        ("configTypeName2", "userPropertyName2", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}),
+        ("configTypeName3", "userPropertyName3", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction)
+      ],
+      "serviceName2": [
+        ...
+    }
+
+    The third element of a tuple is a map that maps a proxy property to its value.
+    The key could be either 'propertyHosts' or 'propertyGroups'. (Both are optional)
+    If the map value is a string, then this string will be used for the proxyuser
+    value (e.g. 'hadoop.proxyuser.{user}.hosts' = '*').
+    Otherwise the map value should be a list or a tuple with component names.
+    All hosts with the provided components will be added
+    to the property (e.g. 'hadoop.proxyuser.{user}.hosts' = 'host1,host2,host3')
+
+    The fourth element of the tuple is optional and if it's provided,
+    it should be a function that takes two arguments: services and hosts.
+    If it returns False, proxyusers for the tuple will not be added.
+    """
+    ALL_WILDCARD = "*"
+    HOSTS_PROPERTY = "propertyHosts"
+    GROUPS_PROPERTY = "propertyGroups"
+
+    return {
+      "HDFS":   [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
+      "OOZIE":  [("oozie-env", "oozie_user", {HOSTS_PROPERTY: ["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
+      "HIVE":   [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", "HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}),
+                 ("hive-env", "webhcat_user", {HOSTS_PROPERTY: ["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
+      "YARN":   [("yarn-env", "yarn_user", {HOSTS_PROPERTY: ["RESOURCEMANAGER"]}, lambda services, hosts: len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 1)],
+      "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
+      "SPARK":  [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})]
+    }
+
+  def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, services, hosts, configurations):
+    Logger.info("Calculating Hadoop Proxy User recommendations for {0} service.".format(serviceName))
+    servicesList = self.get_services_list(services)
+    resultUsers = {}
+
+    if serviceName in servicesList:
+      usersComponents = {}
+      for values in serviceUserComponents:
+
+        # Filter, if 4th argument is present in the tuple
+        filterFunction = values[3:]
+        if filterFunction and not filterFunction[0](services, hosts):
+          continue
+
+        userNameConfig, userNameProperty, hostSelectorMap = values[:3]
+        user = get_from_dict(services, ("configurations", userNameConfig, "properties", userNameProperty), None)
+        if user:
+          usersComponents[user] = (userNameConfig, userNameProperty, hostSelectorMap)
+
+      for user, (userNameConfig, userNameProperty, hostSelectorMap) in usersComponents.iteritems():
+        proxyUsers = {"config": userNameConfig, "propertyName": userNameProperty}
+        for proxyPropertyName, hostSelector in hostSelectorMap.iteritems():
+          componentHostNamesString = hostSelector if isinstance(hostSelector, basestring) else '*'
+          if isinstance(hostSelector, (list, tuple)):
+            _, componentHostNames = self.get_data_for_proxyuser(user, services, configurations) # preserve old values
+            for component in hostSelector:
+              componentHosts = self.getHostsWithComponent(serviceName, component, services, hosts)
+              if componentHosts is not None:
+                for componentHost in componentHosts:
+                  componentHostName =  componentHost["Hosts"]["host_name"]
+                  componentHostNames.add(componentHostName)
+
+            componentHostNamesString = ",".join(sorted(componentHostNames))
+            Logger.info("Host List for [service='{0}'; user='{1}'; components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), componentHostNamesString))
+
+          if not proxyPropertyName in proxyUsers:
+            proxyUsers[proxyPropertyName] = componentHostNamesString
+
+        if not user in resultUsers:
+          resultUsers[user] = proxyUsers
+
+    return resultUsers
+
+  def recommendHadoopProxyUsers(self, configurations, services, hosts):
+    servicesList = self.get_services_list(services)
+
+    if 'forced-configurations' not in services:
+      services["forced-configurations"] = []
+
+    putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
+    putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")
+
+    users = self.getHadoopProxyUsers(services, hosts, configurations)
+
+    # Force dependencies for HIVE
+    if "HIVE" in servicesList:
+      hive_user = get_from_dict(services, ("configurations", "hive-env", "properties", "hive_user"), default_value=None)
+      if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), default_value=None):
+        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(hive_user)})
+
+    for user_name, user_properties in users.iteritems():
+
+      # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users
+      self.put_proxyuser_value(user_name, user_properties["propertyHosts"], services=services, configurations=configurations, put_function=putCoreSiteProperty)
+      Logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(user_name, user_properties["propertyHosts"]))
+      if "propertyGroups" in user_properties:
+        self.put_proxyuser_value(user_name, user_properties["propertyGroups"], is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
+
+      # Remove old properties if user was renamed
+      userOldValue = self.getOldValue(services, user_properties["config"], user_properties["propertyName"])
+      if userOldValue is not None and userOldValue != user_name:
+        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true')
+        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)})
+        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)})
+
+        if "propertyGroups" in user_properties:
+          putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true')
+          services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)})
+          services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)})
+
+    self.recommendAmbariProxyUsersForHDFS(services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute)
+
+  def recommendAmbariProxyUsersForHDFS(self, services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute):
+    if "HDFS" in servicesList:
+      ambari_user = self.getAmbariUser(services)
+      ambariHostName = socket.getfqdn()
+      self.put_proxyuser_value(ambari_user, ambariHostName, services=services, configurations=configurations, put_function=putCoreSiteProperty)
+      self.put_proxyuser_value(ambari_user, "*", is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
+      old_ambari_user = self.getOldAmbariUser(services)
+      if old_ambari_user is not None:
+        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
+        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
+
+  def getAmbariUser(self, services):
+    ambari_user = services['ambari-server-properties']['ambari-server.user']
+    if "cluster-env" in services["configurations"] \
+        and "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"] \
+        and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \
+        and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true":
+      ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
+      ambari_user = ambari_user.split('@')[0]
+    return ambari_user
+
+  def getOldAmbariUser(self, services):
+    ambari_user = None
+    if "cluster-env" in services["configurations"]:
+      if "security_enabled" in services["configurations"]["cluster-env"]["properties"] \
+          and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true":
+        ambari_user = services['ambari-server-properties']['ambari-server.user']
+      elif "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"]:
+        ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
+        ambari_user = ambari_user.split('@')[0]
+    return ambari_user
+
+  def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None):
+    is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups)
+    result_value = "*"
+    result_values_set = self.merge_proxyusers_values(current_value, value)
+    if len(result_values_set) > 0:
+      result_value = ",".join(sorted([val for val in result_values_set if val]))
+
+    if is_groups:
+      property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
+    else:
+      property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
+
+    put_function(property_name, result_value)
+
+  def get_data_for_proxyuser(self, user_name, services, configurations, groups=False):
+    """
+    Returns values of proxyuser properties for given user. Properties can be
+    hadoop.proxyuser.username.groups or hadoop.proxyuser.username.hosts
+    :param user_name:
+    :param services:
+    :param groups: if true, will return values for group property, not hosts
+    :return: tuple (wildcard_value, set[values]), where wildcard_value indicates if property value was *
+    """
+    if "core-site" in services["configurations"]:
+      coreSite = services["configurations"]["core-site"]['properties']
+    else:
+      coreSite = {}
+    if groups:
+      property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
+    else:
+      property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
+    if property_name in coreSite:
+      property_value = coreSite[property_name]
+      if property_value == "*":
+        return True, set()
+      else:
+        result_values = set()
+        if "core-site" in configurations:
+          if property_name in configurations["core-site"]['properties']:
+            result_values = result_values.union(configurations["core-site"]['properties'][property_name].split(","))
+        result_values = result_values.union(property_value.split(","))
+        return False, result_values
+    return False, set()
+
+  def merge_proxyusers_values(self, first, second):
+    result = set()
+    def append(data):
+      if isinstance(data, str) or isinstance(data, unicode):
+        if data != "*":
+          result.update(data.split(","))
+      else:
+        result.update(data)
+    append(first)
+    append(second)
+    return result
+
+  def getOldValue(self, services, configType, propertyName):
+    if services:
+      if 'changed-configurations' in services.keys():
+        changedConfigs = services["changed-configurations"]
+        for changedConfig in changedConfigs:
+          if changedConfig["type"] == configType and changedConfig["name"]== propertyName and "old_value" in changedConfig:
+            return changedConfig["old_value"]
+    return None
+
+  @classmethod
+  def isSecurePort(cls, port):
+    """
+    Returns True if port is root-owned at *nix systems
+    """
+    if port is not None:
+      return port < 1024
+    else:
+      return False
+
+  @classmethod
+  def getPort(cls, address):
+    """
+    Extracts port from the address like 0.0.0.0:1019
+    """
+    if address is None:
+      return None
+    m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
+    if m is not None:
+      return int(m.group(2))
+    else:
+      return None
+
+  def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName):
+    if propertyName not in recommendedDefaults:
+      # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the
+      # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary. In this case, ignore it.
+      return None
+
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set")
+    value = self.to_number(properties[propertyName])
+    if value is None:
+      return self.getErrorItem("Value should be integer")
+    defaultValue = self.to_number(recommendedDefaults[propertyName])
+    if defaultValue is None:
+      return None
+    if value < defaultValue:
+      return self.getWarnItem("Value is less than the recommended default of {0}".format(defaultValue))
+    return None
+
+  def validatorEqualsPropertyItem(self, properties1, propertyName1,
+                                  properties2, propertyName2,
+                                  emptyAllowed=False):
+    if not propertyName1 in properties1:
+      return self.getErrorItem("Value should be set for %s" % propertyName1)
+    if not propertyName2 in properties2:
+      return self.getErrorItem("Value should be set for %s" % propertyName2)
+    value1 = properties1.get(propertyName1)
+    if value1 is None and not emptyAllowed:
+      return self.getErrorItem("Empty value for %s" % propertyName1)
+    value2 = properties2.get(propertyName2)
+    if value2 is None and not emptyAllowed:
+      return self.getErrorItem("Empty value for %s" % propertyName2)
+    if value1 != value2:
+      return self.getWarnItem("It is recommended to set equal values "
+                              "for properties {0} and {1}".format(propertyName1, propertyName2))
+
+    return None
+
+  def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults,
+                                       propertyName):
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set for %s" % propertyName)
+    value = properties.get(propertyName)
+    if not propertyName in recommendedDefaults:
+      return self.getErrorItem("Value should be recommended for %s" % propertyName)
+    recommendedValue = recommendedDefaults.get(propertyName)
+    if value != recommendedValue:
+      return self.getWarnItem("It is recommended to set value {0} "
+                              "for property {1}".format(recommendedValue, propertyName))
+    return None
+
+  def validatorNotEmpty(self, properties, propertyName):
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set for {0}".format(propertyName))
+    value = properties.get(propertyName)
+    if not value:
+      return self.getWarnItem("Empty value for {0}".format(propertyName))
+    return None
+
+  def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo):
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set")
+    dir = properties[propertyName]
+    if not dir.startswith("file://") or dir == recommendedDefaults.get(propertyName):
+      return None
+
+    dir = re.sub("^file://", "", dir, count=1)
+    mountPoints = []
+    for mountPoint in hostInfo["disk_info"]:
+      mountPoints.append(mountPoint["mountpoint"])
+    mountPoint = DefaultStackAdvisor.getMountPointForDir(dir, mountPoints)
+
+    if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint:
+      return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName))
+
+    return None
+
+  def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace):
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set")
+    dir = properties[propertyName]
+    if not dir.startswith("file://"):
+      return None
+
+    dir = re.sub("^file://", "", dir, count=1)
+    mountPoints = {}
+    for mountPoint in hostInfo["disk_info"]:
+      mountPoints[mountPoint["mountpoint"]] = self.to_number(mountPoint["available"])
+    mountPoint = DefaultStackAdvisor.getMountPointForDir(dir, mountPoints.keys())
+
+    if not mountPoints:
+      return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"])
+
+    if mountPoints[mountPoint] < reqiuredDiskSpace:
+      msg = "Ambari Metrics disk space requirements not met. \n" \
+            "Recommended disk space for partition {0} is {1}G"
+      return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb
+    return None
+
+  @classmethod
+  def is_valid_host_port_authority(cls, target):
+    has_scheme = "://" in target
+    if not has_scheme:
+      target = "dummyscheme://"+target
+    try:
+      result = urlparse(target)
+      if result.hostname is not None and result.port is not None:
+        return True
+    except ValueError:
+      pass
+    return False
+
+  @classmethod
+  def getMountPointForDir(cls, dir, mountPoints):
+    """
+    :param dir: Directory to check, even if it doesn't exist.
+    :return: Returns the closest mount point as a string for the directory.
+    If the "dir" variable is None or empty, will return None.
+    If the directory does not exist, will still return the closest matching mount (e.g. "/"), or None if no mount matches.
+    """
+    bestMountFound = None
+    if dir:
+      dir = re.sub("^file://", "", dir, count=1).strip().lower()
+
+      # If the path is "/hadoop/hdfs/data", then possible matches for mounts could be
+      # "/", "/hadoop/hdfs", and "/hadoop/hdfs/data".
+      # So take the one with the greatest number of segments.
+      for mountPoint in mountPoints:
+        # Ensure that the mount path and the dir path ends with "/"
+        # The mount point "/hadoop" should not match with the path "/hadoop1"
+        if os.path.join(dir, "").startswith(os.path.join(mountPoint, "")):
+          if bestMountFound is None:
+            bestMountFound = mountPoint
+          elif os.path.join(bestMountFound, "").count(os.path.sep) < os.path.join(mountPoint, "").count(os.path.sep):
+            bestMountFound = mountPoint
+
+    return bestMountFound
+
+  def validateMinMemorySetting(self, properties, defaultValue, propertyName):
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set")
+    if defaultValue is None:
+      return self.getErrorItem("Config's default value can't be null or undefined")
+
+    value = properties[propertyName]
+    if value is None:
+      return self.getErrorItem("Value can't be null or undefined")
+    try:
+      valueInt = self.to_number(value)
+      # TODO: generify for other use cases
+      defaultValueInt = int(str(defaultValue).strip())
+      if valueInt < defaultValueInt:
+        return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue))
+    except:
+      return None
+
+    return None
+
+  @classmethod
+  def checkXmxValueFormat(cls, value):
+    p = re.compile('-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?')
+    matches = p.findall(value)
+    return len(matches) == 1
+
+  @classmethod
+  def getXmxSize(cls, value):
+    p = re.compile("-Xmx(\d+)(.?)")
+    result = p.findall(value)[0]
+    if len(result) > 1:
+      # result[1] - is a space or size formatter (b|k|m|g etc)
+      return result[0] + result[1].lower()
+    return result[0]
+
+  @classmethod
+  def formatXmxSizeToBytes(cls, value):
+    value = value.lower()
+    if len(value) == 0:
+      return 0
+    modifier = value[-1]
+
+    if modifier == ' ' or modifier in "0123456789":
+      modifier = 'b'
+    m = {
+      modifier == 'b': 1,
+      modifier == 'k': 1024,
+      modifier == 'm': 1024 * 1024,
+      modifier == 'g': 1024 * 1024 * 1024,
+      modifier == 't': 1024 * 1024 * 1024 * 1024,
+      modifier == 'p': 1024 * 1024 * 1024 * 1024 * 1024
+    }[1]
+    return cls.to_number(value) * m
+
+  @classmethod
+  def to_number(cls, s):
+    try:
+      return int(re.sub("\D", "", s))
+    except ValueError:
+      return None

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/TestStackAdvisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestStackAdvisor.py b/ambari-server/src/test/python/TestStackAdvisor.py
index 87d2d15..d48def3 100644
--- a/ambari-server/src/test/python/TestStackAdvisor.py
+++ b/ambari-server/src/test/python/TestStackAdvisor.py
@@ -270,8 +270,24 @@ class TestStackAdvisorInitialization(TestCase):
     }
     hosts= {
       "items": [
-        {"Hosts": {"host_name": "host1"}},
-        {"Hosts": {"host_name": "host2"}}
+        {"Hosts": {"host_name": "host1",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         },
+        {"Hosts": {"host_name": "host2",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         }
       ]
     }
     actualValidateConfigResponse = default_stack_advisor.validateConfigurations(services, hosts)
@@ -343,8 +359,14 @@ class TestStackAdvisorInitialization(TestCase):
     # Test with maintenance_state. One host is in maintenance mode.
     hosts= {
       "items": [
-        {"Hosts": {"host_name": "host1", "maintenance_state":"OFF"}},
-        {"Hosts": {"host_name": "host2", "maintenance_state":"ON"}}
+        {"Hosts": {"host_name": "host1",
+                   "maintenance_state":"OFF",
+                   "cpu_count": 1}
+         },
+        {"Hosts": {"host_name": "host2",
+                   "maintenance_state":"ON",
+                   "cpu_count": 1}
+         }
       ]
     }
 
@@ -397,8 +419,26 @@ class TestStackAdvisorInitialization(TestCase):
     # Test with maintenance_state. Both hosts are in maintenance mode.
     hosts= {
       "items": [
-        {"Hosts": {"host_name": "host1", "maintenance_state":"ON"}},
-        {"Hosts": {"host_name": "host2", "maintenance_state":"ON"}}
+        {"Hosts": {"host_name": "host1",
+                   "maintenance_state":"ON",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         },
+        {"Hosts": {"host_name": "host2",
+                   "maintenance_state":"ON",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         }
       ]
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
index 1145154..a486fb3 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
@@ -3253,11 +3253,11 @@ class TestHDP206StackAdvisor(TestCase):
     self.assertEquals(self.stack_advisor_impl.round_to_n(4097), 4096)
 
   def test_getMountPointForDir(self):
-    self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", ["/"]), "/")
-    self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", ["/var", "/"]), "/var")
-    self.assertEquals(self.stack_advisor_impl.getMountPointForDir("file:///var/log", ["/var", "/"]), "/var")
-    self.assertEquals(self.stack_advisor_impl.getMountPointForDir("hdfs:///hdfs_path", ["/var", "/"]), None)
-    self.assertEquals(self.stack_advisor_impl.getMountPointForDir("relative/path", ["/var", "/"]), None)
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", ["/"]), "/")
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", ["/var", "/"]), "/var")
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("file:///var/log", ["/var", "/"]), "/var")
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("hdfs:///hdfs_path", ["/var", "/"]), None)
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("relative/path", ["/var", "/"]), None)
 
   def test_parseCardinality(self):
     self.assertEquals(self.stackAdvisor.parseCardinality("ALL", 5), (5, 5))

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
index f9fb1f5..e8bd5d0 100644
--- a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
@@ -168,11 +168,8 @@ class TestHDP21StackAdvisor(TestCase):
       if len(components) > 0:
         groups.append(components)
 
-    def sort_nested_lists(list):
-      result_list = []
-      for sublist in list:
-        result_list.append(sorted(sublist))
-      return sorted(result_list)
+    def sort_nested_lists(l):
+      return sorted(reduce(lambda x,y: x+y, l))
 
     self.assertEquals(sort_nested_lists(expected_layout), sort_nested_lists(groups))
 


[2/2] ambari git commit: AMBARI-19097. HDP 3.0 TP - create Service Advisor for HDFS (alejandro)

Posted by al...@apache.org.
AMBARI-19097. HDP 3.0 TP - create Service Advisor for HDFS (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/326cc1b2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/326cc1b2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/326cc1b2

Branch: refs/heads/trunk
Commit: 326cc1b2a1e05073050a38ea104d6d63ed76f373
Parents: 5bdd6cf
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Tue Jan 10 12:55:50 2017 -0800
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Mon Jan 16 17:47:16 2017 -0800

----------------------------------------------------------------------
 .../HDFS/3.0.0.3.0/service_advisor.py           | 602 +++++++++++++++++
 .../ZOOKEEPER/3.4.9/service_advisor.py          |  49 +-
 .../stacks/BIGTOP/0.8/services/stack_advisor.py |  63 +-
 .../stacks/HDP/2.0.6/services/stack_advisor.py  | 553 ++--------------
 .../stacks/HDP/2.1/services/stack_advisor.py    |  31 +-
 .../stacks/HDP/2.2/services/stack_advisor.py    |  39 +-
 .../stacks/HDPWIN/2.1/services/stack_advisor.py |  47 +-
 .../stacks/HDPWIN/2.2/services/stack_advisor.py |  27 +-
 .../src/main/resources/stacks/stack_advisor.py  | 651 ++++++++++++++++++-
 .../src/test/python/TestStackAdvisor.py         |  52 +-
 .../stacks/2.0.6/common/test_stack_advisor.py   |  10 +-
 .../stacks/2.1/common/test_stack_advisor.py     |   7 +-
 12 files changed, 1486 insertions(+), 645 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py
new file mode 100644
index 0000000..eb7f35c
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/service_advisor.py
@@ -0,0 +1,602 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import os
+import traceback
+import inspect
+import re
+import socket
+from urlparse import urlparse
+
+# Local imports
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.data_structure_utils import get_from_dict
+from resource_management.libraries.functions.mounted_dirs_helper import get_mounts_with_multiple_data_dirs
+
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
+PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
+
+try:
+  with open(PARENT_FILE, 'rb') as fp:
+    service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
+except Exception as e:
+  traceback.print_exc()
+  print "Failed to load parent"
+
+
+class HDFSServiceAdvisor(service_advisor.ServiceAdvisor):
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(HDFSServiceAdvisor, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    # Always call these methods
+    self.modifyMastersWithMultipleInstances()
+    self.modifyCardinalitiesDict()
+    self.modifyHeapSizeProperties()
+    self.modifyNotValuableComponents()
+    self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
+
+  def modifyMastersWithMultipleInstances(self):
+    """
+    Modify the set of masters with multiple instances.
+    Must be overridden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyCardinalitiesDict(self):
+    """
+    Modify the dictionary of cardinalities.
+    Must be overridden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyHeapSizeProperties(self):
+    """
+    Modify the dictionary of heap size properties.
+    Must be overridden in child class.
+    """
+    self.heap_size_properties = {"NAMENODE":
+                                   [{"config-name": "hadoop-env",
+                                     "property": "namenode_heapsize",
+                                     "default": "1024m"}],
+                                 "SECONDARY_NAMENODE":
+                                   [{"config-name": "hadoop-env",
+                                     "property": "namenode_heapsize",
+                                     "default": "1024m"}],
+                                 "DATANODE":
+                                   [{"config-name": "hadoop-env",
+                                     "property": "dtnode_heapsize",
+                                     "default": "1024m"}]}
+
+  def modifyNotValuableComponents(self):
+    """
+    Modify the set of components whose host assignment is based on other services.
+    Must be overridden in child class.
+    """
+    self.notValuableComponents |= set(['JOURNALNODE', 'ZKFC'])
+
+  def modifyComponentsNotPreferableOnServer(self):
+    """
+    Modify the set of components that are not preferable on the server.
+    Must be overridden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+    Must be overridden in child class.
+    """
+    self.componentLayoutSchemes.update({
+      'NAMENODE': {"else": 0},
+      'SECONDARY_NAMENODE': {"else": 1}
+    })
+
+  def getServiceComponentLayoutValidations(self, services, hosts):
+    """
+    Get a list of errors.
+    Must be overridden in child class.
+    """
+    Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    # HDFS allows NameNode and Secondary NameNode to be on the same host.
+    return self.as_super.getServiceComponentLayoutValidations(services, hosts)
+
+  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+    """
+    Entry point.
+    Must be overridden in child class.
+    """
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    # Due to the existing stack inheritance, make it clear where each calculation came from.
+    recommender = HDFSRecommender()
+    recommender.recommendConfigurationsFromHDP206(configurations, clusterData, services, hosts)
+    recommender.recommendConfigurationsFromHDP23(configurations, clusterData, services, hosts)
+
+  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+    """
+    Entry point.
+    Validate configurations for the service. Return a list of errors.
+    The code for this function should be the same for each Service Advisor.
+    """
+    Logger.info("Class: %s, Method: %s. Validating Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    validator = HDFSValidator()
+    # Calls the methods of the validator using arguments,
+    # method(siteProperties, siteRecommendations, configurations, services, hosts)
+    return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
+
+class HDFSRecommender(service_advisor.ServiceAdvisor):
+  """
+  HDFS Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(HDFSRecommender, self)
+    self.as_super.__init__(*args, **kwargs)
+
+  def recommendConfigurationsFromHDP206(self, configurations, clusterData, services, hosts):
+    """
+    Recommend configurations for this service based on HDP 2.0.6.
+    """
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
+    putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services)
+    putHDFSSitePropertyAttributes = self.putPropertyAttribute(configurations, "hdfs-site")
+
+    totalAvailableRam = clusterData['totalAvailableRam']
+    Logger.info("Class: %s, Method: %s. Total Available Ram: %s" % (self.__class__.__name__, inspect.stack()[0][3], str(totalAvailableRam)))
+    putHDFSProperty('namenode_heapsize', max(int(totalAvailableRam / 2), 1024))
+    putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
+    putHDFSProperty('namenode_opt_newsize', max(int(totalAvailableRam / 8), 128))
+    putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
+    putHDFSProperty('namenode_opt_maxnewsize', max(int(totalAvailableRam / 8), 256))
+
+    # Check if NN HA is enabled and recommend removing dfs.namenode.rpc-address
+    hdfsSiteProperties = self.getServicesSiteProperties(services, "hdfs-site")
+    nameServices = None
+    if hdfsSiteProperties and 'dfs.internal.nameservices' in hdfsSiteProperties:
+      nameServices = hdfsSiteProperties['dfs.internal.nameservices']
+    if nameServices is None and hdfsSiteProperties and 'dfs.nameservices' in hdfsSiteProperties:
+      nameServices = hdfsSiteProperties['dfs.nameservices']
+    if nameServices and "dfs.ha.namenodes.%s" % nameServices in hdfsSiteProperties:
+      namenodes = hdfsSiteProperties["dfs.ha.namenodes.%s" % nameServices]
+      if len(namenodes.split(',')) > 1:
+        putHDFSSitePropertyAttributes("dfs.namenode.rpc-address", "delete", "true")
+
+    Logger.info("Class: %s, Method: %s. HDFS nameservices: %s" %
+                (self.__class__.__name__, inspect.stack()[0][3], str(nameServices)))
+
+    hdfs_mount_properties = [
+      ("dfs.datanode.data.dir", "DATANODE", "/hadoop/hdfs/data", "multi"),
+      ("dfs.namenode.name.dir", "DATANODE", "/hadoop/hdfs/namenode", "multi"),
+      ("dfs.namenode.checkpoint.dir", "SECONDARY_NAMENODE", "/hadoop/hdfs/namesecondary", "single")
+    ]
+
+    Logger.info("Class: %s, Method: %s. Updating HDFS mount properties." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+    self.updateMountProperties("hdfs-site", hdfs_mount_properties, configurations, services, hosts)
+
+    dataDirs = []
+    if configurations and "hdfs-site" in configurations and \
+            "dfs.datanode.data.dir" in configurations["hdfs-site"]["properties"] and \
+            configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"] is not None:
+      dataDirs = configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"].split(",")
+
+    elif hdfsSiteProperties and "dfs.datanode.data.dir" in hdfsSiteProperties and \
+            hdfsSiteProperties["dfs.datanode.data.dir"] is not None:
+      dataDirs = hdfsSiteProperties["dfs.datanode.data.dir"].split(",")
+
+    Logger.info("Class: %s, Method: %s. HDFS Data Dirs: %s" %
+                (self.__class__.__name__, inspect.stack()[0][3], str(dataDirs)))
+
+    # dfs.datanode.du.reserved should be set to 10-15% of volume size
+    # For each host selects maximum size of the volume. Then gets minimum for all hosts.
+    # This ensures that each host will have at least one data dir with available space.
+    reservedSizeRecommendation = 0l #kBytes
+    for host in hosts["items"]:
+      mountPoints = []
+      mountPointDiskAvailableSpace = [] #kBytes
+      for diskInfo in host["Hosts"]["disk_info"]:
+        mountPoints.append(diskInfo["mountpoint"])
+        mountPointDiskAvailableSpace.append(long(diskInfo["size"]))
+
+      maxFreeVolumeSizeForHost = 0l #kBytes
+      for dataDir in dataDirs:
+        mp = HDFSRecommender.getMountPointForDir(dataDir, mountPoints)
+        for i in range(len(mountPoints)):
+          if mp == mountPoints[i]:
+            if mountPointDiskAvailableSpace[i] > maxFreeVolumeSizeForHost:
+              maxFreeVolumeSizeForHost = mountPointDiskAvailableSpace[i]
+
+      if (not reservedSizeRecommendation) or (maxFreeVolumeSizeForHost and maxFreeVolumeSizeForHost < reservedSizeRecommendation):
+        reservedSizeRecommendation = maxFreeVolumeSizeForHost
+
+    Logger.info("Class: %s, Method: %s. HDFS Datanode recommended reserved size: %d" %
+                (self.__class__.__name__, inspect.stack()[0][3], reservedSizeRecommendation))
+
+    if reservedSizeRecommendation:
+      reservedSizeRecommendation = max(reservedSizeRecommendation * 1024 / 8, 1073741824) # At least 1Gb is reserved
+      putHDFSSiteProperty('dfs.datanode.du.reserved', reservedSizeRecommendation) #Bytes
+
+    # recommendations for "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" properties in core-site
+    self.recommendHadoopProxyUsers(configurations, services, hosts)
+
+  def recommendConfigurationsFromHDP23(self, configurations, clusterData, services, hosts):
+    """
+    Recommend configurations for this service based on HDP 2.3.
+    """
+    putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services)
+    putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site")
+
+    if ('ranger-hdfs-plugin-properties' in services['configurations']) and ('ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']):
+      rangerPluginEnabled = ''
+      if 'ranger-hdfs-plugin-properties' in configurations and 'ranger-hdfs-plugin-enabled' in  configurations['ranger-hdfs-plugin-properties']['properties']:
+        rangerPluginEnabled = configurations['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled']
+      elif 'ranger-hdfs-plugin-properties' in services['configurations'] and 'ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']:
+        rangerPluginEnabled = services['configurations']['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled']
+
+      if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()):
+        putHdfsSiteProperty("dfs.namenode.inode.attributes.provider.class",'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer')
+      else:
+        putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true')
+    else:
+      putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true')
+
+
+class HDFSValidator(service_advisor.ServiceAdvisor):
+  """
+  HDFS Validator checks the correctness of properties whenever the service is first added or the user attempts to
+  change configs via the UI.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(HDFSValidator, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    self.validators = [("hdfs-site", self.validateHDFSConfigurationsFromHDP206),
+                       ("hadoop-env", self.validateHadoopEnvConfigurationsFromHDP206),
+                       ("core-site", self.validateHDFSCoreSiteFromHDP206),
+                       ("hdfs-site", self.validateHDFSConfigurationsFromHDP22),
+                       ("hadoop-env", self.validateHadoopEnvConfigurationsFromHDP22),
+                       ("ranger-hdfs-plugin-properties", self.validateHDFSRangerPluginConfigurationsFromHDP22),
+                       ("hdfs-site", self.validateRangerAuthorizerFromHDP23)]
+
+    # **********************************************************
+    # Example of how to add a function that validates a certain config type.
+    # If the same config type has multiple functions, can keep adding tuples to self.validators
+    #self.validators.append(("hadoop-env", self.sampleValidator))
+
+  def sampleValidator(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    Example of a validator function for other Service Advisors to emulate.
+    :return: A list of configuration validation problems.
+    """
+    validationItems = []
+
+    '''
+    Item is a simple dictionary.
+    Two functions can be used to construct it depending on the log level: WARN|ERROR
+    E.g.,
+    self.getErrorItem(message) or self.getWarnItem(message)
+
+    item = {"level": "ERROR|WARN", "message": "value"}
+    '''
+    validationItems.append({"config-name": "my_config_property_name",
+                            "item": self.getErrorItem("My custom message in method %s" % inspect.stack()[0][3])})
+    return self.toConfigurationValidationProblems(validationItems, "hadoop-env")
+
+  def validateHDFSConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    This was copied from HDP 2.0.6; validate hdfs-site
+    :return: A list of configuration validation problems.
+    """
+    clusterEnv = self.getSiteProperties(configurations, "cluster-env")
+    validationItems = [{"config-name": 'dfs.datanode.du.reserved', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'dfs.datanode.du.reserved')},
+                       {"config-name": 'dfs.datanode.data.dir', "item": self.validatorOneDataDirPerPartition(properties, 'dfs.datanode.data.dir', services, hosts, clusterEnv)}]
+    return self.toConfigurationValidationProblems(validationItems, "hdfs-site")
+
+  def validatorOneDataDirPerPartition(self, properties, propertyName, services, hosts, clusterEnv):
+    """
+    Warn when cluster-env/one_dir_per_partition is "true" but a DataNode host
+    has several data directories on one mount.
+
+    :param properties: site properties holding the data-dir value.
+    :param propertyName: name of the data-dir property inside properties.
+    :param services: passed through to getDataNodeHosts.
+    :param hosts: passed through to getDataNodeHosts.
+    :param clusterEnv: cluster-env properties; may be empty or None.
+    :return: an error item when the property is missing, a warning item when
+             an offending host is found, otherwise None.
+    """
+    if propertyName not in properties:
+      return self.getErrorItem("Value should be set")
+    dataDirs = properties[propertyName]
+
+    checkEnabled = clusterEnv and "one_dir_per_partition" in clusterEnv and clusterEnv["one_dir_per_partition"].lower() == "true"
+    if not checkEnabled:
+      return None
+
+    affectedHosts = set()
+    for dataNode in self.getDataNodeHosts(services, hosts):
+      hostName = dataNode["Hosts"]["host_name"]
+      mounts = [disk["mountpoint"] for disk in dataNode["Hosts"]["disk_info"]]
+
+      if get_mounts_with_multiple_data_dirs(mounts, dataDirs):
+        # Only host names are collected; a per-mount message can be too long on large clusters.
+        affectedHosts.add(hostName)
+        # NOTE(review): scanning stops at the first offending host, so the message
+        # names at most one host - confirm this short-circuit is intended.
+        break
+
+    if not affectedHosts:
+      return None
+
+    return self.getWarnItem("cluster-env/one_dir_per_partition is enabled but there are multiple data directories on the same mount. Affected hosts: {0}".format(", ".join(sorted(affectedHosts))))
+
+  def validateHadoopEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    Validate hadoop-env with the checks that were copied from HDP 2.0.6.
+
+    Each NameNode heap property is passed through validatorLessThenDefaultValue
+    together with the recommended defaults.
+    :return: A list of configuration validation problems.
+    """
+    heapProperties = ('namenode_heapsize', 'namenode_opt_newsize', 'namenode_opt_maxnewsize')
+    validationItems = [{"config-name": heapProperty,
+                        "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, heapProperty)}
+                       for heapProperty in heapProperties]
+    return self.toConfigurationValidationProblems(validationItems, "hadoop-env")
+
+  def validateHDFSCoreSiteFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    Validate core-site with the checks that were copied from HDP 2.0.6.
+
+    Combines the Hadoop proxy-user validation items with the Ambari
+    proxy-user validation items, in that order.
+    :return: A list of configuration validation problems.
+    """
+    proxyUserItems = self.getHadoopProxyUsersValidationItems(properties, services, hosts, configurations)
+    ambariUserItems = self.getAmbariProxyUsersForHDFSValidationItems(properties, services)
+    validationItems = list(proxyUserItems) + list(ambariUserItems)
+    return self.toConfigurationValidationProblems(validationItems, "core-site")
+
+  def getAmbariProxyUsersForHDFSValidationItems(self, properties, services):
+    """
+    Build validation items for the Ambari user's hadoop.proxyuser.* properties.
+
+    When HDFS is in the services list, the hosts and groups proxy-user
+    properties of the Ambari user are each checked with validatorNotEmpty.
+    :return: A list of validation items; empty when HDFS is not present.
+    """
+    if "HDFS" not in self.get_services_list(services):
+      return []
+
+    ambari_user = self.getAmbariUser(services)
+    proxyUserProperties = (
+      "hadoop.proxyuser.{0}.hosts".format(ambari_user),
+      "hadoop.proxyuser.{0}.groups".format(ambari_user)
+    )
+    return [{"config-name": proxyProperty, "item": self.validatorNotEmpty(properties, proxyProperty)}
+            for proxyProperty in proxyUserProperties]
+
+  def validateHDFSConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    This was copied from HDP 2.2; validate hdfs-site
+
+    Checks, in order:
+      - dfs.namenode.http-address / dfs.namenode.https-address must be a valid
+        host:port authority (error item).
+      - dfs.permissions.enabled must be 'true' when the RANGER service is present and the
+        Ranger HDFS plugin is enabled (warning item).
+      - Only when core-site has kerberos authentication with authorization enabled and
+        dfs.encrypt.data.transfer is not "true": the dfs.http.policy value, the DataNode port
+        setup for that policy, and the use/value of dfs.data.transfer.protection (warning items).
+
+    :param properties: hdfs-site properties under validation.
+    :param recommendedDefaults: not used by this validator.
+    :param configurations: configurations from which core-site and ranger-hdfs-plugin-properties are read.
+    :param services: services description; services["services"] supplies the installed service names.
+    :param hosts: not used by this validator.
+    :return: A list of configuration validation problems.
+    """
+    # We can not access property hadoop.security.authentication from the
+    # other config (core-site). That's why we are using another heuristic here
+    hdfs_site = properties
+    # The site lookup can come back empty (the Ranger lookup below guards for the same case),
+    # so fall back to an empty dict instead of failing on the "in" tests further down.
+    core_site = self.getSiteProperties(configurations, "core-site") or {}
+
+    dfs_encrypt_data_transfer = 'dfs.encrypt.data.transfer'  # Hadoop Wire encryption
+    wire_encryption_enabled = hdfs_site.get(dfs_encrypt_data_transfer) == "true"
+
+    HTTP_ONLY = 'HTTP_ONLY'
+    HTTPS_ONLY = 'HTTPS_ONLY'
+    HTTP_AND_HTTPS = 'HTTP_AND_HTTPS'
+
+    VALID_HTTP_POLICY_VALUES = [HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS]
+    VALID_TRANSFER_PROTECTION_VALUES = ['authentication', 'integrity', 'privacy']
+
+    validationItems = []
+    # Only the NameNode http(s) addresses are validated; the others are listed but disabled.
+    address_properties = [
+      # "dfs.datanode.address",
+      # "dfs.datanode.http.address",
+      # "dfs.datanode.https.address",
+      # "dfs.datanode.ipc.address",
+      # "dfs.journalnode.http-address",
+      # "dfs.journalnode.https-address",
+      # "dfs.namenode.rpc-address",
+      # "dfs.namenode.secondary.http-address",
+      "dfs.namenode.http-address",
+      "dfs.namenode.https-address",
+    ]
+    #Validating *address properties for correct values
+
+    for address_property in address_properties:
+      if address_property in hdfs_site:
+        value = hdfs_site[address_property]
+        if not HDFSValidator.is_valid_host_port_authority(value):
+          validationItems.append({"config-name" : address_property, "item" :
+            self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)})
+
+    #Adding Ranger Plugin logic here
+    ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+    # A missing site or a missing/empty flag both count as "plugin disabled".
+    ranger_plugin_enabled = (ranger_plugin_properties or {}).get('ranger-hdfs-plugin-enabled') or 'No'
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
+      if 'dfs.permissions.enabled' in hdfs_site and \
+              hdfs_site['dfs.permissions.enabled'] != 'true':
+        validationItems.append({"config-name": 'dfs.permissions.enabled',
+                                "item": self.getWarnItem("dfs.permissions.enabled needs to be set to true if Ranger HDFS Plugin is enabled.")})
+
+    if (not wire_encryption_enabled and   # If wire encryption is enabled at Hadoop, it disables all our checks
+            'hadoop.security.authentication' in core_site and
+            core_site['hadoop.security.authentication'] == 'kerberos' and
+            'hadoop.security.authorization' in core_site and
+            core_site['hadoop.security.authorization'] == 'true'):
+      # security is enabled
+
+      dfs_http_policy = 'dfs.http.policy'
+      dfs_datanode_address = 'dfs.datanode.address'
+      datanode_http_address = 'dfs.datanode.http.address'
+      datanode_https_address = 'dfs.datanode.https.address'
+      data_transfer_protection = 'dfs.data.transfer.protection'
+
+      try: # Params may be absent
+        privileged_dfs_dn_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[dfs_datanode_address]))
+      except KeyError:
+        privileged_dfs_dn_port = False
+      try:
+        privileged_dfs_http_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[datanode_http_address]))
+      except KeyError:
+        privileged_dfs_http_port = False
+      try:
+        privileged_dfs_https_port = HDFSValidator.isSecurePort(HDFSValidator.getPort(hdfs_site[datanode_https_address]))
+      except KeyError:
+        privileged_dfs_https_port = False
+      dfs_http_policy_value = hdfs_site.get(dfs_http_policy, HTTP_ONLY)  # Default
+      data_transfer_protection_value = hdfs_site.get(data_transfer_protection)
+
+      if dfs_http_policy_value not in VALID_HTTP_POLICY_VALUES:
+        validationItems.append({"config-name": dfs_http_policy,
+                                "item": self.getWarnItem(
+                                  "Invalid property value: {0}. Valid values are {1}".format(
+                                    dfs_http_policy_value, VALID_HTTP_POLICY_VALUES))})
+
+      # determine whether we use secure ports
+      # "message" is only assigned together with a non-empty address_properties_with_warnings,
+      # so the loop that reads it below never sees it unset.
+      address_properties_with_warnings = []
+      if dfs_http_policy_value == HTTPS_ONLY:
+        if not privileged_dfs_dn_port and (privileged_dfs_https_port or datanode_https_address not in hdfs_site):
+          important_properties = [dfs_datanode_address, datanode_https_address]
+          message = "You set up datanode to use some non-secure ports. " \
+                    "If you want to run Datanode under non-root user in a secure cluster, " \
+                    "you should set all these properties {2} " \
+                    "to use non-secure ports (if property {3} does not exist, " \
+                    "just add it). You may also set up property {4} ('{5}' is a good default value). " \
+                    "Also, set up WebHDFS with SSL as " \
+                    "described in manual in order to be able to " \
+                    "use HTTPS.".format(dfs_http_policy, dfs_http_policy_value, important_properties,
+                                        datanode_https_address, data_transfer_protection,
+                                        VALID_TRANSFER_PROTECTION_VALUES[0])
+          address_properties_with_warnings.extend(important_properties)
+      else:  # dfs_http_policy_value == HTTP_AND_HTTPS or HTTP_ONLY
+        # We don't enforce datanode_https_address to use privileged ports here
+        any_nonprivileged_ports_are_in_use = not privileged_dfs_dn_port or not privileged_dfs_http_port
+        if any_nonprivileged_ports_are_in_use:
+          important_properties = [dfs_datanode_address, datanode_http_address]
+          message = "You have set up datanode to use some non-secure ports, but {0} is set to {1}. " \
+                    "In a secure cluster, Datanode forbids using non-secure ports " \
+                    "if {0} is not set to {3}. " \
+                    "Please make sure that properties {2} use secure ports.".format(
+            dfs_http_policy, dfs_http_policy_value, important_properties, HTTPS_ONLY)
+          address_properties_with_warnings.extend(important_properties)
+
+      # Generate port-related warnings if any
+      for prop in address_properties_with_warnings:
+        validationItems.append({"config-name": prop,
+                                "item": self.getWarnItem(message)})
+
+      # Check if it is appropriate to use dfs.data.transfer.protection
+      if data_transfer_protection_value is not None:
+        if dfs_http_policy_value in [HTTP_ONLY, HTTP_AND_HTTPS]:
+          validationItems.append({"config-name": data_transfer_protection,
+                                  "item": self.getWarnItem(
+                                    "{0} property can not be used when {1} is set to any "
+                                    "value other then {2}. Tip: When {1} property is not defined, it defaults to {3}".format(
+                                      data_transfer_protection, dfs_http_policy, HTTPS_ONLY, HTTP_ONLY))})
+        elif data_transfer_protection_value not in VALID_TRANSFER_PROTECTION_VALUES:
+          validationItems.append({"config-name": data_transfer_protection,
+                                  "item": self.getWarnItem(
+                                    "Invalid property value: {0}. Valid values are {1}.".format(
+                                      data_transfer_protection_value, VALID_TRANSFER_PROTECTION_VALUES))})
+    return self.toConfigurationValidationProblems(validationItems, "hdfs-site")
+
+  def validateHadoopEnvConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    Validate hadoop-env with the checks that were copied from HDP 2.2.
+
+    Runs validatorLessThenDefaultValue on namenode_heapsize,
+    namenode_opt_newsize and namenode_opt_maxnewsize, in that order.
+    :return: A list of configuration validation problems.
+    """
+    validationItems = []
+    for heapProperty in ('namenode_heapsize', 'namenode_opt_newsize', 'namenode_opt_maxnewsize'):
+      checkResult = self.validatorLessThenDefaultValue(properties, recommendedDefaults, heapProperty)
+      validationItems.append({"config-name": heapProperty, "item": checkResult})
+    return self.toConfigurationValidationProblems(validationItems, "hadoop-env")
+
+  def validateHDFSRangerPluginConfigurationsFromHDP22(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    This was copied from HDP 2.2; validate ranger-hdfs-plugin-properties
+
+    When ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled is "yes"
+    (case-insensitive), ranger-env/ranger-hdfs-plugin-enabled must be "yes" as
+    well; otherwise a warning item is produced. A missing site or a missing
+    flag is treated as "not enabled" on either side.
+
+    :param properties: not read directly; the plugin site is taken from configurations.
+    :param recommendedDefaults: not used by this validator.
+    :param configurations: configurations holding ranger-hdfs-plugin-properties.
+    :param services: services description from which ranger-env is read.
+    :param hosts: not used by this validator.
+    :return: A list of configuration validation problems.
+    """
+    flag = 'ranger-hdfs-plugin-enabled'
+    validationItems = []
+    ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+    # .get() keeps a plugin site without the flag from raising KeyError.
+    ranger_plugin_enabled = (ranger_plugin_properties or {}).get(flag) or 'No'
+    if ranger_plugin_enabled.lower() == 'yes':
+      # ranger-hdfs-plugin must be enabled in ranger-env
+      ranger_env = self.getServicesSiteProperties(services, 'ranger-env')
+      ranger_env_enabled = (ranger_env or {}).get(flag)
+      if ranger_env_enabled is None or ranger_env_enabled.lower() != 'yes':
+        validationItems.append({"config-name": flag,
+                                "item": self.getWarnItem(
+                                  "ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled must correspond ranger-env/ranger-hdfs-plugin-enabled")})
+    return self.toConfigurationValidationProblems(validationItems, "ranger-hdfs-plugin-properties")
+
+  def validateRangerAuthorizerFromHDP23(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    This was copied from HDP 2.3
+    If Ranger service is present and the ranger plugin is enabled, check that the provider class is correctly set.
+
+    A warning item is produced when dfs.namenode.inode.attributes.provider.class is missing
+    or differs (case-insensitively) from the Ranger HDFS authorizer class name.
+
+    :param properties: hdfs-site properties under validation.
+    :param recommendedDefaults: not used by this validator.
+    :param configurations: configurations holding ranger-hdfs-plugin-properties.
+    :param services: services description used to obtain the service names.
+    :param hosts: not used by this validator.
+    :return: A list of configuration validation problems.
+    """
+    Logger.info("Class: %s, Method: %s. Checking if Ranger service is present and if the provider class is using the Ranger Authorizer." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+    hdfs_site = properties
+    provider_property = 'dfs.namenode.inode.attributes.provider.class'
+    ranger_authorizer = 'org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer'
+    validationItems = []
+
+    #Adding Ranger Plugin logic here
+    ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+    # A missing site or a missing/empty flag both count as "plugin disabled".
+    ranger_plugin_enabled = (ranger_plugin_properties or {}).get('ranger-hdfs-plugin-enabled') or 'No'
+    servicesList = self.getServiceNames(services)
+    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'yes'):
+      # Plain lookup instead of raise/except as control flow; this also drops the
+      # Python-2-only "except ..., e" form whose bound exception was never used.
+      provider_class = hdfs_site.get(provider_property)
+      if provider_class is None or provider_class.lower() != ranger_authorizer.lower():
+        message = "{0} needs to be set to '{1}' if Ranger HDFS Plugin is enabled.".format(provider_property, ranger_authorizer)
+        validationItems.append({"config-name": provider_property,
+                                "item": self.getWarnItem(message)})
+
+    return self.toConfigurationValidationProblems(validationItems, "hdfs-site")
+
+  def getDataNodeHosts(self, services, hosts):
+    """
+    Look up the hosts that carry the HDFS DATANODE component.
+
+    :return: the host list reported by getHostsWithComponent, or an empty
+             list when hosts["items"] is empty or the lookup yields None.
+    """
+    if len(hosts["items"]) == 0:
+      return []
+    dataNodeHosts = self.getHostsWithComponent("HDFS", "DATANODE", services, hosts)
+    return [] if dataNodeHosts is None else dataNodeHosts
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
index 82316f4..4174b9c 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.9/service_advisor.py
@@ -48,6 +48,9 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
     self.modifyMastersWithMultipleInstances()
     self.modifyCardinalitiesDict()
     self.modifyHeapSizeProperties()
+    self.modifyNotValuableComponents()
+    self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
 
   def modifyMastersWithMultipleInstances(self):
     """
@@ -72,22 +75,46 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
                                                        "property": "zk_server_heapsize",
                                                        "default": "1024m"}]}
 
+  def modifyNotValuableComponents(self):
+    """
+    Hook for modifying the set of components whose host assignment is
+    based on other services.
+
+    This advisor has nothing to change, so the hook does nothing.
+    """
+
+  def modifyComponentsNotPreferableOnServer(self):
+    """
+    Hook for modifying the set of components that are not preferable
+    on the server.
+
+    This advisor has nothing to change, so the hook does nothing.
+    """
+
+  def modifyComponentLayoutSchemes(self):
+    """
+    Hook for modifying the layout scheme dictionaries for components.
+
+    A scheme dictionary basically maps the number of hosts to the
+    host index where the component should exist.
+
+    This advisor has nothing to change, so the hook does nothing.
+    """
+
   def getServiceComponentLayoutValidations(self, services, hosts):
     """
     Get a list of errors. Zookeeper does not have any validations in this version.
     """
-    service_name = services["services"][0]["StackServices"]["service_name"]
-    Logger.info("Class: %s, Method: %s. Validating Service Component Layout for Service: %s." %
-                (self.__class__.__name__, inspect.stack()[0][3], service_name))
+    Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
     return self.as_super.getServiceComponentLayoutValidations(services, hosts)
 
   def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
     """
     Recommend configurations to set. Zookeeper does not have any recommendations in this version.
     """
-    service_name = services["services"][0]["StackServices"]["service_name"]
-    Logger.info("Class: %s, Method: %s. Recommending Service Configurations for Service: %s." %
-                (self.__class__.__name__, inspect.stack()[0][3], service_name))
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
 
     self.recommendConfigurations(configurations, clusterData, services, hosts)
 
@@ -95,9 +122,8 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
     """
     Recommend configurations for this service.
     """
-    service_name = services["services"][0]["StackServices"]["service_name"]
-    Logger.info("Class: %s, Method: %s. Recommending Service Configurations for Service: %s." %
-                (self.__class__.__name__, inspect.stack()[0][3], service_name))
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
 
     Logger.info("Setting zoo.cfg to default dataDir to /hadoop/zookeeper on the best matching mount")
 
@@ -110,9 +136,8 @@ class ZookeeperServiceAdvisor(service_advisor.ServiceAdvisor):
     """
     Validate configurations for the service. Return a list of errors.
     """
-    service_name = services["services"][0]["StackServices"]["service_name"]
-    Logger.info("Class: %s, Method: %s. Validating Configurations for Service: %s." %
-                (self.__class__.__name__, inspect.stack()[0][3], service_name))
+    Logger.info("Class: %s, Method: %s. Validating Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
 
     items = []
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
index 5172042..6ef74d2 100644
--- a/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/BIGTOP/0.8/services/stack_advisor.py
@@ -37,6 +37,7 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor):
     self.modifyHeapSizeProperties()
     self.modifyNotValuableComponents()
     self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
 
   def modifyMastersWithMultipleInstances(self):
     """
@@ -79,6 +80,28 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor):
     """
     self.notPreferableOnServerComponents |= set(['GANGLIA_SERVER'])
 
+  def modifyComponentLayoutSchemes(self):
+    """
+    Assign the layout scheme dictionaries for components.
+    Each scheme dictionary basically maps the number of hosts to the
+    host index where the component should exist.
+    Any previously assigned scheme mapping is replaced.
+    """
+
+    schemes = {
+      'NAMENODE': {"else": 0},
+      'SECONDARY_NAMENODE': {"else": 1},
+      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
+      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
+    }
+    # Components sharing a scheme each get their own dict instance.
+    for yarnMaster in ('HISTORYSERVER', 'RESOURCEMANAGER'):
+      schemes[yarnMaster] = {31: 1, "else": 2}
+    for hiveMaster in ('HIVE_SERVER', 'HIVE_METASTORE', 'WEBHCAT_SERVER'):
+      schemes[hiveMaster] = {6: 1, 31: 2, "else": 4}
+
+    self.componentLayoutSchemes = schemes
+
   def getComponentLayoutValidations(self, services, hosts):
     """Returns array of Validation objects about issues with hostnames components assigned to"""
     items = []
@@ -330,23 +353,6 @@ class BaseBIGTOP08StackAdvisor(DefaultStackAdvisor):
                         {"config-name": 'yarn.scheduler.maximum-allocation-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.scheduler.maximum-allocation-mb')} ]
     return self.toConfigurationValidationProblems(validationItems, "yarn-site")
 
-  # TODO, move to Service Advisors.
-  def getComponentLayoutSchemes(self):
-    return {
-      'NAMENODE': {"else": 0},
-      'SECONDARY_NAMENODE': {"else": 1},
-      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
-
-      'HISTORYSERVER': {31: 1, "else": 2},
-      'RESOURCEMANAGER': {31: 1, "else": 2},
-
-      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
-
-      'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
-      'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
-      'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
-      }
-
 class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
 
   def __init__(self):
@@ -357,6 +363,7 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
     self.modifyHeapSizeProperties()
     self.modifyNotValuableComponents()
     self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
 
   def modifyMastersWithMultipleInstances(self):
     """
@@ -396,6 +403,18 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
     """
     self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS'])
 
+  def modifyComponentLayoutSchemes(self):
+    """
+    Extend the parent's layout schemes with the components added by this stack.
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+    """
+    super(BIGTOP08StackAdvisor, self).modifyComponentLayoutSchemes()  # install parent entries first
+    self.componentLayoutSchemes.update({
+      'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+      'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+    })
+
   def getServiceConfigurationRecommenderDict(self):
     parentRecommendConfDict = super(BIGTOP08StackAdvisor, self).getServiceConfigurationRecommenderDict()
     childRecommendConfDict = {
@@ -430,16 +449,6 @@ class BIGTOP08StackAdvisor(BaseBIGTOP08StackAdvisor):
                    "-server -Xmx" + str(int(0.8 * clusterData["amMemory"]))
                    + "m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC")
 
-  # TODO, move to Service Advisors.
-  def getComponentLayoutSchemes(self):
-    parentSchemes = super(BIGTOP08StackAdvisor, self).getComponentLayoutSchemes()
-    childSchemes = {
-        'APP_TIMELINE_SERVER': {31: 1, "else": 2},
-        'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
-    }
-    parentSchemes.update(childSchemes)
-    return parentSchemes
-
   def getServiceConfigurationValidators(self):
     parentValidators = super(BIGTOP08StackAdvisor, self).getServiceConfigurationValidators()
     childValidators = {

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index 9816702..3596798 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -42,6 +42,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     self.modifyHeapSizeProperties()
     self.modifyNotValuableComponents()
     self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
 
   def modifyMastersWithMultipleInstances(self):
     """
@@ -84,6 +85,29 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     """
     self.notPreferableOnServerComponents |= set(['GANGLIA_SERVER', 'METRICS_COLLECTOR'])
 
+  def modifyComponentLayoutSchemes(self):
+    """
+    Merge this stack's layout schemes into the existing scheme mapping.
+    Each scheme dictionary basically maps the number of hosts to the
+    host index where the component should exist.
+    Entries already present under the same component name are overwritten.
+    """
+
+    additions = {
+      'NAMENODE': {"else": 0},
+      'SECONDARY_NAMENODE': {"else": 1},
+      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
+      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
+      'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5},
+    }
+    # Components sharing a scheme each get their own dict instance.
+    for yarnMaster in ('HISTORYSERVER', 'RESOURCEMANAGER'):
+      additions[yarnMaster] = {31: 1, "else": 2}
+    for hiveMaster in ('HIVE_SERVER', 'HIVE_METASTORE', 'WEBHCAT_SERVER'):
+      additions[hiveMaster] = {6: 1, 31: 2, "else": 4}
+
+    self.componentLayoutSchemes.update(additions)
+
   def getComponentLayoutValidations(self, services, hosts):
     """Returns array of Validation objects about issues with hostnames components assigned to"""
     items = super(HDP206StackAdvisor, self).getComponentLayoutValidations(services, hosts)
@@ -248,17 +272,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
          ambari_user = ambari_user.split('@')[0]
     return ambari_user
 
-  def recommendAmbariProxyUsersForHDFS(self, services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute):
-    if "HDFS" in servicesList:
-      ambari_user = self.getAmbariUser(services)
-      ambariHostName = socket.getfqdn()
-      self.put_proxyuser_value(ambari_user, ambariHostName, services=services, configurations=configurations, put_function=putCoreSiteProperty)
-      self.put_proxyuser_value(ambari_user, "*", is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
-      old_ambari_user = self.getOldAmbariUser(services)
-      if old_ambari_user is not None:
-        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
-        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
-
   def getAmbariProxyUsersForHDFSValidationItems(self, properties, services):
     validationItems = []
     servicesList = self.get_services_list(services)
@@ -274,216 +287,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
 
     return validationItems
 
-  def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, services, hosts, configurations):
-    Logger.info("Calculating Hadoop Proxy User recommendations for {0} service.".format(serviceName))
-    servicesList = self.get_services_list(services)
-    resultUsers = {}
-
-    if serviceName in servicesList:
-      usersComponents = {}
-      for values in serviceUserComponents:
-
-        # Filter, if 4th argument is present in the tuple
-        filterFunction = values[3:]
-        if filterFunction and not filterFunction[0](services, hosts):
-          continue
-
-        userNameConfig, userNameProperty, hostSelectorMap = values[:3]
-        user = get_from_dict(services, ("configurations", userNameConfig, "properties", userNameProperty), None)
-        if user:
-          usersComponents[user] = (userNameConfig, userNameProperty, hostSelectorMap)
-
-      for user, (userNameConfig, userNameProperty, hostSelectorMap) in usersComponents.iteritems():
-        proxyUsers = {"config": userNameConfig, "propertyName": userNameProperty}
-        for proxyPropertyName, hostSelector in hostSelectorMap.iteritems():
-          componentHostNamesString = hostSelector if isinstance(hostSelector, basestring) else '*'
-          if isinstance(hostSelector, (list, tuple)):
-            _, componentHostNames = self.get_data_for_proxyuser(user, services, configurations) # preserve old values
-            for component in hostSelector:
-              componentHosts = self.getHostsWithComponent(serviceName, component, services, hosts)
-              if componentHosts is not None:
-                for componentHost in componentHosts:
-                  componentHostName =  componentHost["Hosts"]["host_name"]
-                  componentHostNames.add(componentHostName)
-
-            componentHostNamesString = ",".join(sorted(componentHostNames))
-            Logger.info("Host List for [service='{0}'; user='{1}'; components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), componentHostNamesString))
-
-          if not proxyPropertyName in proxyUsers:
-            proxyUsers[proxyPropertyName] = componentHostNamesString
-
-        if not user in resultUsers:
-          resultUsers[user] = proxyUsers
-
-    return resultUsers
-
-  def getHadoopProxyUsers(self, services, hosts, configurations):
-    """
-    Gets Hadoop Proxy User recommendations based on the configuration that is provided by
-    getServiceHadoopProxyUsersConfigurationDict.
-
-    See getServiceHadoopProxyUsersConfigurationDict
-    """
-    servicesList = self.get_services_list(services)
-    users = {}
-
-    for serviceName, serviceUserComponents in self.getServiceHadoopProxyUsersConfigurationDict().iteritems():
-      users.update(self._getHadoopProxyUsersForService(serviceName, serviceUserComponents, services, hosts, configurations))
-
-    return users
-
-  def get_data_for_proxyuser(self, user_name, services, configurations, groups=False):
-    """
-    Returns values of proxyuser properties for given user. Properties can be
-    hadoop.proxyuser.username.groups or hadoop.proxyuser.username.hosts
-    :param user_name:
-    :param services:
-    :param groups: if true, will return values for group property, not hosts
-    :return: tuple (wildcard_value, set[values]), where wildcard_value indicates if property value was *
-    """
-    if "core-site" in services["configurations"]:
-      coreSite = services["configurations"]["core-site"]['properties']
-    else:
-      coreSite = {}
-    if groups:
-      property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
-    else:
-      property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
-    if property_name in coreSite:
-      property_value = coreSite[property_name]
-      if property_value == "*":
-        return True, set()
-      else:
-        result_values = set()
-        if "core-site" in configurations:
-          if property_name in configurations["core-site"]['properties']:
-            result_values = result_values.union(configurations["core-site"]['properties'][property_name].split(","))
-        result_values = result_values.union(property_value.split(","))
-        return False, result_values
-    return False, set()
-
-  def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None):
-    is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups)
-    result_value = "*"
-    result_values_set = self.merge_proxyusers_values(current_value, value)
-    if len(result_values_set) > 0:
-      result_value = ",".join(sorted([val for val in result_values_set if val]))
-
-    if is_groups:
-      property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
-    else:
-      property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
-
-    put_function(property_name, result_value)
-
-  def merge_proxyusers_values(self, first, second):
-    result = set()
-    def append(data):
-      if isinstance(data, str) or isinstance(data, unicode):
-        if data != "*":
-          result.update(data.split(","))
-      else:
-        result.update(data)
-    append(first)
-    append(second)
-    return result
-
-  def getServiceHadoopProxyUsersConfigurationDict(self):
-    """
-    Returns a map that is used by 'getHadoopProxyUsers' to determine service
-    user properties and related components and get proxyuser recommendations.
-    This method can be overridden in stackadvisors for the further stacks to
-    add additional services or change the previous logic.
-
-    Example of the map format:
-    {
-      "serviceName": [
-        ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", "propertyGroups": "exact string value"})
-        ("configTypeName2", "userPropertyName2", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}),
-        ("configTypeName3", "userPropertyName3", {"propertyHosts": ["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction)
-      ],
-      "serviceName2": [
-        ...
-    }
-
-    If the third element of a tuple is map that maps proxy property to it's value.
-    The key could be either 'propertyHosts' or 'propertyGroups'. (Both are optional)
-    If the map value is a string, then this string will be used for the proxyuser
-    value (e.g. 'hadoop.proxyuser.{user}.hosts' = '*').
-    Otherwise map value should be alist or a tuple with component names.
-    All hosts with the provided components will be added
-    to the property (e.g. 'hadoop.proxyuser.{user}.hosts' = 'host1,host2,host3')
-
-    The forth element of the tuple is optional and if it's provided,
-    it should be a function that takes two arguments: services and hosts.
-    If it returns False, proxyusers for the tuple will not be added.
-    """
-    ALL_WILDCARD = "*"
-    HOSTS_PROPERTY = "propertyHosts"
-    GROUPS_PROPERTY = "propertyGroups"
-
-    return {
-      "HDFS":   [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
-      "OOZIE":  [("oozie-env", "oozie_user", {HOSTS_PROPERTY: ["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
-      "HIVE":   [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", "HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}),
-                 ("hive-env", "webhcat_user", {HOSTS_PROPERTY: ["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
-      "YARN":   [("yarn-env", "yarn_user", {HOSTS_PROPERTY: ["RESOURCEMANAGER"]}, lambda services, hosts: len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 1)],
-      "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})],
-      "SPARK":  [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, GROUPS_PROPERTY: ALL_WILDCARD})]
-    }
-
-  def recommendHadoopProxyUsers(self, configurations, services, hosts):
-    servicesList = self.get_services_list(services)
-
-    if 'forced-configurations' not in services:
-      services["forced-configurations"] = []
-
-    putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
-    putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")
-
-    users = self.getHadoopProxyUsers(services, hosts, configurations)
-
-    # Force dependencies for HIVE
-    if "HIVE" in servicesList:
-      hive_user = get_from_dict(services, ("configurations", "hive-env", "properties", "hive_user"), default_value=None)
-      if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), default_value=None):
-        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(hive_user)})
-
-    for user_name, user_properties in users.iteritems():
-
-      # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" to core-site for all users
-      self.put_proxyuser_value(user_name, user_properties["propertyHosts"], services=services, configurations=configurations, put_function=putCoreSiteProperty)
-      Logger.info("Updated hadoop.proxyuser.{0}.hosts as : {1}".format(user_name, user_properties["propertyHosts"]))
-      if "propertyGroups" in user_properties:
-        self.put_proxyuser_value(user_name, user_properties["propertyGroups"], is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
-
-      # Remove old properties if user was renamed
-      userOldValue = getOldValue(self, services, user_properties["config"], user_properties["propertyName"])
-      if userOldValue is not None and userOldValue != user_name:
-        putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 'delete', 'true')
-        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(userOldValue)})
-        services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.hosts".format(user_name)})
-
-        if "propertyGroups" in user_properties:
-          putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue), 'delete', 'true')
-          services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)})
-          services["forced-configurations"].append({"type" : "core-site", "name" : "hadoop.proxyuser.{0}.groups".format(user_name)})
-
-    self.recommendAmbariProxyUsersForHDFS(services, configurations, servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute)
-
-  def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations):
-    validationItems = []
-    users = self.getHadoopProxyUsers(services, hosts, configurations)
-    for user_name, user_properties in users.iteritems():
-      props = ["hadoop.proxyuser.{0}.hosts".format(user_name)]
-      if "propertyGroups" in user_properties:
-        props.append("hadoop.proxyuser.{0}.groups".format(user_name))
-
-      for prop in props:
-        validationItems.append({"config-name": prop, "item": self.validatorNotEmpty(properties, prop)})
-
-    return validationItems
-
   def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts):
     putHDFSProperty = self.putProperty(configurations, "hadoop-env", services)
     putHDFSSiteProperty = self.putProperty(configurations, "hdfs-site", services)
@@ -536,7 +339,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
 
       maxFreeVolumeSizeForHost = 0l #kBytes
       for dataDir in dataDirs:
-        mp = getMountPointForDir(dataDir, mountPoints)
+        mp = self.getMountPointForDir(dataDir, mountPoints)
         for i in range(len(mountPoints)):
           if mp == mountPoints[i]:
             if mountPointDiskAvailableSpace[i] > maxFreeVolumeSizeForHost:
@@ -974,43 +777,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
 
     pass
 
-  def getHostNamesWithComponent(self, serviceName, componentName, services):
-    """
-    Returns the list of hostnames on which service component is installed
-    """
-    if services is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
-      service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
-      components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
-      if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
-        componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
-        return componentHostnames
-    return []
-
-  def getHostsWithComponent(self, serviceName, componentName, services, hosts):
-    if services is not None and hosts is not None and serviceName in [service["StackServices"]["service_name"] for service in services["services"]]:
-      service = [serviceEntry for serviceEntry in services["services"] if serviceEntry["StackServices"]["service_name"] == serviceName][0]
-      components = [componentEntry for componentEntry in service["components"] if componentEntry["StackServiceComponents"]["component_name"] == componentName]
-      if (len(components) > 0 and len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
-        componentHostnames = components[0]["StackServiceComponents"]["hostnames"]
-        componentHosts = [host for host in hosts["items"] if host["Hosts"]["host_name"] in componentHostnames]
-        return componentHosts
-    return []
-
-  def getHostWithComponent(self, serviceName, componentName, services, hosts):
-    componentHosts = self.getHostsWithComponent(serviceName, componentName, services, hosts)
-    if (len(componentHosts) > 0):
-      return componentHosts[0]
-    return None
-
-  def getHostComponentsByCategories(self, hostname, categories, services, hosts):
-    components = []
-    if services is not None and hosts is not None:
-      for service in services["services"]:
-          components.extend([componentEntry for componentEntry in service["components"]
-                              if componentEntry["StackServiceComponents"]["component_category"] in categories
-                              and hostname in componentEntry["StackServiceComponents"]["hostnames"]])
-    return components
-
   # TODO, move this to a generic stack advisor.
   def getZKHostPortString(self, services, include_port=True):
     """
@@ -1312,8 +1078,8 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
             mountPoints = []
             for mountPoint in host["Hosts"]["disk_info"]:
               mountPoints.append(mountPoint["mountpoint"])
-            hbase_rootdir_mountpoint = getMountPointForDir(hbase_rootdir, mountPoints)
-            hbase_tmpdir_mountpoint = getMountPointForDir(hbase_tmpdir, mountPoints)
+            hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints)
+            hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints)
             preferred_mountpoints = self.getPreferredMountPoints(host['Hosts'])
             # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition
             # if multiple preferred_mountpoints exist
@@ -1331,7 +1097,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
             if dn_hosts and collectorHostName in dn_hosts and ams_site and \
               dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs):
               for dfs_datadir in dfs_datadirs:
-                dfs_datadir_mountpoint = getMountPointForDir(dfs_datadir, mountPoints)
+                dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints)
                 if dfs_datadir_mountpoint == hbase_rootdir_mountpoint:
                   item = self.getWarnItem("Consider not using {0} partition for storing metrics data. "
                                           "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint))
@@ -1383,10 +1149,10 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     if logDirItem:
       validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}])
 
-    hbase_master_heapsize = to_number(properties["hbase_master_heapsize"])
-    hbase_master_xmn_size = to_number(properties["hbase_master_xmn_size"])
-    hbase_regionserver_heapsize = to_number(properties["hbase_regionserver_heapsize"])
-    hbase_regionserver_xmn_size = to_number(properties["regionserver_xmn_size"])
+    hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"])
+    hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"])
+    hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"])
+    hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"])
 
     # Validate Xmn settings.
     masterXmnItem = None
@@ -1476,14 +1242,14 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
 
             heapPropertyToIncrease = "hbase_regionserver_heapsize" if is_hbase_distributed else "hbase_master_heapsize"
             xmnPropertyToIncrease = "regionserver_xmn_size" if is_hbase_distributed else "hbase_master_xmn_size"
-            hbase_needs_increase = to_number(properties[heapPropertyToIncrease]) * mb < 32 * gb
+            hbase_needs_increase = self.to_number(properties[heapPropertyToIncrease]) * mb < 32 * gb
 
             if unusedMemory > 4*gb and hbase_needs_increase:  # warn user, if more than 4GB RAM is unused
 
-              recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + to_number(properties.get(heapPropertyToIncrease))*mb
+              recommended_hbase_heapsize = int((unusedMemory - 4*gb)*4/5) + self.to_number(properties.get(heapPropertyToIncrease))*mb
               recommended_hbase_heapsize = min(32*gb, recommended_hbase_heapsize) #Make sure heapsize <= 32GB
               recommended_hbase_heapsize = round_to_n(recommended_hbase_heapsize/mb,128) # Round to 128m multiple
-              if to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize:
+              if self.to_number(properties[heapPropertyToIncrease]) < recommended_hbase_heapsize:
                 hbaseHeapsizeItem = self.getWarnItem("Consider allocating {0} MB to {1} in ams-hbase-env to use up some "
                                                      "unused memory on host"
                                                          .format(recommended_hbase_heapsize,
@@ -1491,7 +1257,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
                 validationItems.extend([{"config-name": heapPropertyToIncrease, "item": hbaseHeapsizeItem}])
 
               recommended_xmn_size = round_to_n(0.15*recommended_hbase_heapsize,128)
-              if to_number(properties[xmnPropertyToIncrease]) < recommended_xmn_size:
+              if self.to_number(properties[xmnPropertyToIncrease]) < recommended_xmn_size:
                 xmnPropertyToIncreaseItem = self.getWarnItem("Consider allocating {0} MB to use up some unused memory "
                                                              "on host".format(recommended_xmn_size))
                 validationItems.extend([{"config-name": xmnPropertyToIncrease, "item": xmnPropertyToIncreaseItem}])
@@ -1505,7 +1271,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     mb = 1024 * 1024
     gb = 1024 * mb
     validationItems = []
-    collector_heapsize = to_number(ams_env.get("metrics_collector_heapsize"))
+    collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize"))
     amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
     for collectorHostName in amsCollectorHosts:
       for host in hosts["items"]:
@@ -1550,13 +1316,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
           if len(heapsize) > 1 and heapsize[-1] in '0123456789':
             heapsize = str(heapsize) + "m"
 
-          totalMemoryRequired += formatXmxSizeToBytes(heapsize)
+          totalMemoryRequired += self.formatXmxSizeToBytes(heapsize)
       else:
         if component == "METRICS_MONITOR" or "CLIENT" in component:
           heapsize = '512m'
         else:
           heapsize = '1024m'
-        totalMemoryRequired += formatXmxSizeToBytes(heapsize)
+        totalMemoryRequired += self.formatXmxSizeToBytes(heapsize)
     return totalMemoryRequired
 
   def getPreferredMountPoints(self, hostInfo):
@@ -1573,130 +1339,13 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
                 mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or
                 mountpoint["type"] in undesirableFsTypes or
                 mountpoint["available"] == str(0)):
-          mountPointsDict[mountpoint["mountpoint"]] = to_number(mountpoint["available"])
+          mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"])
       if mountPointsDict:
         mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True)
     mountPoints.append("/")
     return mountPoints
 
-  def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo):
-    if not propertyName in properties:
-      return self.getErrorItem("Value should be set")
-    dir = properties[propertyName]
-    if not dir.startswith("file://") or dir == recommendedDefaults.get(propertyName):
-      return None
-
-    dir = re.sub("^file://", "", dir, count=1)
-    mountPoints = []
-    for mountPoint in hostInfo["disk_info"]:
-      mountPoints.append(mountPoint["mountpoint"])
-    mountPoint = getMountPointForDir(dir, mountPoints)
-
-    if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint:
-      return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName))
-
-    return None
-
-  def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace):
-    if not propertyName in properties:
-      return self.getErrorItem("Value should be set")
-    dir = properties[propertyName]
-    if not dir.startswith("file://"):
-      return None
-
-    dir = re.sub("^file://", "", dir, count=1)
-    mountPoints = {}
-    for mountPoint in hostInfo["disk_info"]:
-      mountPoints[mountPoint["mountpoint"]] = to_number(mountPoint["available"])
-    mountPoint = getMountPointForDir(dir, mountPoints.keys())
-
-    if not mountPoints:
-      return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"])
-
-    if mountPoints[mountPoint] < reqiuredDiskSpace:
-      msg = "Ambari Metrics disk space requirements not met. \n" \
-            "Recommended disk space for partition {0} is {1}G"
-      return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb
-    return None
-
-  def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName):
-    if propertyName not in recommendedDefaults:
-      # If a property name exists in say hbase-env and hbase-site (which is allowed), then it will exist in the
-      # "properties" dictionary, but not necessarily in the "recommendedDefaults" dictionary". In this case, ignore it.
-      return None
-
-    if not propertyName in properties:
-      return self.getErrorItem("Value should be set")
-    value = to_number(properties[propertyName])
-    if value is None:
-      return self.getErrorItem("Value should be integer")
-    defaultValue = to_number(recommendedDefaults[propertyName])
-    if defaultValue is None:
-      return None
-    if value < defaultValue:
-      return self.getWarnItem("Value is less than the recommended default of {0}".format(defaultValue))
-    return None
-
-  def validatorEqualsPropertyItem(self, properties1, propertyName1,
-                                  properties2, propertyName2,
-                                  emptyAllowed=False):
-    if not propertyName1 in properties1:
-      return self.getErrorItem("Value should be set for %s" % propertyName1)
-    if not propertyName2 in properties2:
-      return self.getErrorItem("Value should be set for %s" % propertyName2)
-    value1 = properties1.get(propertyName1)
-    if value1 is None and not emptyAllowed:
-      return self.getErrorItem("Empty value for %s" % propertyName1)
-    value2 = properties2.get(propertyName2)
-    if value2 is None and not emptyAllowed:
-      return self.getErrorItem("Empty value for %s" % propertyName2)
-    if value1 != value2:
-      return self.getWarnItem("It is recommended to set equal values "
-             "for properties {0} and {1}".format(propertyName1, propertyName2))
-
-    return None
-
-  def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults,
-                                       propertyName):
-    if not propertyName in properties:
-      return self.getErrorItem("Value should be set for %s" % propertyName)
-    value = properties.get(propertyName)
-    if not propertyName in recommendedDefaults:
-      return self.getErrorItem("Value should be recommended for %s" % propertyName)
-    recommendedValue = recommendedDefaults.get(propertyName)
-    if value != recommendedValue:
-      return self.getWarnItem("It is recommended to set value {0} "
-             "for property {1}".format(recommendedValue, propertyName))
-    return None
-
-  def validatorNotEmpty(self, properties, propertyName):
-    if not propertyName in properties:
-      return self.getErrorItem("Value should be set for {0}".format(propertyName))
-    value = properties.get(propertyName)
-    if not value:
-      return self.getWarnItem("Empty value for {0}".format(propertyName))
-    return None
-
-  def validateMinMemorySetting(self, properties, defaultValue, propertyName):
-    if not propertyName in properties:
-      return self.getErrorItem("Value should be set")
-    if defaultValue is None:
-      return self.getErrorItem("Config's default value can't be null or undefined")
-
-    value = properties[propertyName]
-    if value is None:
-      return self.getErrorItem("Value can't be null or undefined")
-    try:
-      valueInt = to_number(value)
-      # TODO: generify for other use cases
-      defaultValueInt = int(str(defaultValue).strip())
-      if valueInt < defaultValueInt:
-        return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue))
-    except:
-      return None
-
-    return None
-
+  # TODO, move to YARN Service Advisor
   def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services):
     if propertyName not in properties:
       return self.getErrorItem("Value should be set")
@@ -1712,6 +1361,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
 
     return None
 
+  # TODO, move to YARN Service Advisor
   def recommendYarnQueue(self, services, catalog_name=None, queue_property=None):
     old_queue_name = None
 
@@ -1737,15 +1387,15 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     defaultValue = recommendedDefaults[propertyName]
     if defaultValue is None:
       return self.getErrorItem("Config's default value can't be null or undefined")
-    if not checkXmxValueFormat(value) and checkXmxValueFormat(defaultValue):
+    if not self.checkXmxValueFormat(value) and self.checkXmxValueFormat(defaultValue):
       # Xmx is in the default-value but not the value, should be an error
       return self.getErrorItem('Invalid value format')
-    if not checkXmxValueFormat(defaultValue):
+    if not self.checkXmxValueFormat(defaultValue):
       # if default value does not contain Xmx, then there is no point in validating existing value
       return None
-    valueInt = formatXmxSizeToBytes(getXmxSize(value))
-    defaultValueXmx = getXmxSize(defaultValue)
-    defaultValueInt = formatXmxSizeToBytes(defaultValueXmx)
+    valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value))
+    defaultValueXmx = self.getXmxSize(defaultValue)
+    defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx)
     if valueInt < defaultValueInt:
       return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx)
     return None
@@ -1837,24 +1487,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
         return dataNodeHosts
     return []
 
-  # TODO, move to Service Advisors.
-  def getComponentLayoutSchemes(self):
-    return {
-      'NAMENODE': {"else": 0},
-      'SECONDARY_NAMENODE': {"else": 1},
-      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
-
-      'HISTORYSERVER': {31: 1, "else": 2},
-      'RESOURCEMANAGER': {31: 1, "else": 2},
-
-      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
-
-      'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
-      'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
-      'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
-      'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5},
-    }
-
   def get_system_min_uid(self):
     login_defs = '/etc/login.defs'
     uid_min_tag = 'UID_MIN'
@@ -2038,17 +1670,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
            "security_enabled" in services["configurations"]["cluster-env"]["properties"] and\
            services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true"
 
-  def get_services_list(self, services):
-    """
-    Returns available services as list
-
-    :type services dict
-    :rtype list
-    """
-    if not services:
-      return []
 
-    return [service["StackServices"]["service_name"] for service in services["services"]]
 
   def get_components_list(self, service, services):
     """
@@ -2103,88 +1725,7 @@ def getServicesSiteProperties(services, siteName):
     return None
   return siteConfig.get("properties")
 
-def to_number(s):
-  try:
-    return int(re.sub("\D", "", s))
-  except ValueError:
-    return None
 
-def checkXmxValueFormat(value):
-  p = re.compile('-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?')
-  matches = p.findall(value)
-  return len(matches) == 1
-
-def getXmxSize(value):
-  p = re.compile("-Xmx(\d+)(.?)")
-  result = p.findall(value)[0]
-  if len(result) > 1:
-    # result[1] - is a space or size formatter (b|k|m|g etc)
-    return result[0] + result[1].lower()
-  return result[0]
-
-def formatXmxSizeToBytes(value):
-  value = value.lower()
-  if len(value) == 0:
-    return 0
-  modifier = value[-1]
-
-  if modifier == ' ' or modifier in "0123456789":
-    modifier = 'b'
-  m = {
-    modifier == 'b': 1,
-    modifier == 'k': 1024,
-    modifier == 'm': 1024 * 1024,
-    modifier == 'g': 1024 * 1024 * 1024,
-    modifier == 't': 1024 * 1024 * 1024 * 1024,
-    modifier == 'p': 1024 * 1024 * 1024 * 1024 * 1024
-    }[1]
-  return to_number(value) * m
-
-def getPort(address):
-  """
-  Extracts port from the address like 0.0.0.0:1019
-  """
-  if address is None:
-    return None
-  m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
-  if m is not None:
-    return int(m.group(2))
-  else:
-    return None
-
-def isSecurePort(port):
-  """
-  Returns True if port is root-owned at *nix systems
-  """
-  if port is not None:
-    return port < 1024
-  else:
-    return False
-
-def getMountPointForDir(dir, mountPoints):
-  """
-  :param dir: Directory to check, even if it doesn't exist.
-  :return: Returns the closest mount point as a string for the directory.
-  if the "dir" variable is None, will return None.
-  If the directory does not exist, will return "/".
-  """
-  bestMountFound = None
-  if dir:
-    dir = re.sub("^file://", "", dir, count=1).strip().lower()
-
-    # If the path is "/hadoop/hdfs/data", then possible matches for mounts could be
-    # "/", "/hadoop/hdfs", and "/hadoop/hdfs/data".
-    # So take the one with the greatest number of segments.
-    for mountPoint in mountPoints:
-      # Ensure that the mount path and the dir path ends with "/"
-      # The mount point "/hadoop" should not match with the path "/hadoop1"
-      if os.path.join(dir, "").startswith(os.path.join(mountPoint, "")):
-        if bestMountFound is None:
-          bestMountFound = mountPoint
-        elif os.path.join(bestMountFound, "").count(os.path.sep) < os.path.join(mountPoint, "").count(os.path.sep):
-          bestMountFound = mountPoint
-
-  return bestMountFound
 
 def round_to_n(mem_size, n=128):
   return int(round(mem_size / float(n))) * int(n)

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
index b275b00..4822732 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
@@ -32,6 +32,7 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
     self.modifyHeapSizeProperties()
     self.modifyNotValuableComponents()
     self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
 
   def modifyMastersWithMultipleInstances(self):
     """
@@ -74,6 +75,20 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
     """
     self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS', 'GANGLIA_SERVER', 'METRICS_COLLECTOR'])
 
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+
+    Must be overridden in child class.
+    """
+    self.componentLayoutSchemes.update({
+      'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+      'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+    })
+
   def getServiceConfigurationRecommenderDict(self):
     parentRecommendConfDict = super(HDP21StackAdvisor, self).getServiceConfigurationRecommenderDict()
     childRecommendConfDict = {
@@ -210,16 +225,6 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
     if recommended_tez_queue is not None:
       putTezProperty("tez.queue.name", recommended_tez_queue)
 
-  # TODO, move to Service Advisors.
-  def getComponentLayoutSchemes(self):
-    parentSchemes = super(HDP21StackAdvisor, self).getComponentLayoutSchemes()
-    childSchemes = {
-        'APP_TIMELINE_SERVER': {31: 1, "else": 2},
-        'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
-    }
-    parentSchemes.update(childSchemes)
-    return parentSchemes
-
   def getServiceConfigurationValidators(self):
     parentValidators = super(HDP21StackAdvisor, self).getServiceConfigurationValidators()
     childValidators = {
@@ -233,10 +238,10 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
     validationItems = [ {"config-name": 'hive.tez.container.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.tez.container.size')},
                         {"config-name": 'hive.tez.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'hive.tez.java.opts')},
                         {"config-name": 'hive.auto.convert.join.noconditionaltask.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.auto.convert.join.noconditionaltask.size')} ]
-    yarnSiteProperties = getSiteProperties(configurations, "yarn-site")
+    yarnSiteProperties = self.getSiteProperties(configurations, "yarn-site")
     if yarnSiteProperties:
-      yarnSchedulerMaximumAllocationMb = to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"])
-      hiveTezContainerSize = to_number(properties['hive.tez.container.size'])
+      yarnSchedulerMaximumAllocationMb = self.to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"])
+      hiveTezContainerSize = self.to_number(properties['hive.tez.container.size'])
       if hiveTezContainerSize is not None and yarnSchedulerMaximumAllocationMb is not None and hiveTezContainerSize > yarnSchedulerMaximumAllocationMb:
         validationItems.append({"config-name": 'hive.tez.container.size', "item": self.getWarnItem("hive.tez.container.size is greater than the maximum container size specified in yarn.scheduler.maximum-allocation-mb")})
     return self.toConfigurationValidationProblems(validationItems, "hive-site")

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
index 8980398..f108622 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
@@ -30,6 +30,7 @@ import xml.etree.ElementTree as ET
 # Local Imports
 from resource_management.core.logger import Logger
 
+
 class HDP22StackAdvisor(HDP21StackAdvisor):
 
   def __init__(self):
@@ -1202,23 +1203,23 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
                         {"config-name": 'mapreduce.job.queuename', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'mapreduce.job.queuename', services)} ]
 
     if 'mapreduce.map.java.opts' in properties and \
-      checkXmxValueFormat(properties['mapreduce.map.java.opts']):
-      mapreduceMapJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024)
-      mapreduceMapMemoryMb = to_number(properties['mapreduce.map.memory.mb'])
+      self.checkXmxValueFormat(properties['mapreduce.map.java.opts']):
+      mapreduceMapJavaOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024)
+      mapreduceMapMemoryMb = self.to_number(properties['mapreduce.map.memory.mb'])
       if mapreduceMapJavaOpts > mapreduceMapMemoryMb:
         validationItems.append({"config-name": 'mapreduce.map.java.opts', "item": self.getWarnItem("mapreduce.map.java.opts Xmx should be less than mapreduce.map.memory.mb ({0})".format(mapreduceMapMemoryMb))})
 
     if 'mapreduce.reduce.java.opts' in properties and \
-      checkXmxValueFormat(properties['mapreduce.reduce.java.opts']):
-      mapreduceReduceJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024)
-      mapreduceReduceMemoryMb = to_number(properties['mapreduce.reduce.memory.mb'])
+      self.checkXmxValueFormat(properties['mapreduce.reduce.java.opts']):
+      mapreduceReduceJavaOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024)
+      mapreduceReduceMemoryMb = self.to_number(properties['mapreduce.reduce.memory.mb'])
       if mapreduceReduceJavaOpts > mapreduceReduceMemoryMb:
         validationItems.append({"config-name": 'mapreduce.reduce.java.opts', "item": self.getWarnItem("mapreduce.reduce.java.opts Xmx should be less than mapreduce.reduce.memory.mb ({0})".format(mapreduceReduceMemoryMb))})
 
     if 'yarn.app.mapreduce.am.command-opts' in properties and \
-      checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']):
-      yarnAppMapreduceAmCommandOpts = formatXmxSizeToBytes(getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024)
-      yarnAppMapreduceAmResourceMb = to_number(properties['yarn.app.mapreduce.am.resource.mb'])
+      self.checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']):
+      yarnAppMapreduceAmCommandOpts = self.formatXmxSizeToBytes(self.getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024)
+      yarnAppMapreduceAmResourceMb = self.to_number(properties['yarn.app.mapreduce.am.resource.mb'])
       if yarnAppMapreduceAmCommandOpts > yarnAppMapreduceAmResourceMb:
         validationItems.append({"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.getWarnItem("yarn.app.mapreduce.am.command-opts Xmx should be less than yarn.app.mapreduce.am.resource.mb ({0})".format(yarnAppMapreduceAmResourceMb))})
 
@@ -1283,7 +1284,7 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
     for address_property in address_properties:
       if address_property in hdfs_site:
         value = hdfs_site[address_property]
-        if not is_valid_host_port_authority(value):
+        if not self.is_valid_host_port_authority(value):
           validationItems.append({"config-name" : address_property, "item" :
             self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)})
 
@@ -1312,15 +1313,15 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
       data_transfer_protection = 'dfs.data.transfer.protection'
 
       try: # Params may be absent
-        privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address]))
+        privileged_dfs_dn_port = self.isSecurePort(self.getPort(hdfs_site[dfs_datanode_address]))
       except KeyError:
         privileged_dfs_dn_port = False
       try:
-        privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address]))
+        privileged_dfs_http_port = self.isSecurePort(self.getPort(hdfs_site[datanode_http_address]))
       except KeyError:
         privileged_dfs_http_port = False
       try:
-        privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address]))
+        privileged_dfs_https_port = self.isSecurePort(self.getPort(hdfs_site[datanode_https_address]))
       except KeyError:
         privileged_dfs_https_port = False
       try:
@@ -1770,15 +1771,3 @@ def is_number(s):
     pass
 
   return False
-
-def is_valid_host_port_authority(target):
-  has_scheme = "://" in target
-  if not has_scheme:
-    target = "dummyscheme://"+target
-  try:
-    result = urlparse(target)
-    if result.hostname is not None and result.port is not None:
-      return True
-  except ValueError:
-    pass
-  return False