You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/27 18:17:27 UTC

[16/49] ambari git commit: AMBARI-19098. HDP 3.0 TP - create Service Advisor for YARN/MR (alejandro)

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
new file mode 100644
index 0000000..cd35b68
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
@@ -0,0 +1,1789 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import os
+import traceback
+import inspect
+import socket
+from math import floor, ceil
+
+# Local imports
+from resource_management.core.logger import Logger
+
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
+PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
+
+try:
+  with open(PARENT_FILE, 'rb') as fp:
+    service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
+except Exception as e:
+  traceback.print_exc()
+  print "Failed to load parent"
+
+
+class YARNServiceAdvisor(service_advisor.ServiceAdvisor):
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(YARNServiceAdvisor, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    # Always call these methods
+    self.modifyMastersWithMultipleInstances()
+    self.modifyCardinalitiesDict()
+    self.modifyHeapSizeProperties()
+    self.modifyNotValuableComponents()
+    self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
+
+  def modifyMastersWithMultipleInstances(self):
+    """
+    Modify the set of masters with multiple instances.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyCardinalitiesDict(self):
+    """
+    Modify the dictionary of cardinalities.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyHeapSizeProperties(self):
+    """
+    Modify the dictionary of heap size properties.
+    Must be overriden in child class.
+    """
+    self.heap_size_properties = {}
+
+  def modifyNotValuableComponents(self):
+    """
+    Modify the set of components whose host assignment is based on other services.
+    Must be overriden in child class.
+    """
+    self.notValuableComponents.add("APP_TIMELINE_SERVER")
+
+  def modifyComponentsNotPreferableOnServer(self):
+    """
+    Modify the set of components that are not preferable on the server.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+    Must be overriden in child class.
+    """
+    self.componentLayoutSchemes.update({
+      'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+    })
+
+  def getServiceComponentLayoutValidations(self, services, hosts):
+    """
+    Get a list of errors.
+    Must be overriden in child class.
+    """
+    Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    return self.as_super.getServiceComponentLayoutValidations(services, hosts)
+
+  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+    """
+    Entry point.
+    Must be overriden in child class.
+    """
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    # Due to the existing stack inheritance, make it clear where each calculation came from.
+    recommender = YARNRecommender()
+
+    # YARN
+    recommender.recommendYARNConfigurationsFromHDP206(configurations, clusterData, services, hosts)
+    recommender.recommendYARNConfigurationsFromHDP22(configurations, clusterData, services, hosts)
+    recommender.recommendYARNConfigurationsFromHDP23(configurations, clusterData, services, hosts)
+    recommender.recommendYARNConfigurationsFromHDP25(configurations, clusterData, services, hosts)
+
+  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+    """
+    Entry point.
+    Validate configurations for the service. Return a list of errors.
+    The code for this function should be the same for each Service Advisor.
+    """
+    Logger.info("Class: %s, Method: %s. Validating Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    validator = YARNValidator()
+    # Calls the methods of the validator using arguments,
+    # method(siteProperties, siteRecommendations, configurations, services, hosts)
+    return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
+
+
+class MAPREDUCE2ServiceAdvisor(service_advisor.ServiceAdvisor):
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(MAPREDUCE2ServiceAdvisor, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    # Always call these methods
+    self.modifyMastersWithMultipleInstances()
+    self.modifyCardinalitiesDict()
+    self.modifyHeapSizeProperties()
+    self.modifyNotValuableComponents()
+    self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
+
+  def modifyMastersWithMultipleInstances(self):
+    """
+    Modify the set of masters with multiple instances.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyCardinalitiesDict(self):
+    """
+    Modify the dictionary of cardinalities.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyHeapSizeProperties(self):
+    """
+    Modify the dictionary of heap size properties.
+    Must be overriden in child class.
+    """
+    self.heap_size_properties = {}
+
+  def modifyNotValuableComponents(self):
+    """
+    Modify the set of components whose host assignment is based on other services.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentsNotPreferableOnServer(self):
+    """
+    Modify the set of components that are not preferable on the server.
+    Must be overriden in child class.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+    Must be overriden in child class.
+    """
+    self.componentLayoutSchemes.update({
+      'HISTORYSERVER': {31: 1, "else": 2},
+    })
+
+  def getServiceComponentLayoutValidations(self, services, hosts):
+    """
+    Get a list of errors.
+    Must be overriden in child class.
+    """
+    Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    return self.as_super.getServiceComponentLayoutValidations(services, hosts)
+
+  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+    """
+    Entry point.
+    Must be overriden in child class.
+    """
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    # Due to the existing stack inheritance, make it clear where each calculation came from.
+    recommender = MAPREDUCE2Recommender()
+    recommender.recommendMapReduce2ConfigurationsFromHDP206(configurations, clusterData, services, hosts)
+    recommender.recommendMapReduce2ConfigurationsFromHDP22(configurations, clusterData, services, hosts)
+
+  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+    """
+    Entry point.
+    Validate configurations for the service. Return a list of errors.
+    The code for this function should be the same for each Service Advisor.
+    """
+    Logger.info("Class: %s, Method: %s. Validating Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    validator = YARNValidator()
+    # Calls the methods of the validator using arguments,
+    # method(siteProperties, siteRecommendations, configurations, services, hosts)
+    return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
+
+
+class YARNRecommender(service_advisor.ServiceAdvisor):
+  """
+  YARN Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
+  """
+
+  HIVE_INTERACTIVE_SITE = 'hive-interactive-site'
+  YARN_ROOT_DEFAULT_QUEUE_NAME = 'default'
+  CONFIG_VALUE_UINITIALIZED = 'SET_ON_FIRST_INVOCATION'
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(YARNRecommender, self)
+    self.as_super.__init__(*args, **kwargs)
+
+  def recommendYARNConfigurationsFromHDP206(self, configurations, clusterData, services, hosts):
+    """
+    Recommend configurations for this service based on HDP 2.0.6.
+    """
+    Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+                (self.__class__.__name__, inspect.stack()[0][3]))
+
+    putYarnProperty = self.putProperty(configurations, "yarn-site", services)
+    putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site")
+    putYarnEnvProperty = self.putProperty(configurations, "yarn-env", services)
+    nodemanagerMinRam = 1048576 # 1TB in mb
+    if "referenceNodeManagerHost" in clusterData:
+      nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam)
+
+    callContext = self.getCallContext(services)
+    putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam))))
+    # read from the supplied config
+    #if 'recommendConfigurations' != callContext and \
+    #        "yarn-site" in services["configurations"] and \
+    #        "yarn.nodemanager.resource.memory-mb" in services["configurations"]["yarn-site"]["properties"]:
+    #    putYarnProperty('yarn.nodemanager.resource.memory-mb', int(services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]))
+    if 'recommendConfigurations' == callContext:
+      putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam))))
+    else:
+      # read from the supplied config
+      if "yarn-site" in services["configurations"] and "yarn.nodemanager.resource.memory-mb" in services["configurations"]["yarn-site"]["properties"]:
+        putYarnProperty('yarn.nodemanager.resource.memory-mb', int(services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]))
+      else:
+        putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam))))
+      pass
+    pass
+
+    putYarnProperty('yarn.scheduler.minimum-allocation-mb', int(clusterData['yarnMinContainerSize']))
+    putYarnProperty('yarn.scheduler.maximum-allocation-mb', int(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]))
+    putYarnEnvProperty('min_user_id', self.get_system_min_uid())
+
+    yarn_mount_properties = [
+      ("yarn.nodemanager.local-dirs", "NODEMANAGER", "/hadoop/yarn/local", "multi"),
+      ("yarn.nodemanager.log-dirs", "NODEMANAGER", "/hadoop/yarn/log", "multi"),
+      ("yarn.timeline-service.leveldb-timeline-store.path", "APP_TIMELINE_SERVER", "/hadoop/yarn/timeline", "single"),
+      ("yarn.timeline-service.leveldb-state-store.path", "APP_TIMELINE_SERVER", "/hadoop/yarn/timeline", "single")
+    ]
+
+    self.updateMountProperties("yarn-site", yarn_mount_properties, configurations, services, hosts)
+
+    sc_queue_name = self.recommendYarnQueue(services, "yarn-env", "service_check.queue.name")
+    if sc_queue_name is not None:
+      putYarnEnvProperty("service_check.queue.name", sc_queue_name)
+
+    containerExecutorGroup = 'hadoop'
+    if 'cluster-env' in services['configurations'] and 'user_group' in services['configurations']['cluster-env']['properties']:
+      containerExecutorGroup = services['configurations']['cluster-env']['properties']['user_group']
+    putYarnProperty("yarn.nodemanager.linux-container-executor.group", containerExecutorGroup)
+
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    if "TEZ" in servicesList:
+      ambari_user = self.getAmbariUser(services)
+      ambariHostName = socket.getfqdn()
+      putYarnProperty("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(ambari_user), ambariHostName)
+      putYarnProperty("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(ambari_user), "*")
+      old_ambari_user = self.getOldAmbariUser(services)
+      if old_ambari_user is not None:
+        putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
+        putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
+
+  def recommendYARNConfigurationsFromHDP22(self, configurations, clusterData, services, hosts):
+    putYarnProperty = self.putProperty(configurations, "yarn-site", services)
+    putYarnProperty('yarn.nodemanager.resource.cpu-vcores', clusterData['cpu'])
+    putYarnProperty('yarn.scheduler.minimum-allocation-vcores', 1)
+    putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+    # Property Attributes
+    putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site")
+    nodeManagerHost = self.getHostWithComponent("YARN", "NODEMANAGER", services, hosts)
+    if (nodeManagerHost is not None):
+      cpuPercentageLimit = 0.8
+      if "yarn.nodemanager.resource.percentage-physical-cpu-limit" in configurations["yarn-site"]["properties"]:
+        cpuPercentageLimit = float(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.percentage-physical-cpu-limit"])
+      cpuLimit = max(1, int(floor(nodeManagerHost["Hosts"]["cpu_count"] * cpuPercentageLimit)))
+      putYarnProperty('yarn.nodemanager.resource.cpu-vcores', str(cpuLimit))
+      putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+      putYarnPropertyAttribute('yarn.nodemanager.resource.memory-mb', 'maximum', int(nodeManagerHost["Hosts"]["total_mem"] / 1024)) # total_mem in kb
+      putYarnPropertyAttribute('yarn.nodemanager.resource.cpu-vcores', 'maximum', nodeManagerHost["Hosts"]["cpu_count"] * 2)
+      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+
+      kerberos_authentication_enabled = self.isSecurityEnabled(services)
+      if kerberos_authentication_enabled:
+        putYarnProperty('yarn.nodemanager.container-executor.class',
+                        'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
+
+      if "yarn-env" in services["configurations"] and "yarn_cgroups_enabled" in services["configurations"]["yarn-env"]["properties"]:
+        yarn_cgroups_enabled = services["configurations"]["yarn-env"]["properties"]["yarn_cgroups_enabled"].lower() == "true"
+        if yarn_cgroups_enabled:
+          putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.group', 'hadoop')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.resources-handler.class', 'org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', '/yarn')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount', 'true')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount-path', '/cgroup')
+        else:
+          if not kerberos_authentication_enabled:
+            putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.resources-handler.class', 'delete', 'true')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', 'delete', 'true')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount', 'delete', 'true')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount-path', 'delete', 'true')
+    # recommend hadoop.registry.rm.enabled based on SLIDER in services
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    if "SLIDER" in servicesList:
+      putYarnProperty('hadoop.registry.rm.enabled', 'true')
+    else:
+      putYarnProperty('hadoop.registry.rm.enabled', 'false')
+
+  def recommendYARNConfigurationsFromHDP23(self, configurations, clusterData, services, hosts):
+    putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services)
+    putYarnSitePropertyAttributes = self.putPropertyAttribute(configurations, "yarn-site")
+
+    if "tez-site" not in services["configurations"]:
+      putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', '')
+    else:
+      putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', 'org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl')
+
+    if "ranger-env" in services["configurations"] and "ranger-yarn-plugin-properties" in services["configurations"] and \
+            "ranger-yarn-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
+      putYarnRangerPluginProperty = self.putProperty(configurations, "ranger-yarn-plugin-properties", services)
+      rangerEnvYarnPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-yarn-plugin-enabled"]
+      putYarnRangerPluginProperty("ranger-yarn-plugin-enabled", rangerEnvYarnPluginProperty)
+    rangerPluginEnabled = ''
+    if 'ranger-yarn-plugin-properties' in configurations and 'ranger-yarn-plugin-enabled' in configurations['ranger-yarn-plugin-properties']['properties']:
+      rangerPluginEnabled = configurations['ranger-yarn-plugin-properties']['properties']['ranger-yarn-plugin-enabled']
+    elif 'ranger-yarn-plugin-properties' in services['configurations'] and 'ranger-yarn-plugin-enabled' in services['configurations']['ranger-yarn-plugin-properties']['properties']:
+      rangerPluginEnabled = services['configurations']['ranger-yarn-plugin-properties']['properties']['ranger-yarn-plugin-enabled']
+
+    if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()):
+      putYarnSiteProperty('yarn.acl.enable','true')
+      putYarnSiteProperty('yarn.authorization-provider','org.apache.ranger.authorization.yarn.authorizer.RangerYarnAuthorizer')
+    else:
+      putYarnSitePropertyAttributes('yarn.authorization-provider', 'delete', 'true')
+
+  def recommendYARNConfigurationsFromHDP25(self, configurations, clusterData, services, hosts):
+    hsi_env_poperties = self.getServicesSiteProperties(services, "hive-interactive-env")
+    cluster_env = self.getServicesSiteProperties(services, "cluster-env")
+
+    # Queue 'llap' creation/removal logic (Used by Hive Interactive server and associated LLAP)
+    if hsi_env_poperties and 'enable_hive_interactive' in hsi_env_poperties:
+      enable_hive_interactive = hsi_env_poperties['enable_hive_interactive']
+      LLAP_QUEUE_NAME = 'llap'
+
+      # Hive Server interactive is already added or getting added
+      if enable_hive_interactive == 'true':
+        self.updateLlapConfigs(configurations, services, hosts, LLAP_QUEUE_NAME)
+      else:  # When Hive Interactive Server is in 'off/removed' state.
+        self.checkAndStopLlapQueue(services, configurations, LLAP_QUEUE_NAME)
+
+    putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services)
+    stack_root = "/usr/hdp"
+    if cluster_env and "stack_root" in cluster_env:
+      stack_root = cluster_env["stack_root"]
+
+    timeline_plugin_classes_values = []
+    timeline_plugin_classpath_values = []
+
+    if self.isServiceDeployed(services, "TEZ"):
+      timeline_plugin_classes_values.append('org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl')
+
+    if self.isServiceDeployed(services, "SPARK"):
+      timeline_plugin_classes_values.append('org.apache.spark.deploy.history.yarn.plugin.SparkATSPlugin')
+      timeline_plugin_classpath_values.append(stack_root + "/${hdp.version}/spark/hdpLib/*")
+
+    putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', ",".join(timeline_plugin_classes_values))
+    putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath', ":".join(timeline_plugin_classpath_values))
+
+  #region LLAP
+  def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
+    """
+    Entry point for updating Hive's 'LLAP app' configs namely :
+      (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb
+      (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb,
+      (7). hive.server2.tez.sessions.per.default.queue, (8). tez.am.resource.memory.mb (9). hive.tez.container.size
+      (10). tez.runtime.io.sort.mb  (11). tez.runtime.unordered.output.buffer.size-mb (12). hive.llap.io.threadpool.size, and
+      (13). hive.llap.io.enabled.
+
+      The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following:
+      (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue'
+      (4). Change in queue selection for config 'hive.llap.daemon.queue.name'.
+
+      If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
+      value is not calulated, but read and use in calculation for dependent configs.
+
+      Note: All memory calculations are in MB, unless specified otherwise.
+    """
+    Logger.info("DBG: Entered updateLlapConfigs");
+    putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+    putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE)
+    putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+    putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+    putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services)
+    llap_daemon_selected_queue_name = None
+    selected_queue_is_ambari_managed_llap = None  # Queue named 'llap' at root level is Ambari managed.
+    llap_selected_queue_am_percent = None
+    DEFAULT_EXECUTOR_TO_AM_RATIO = 20
+    MIN_EXECUTOR_TO_AM_RATIO = 10
+    MAX_CONCURRENT_QUERIES = 32
+    leafQueueNames = None
+    MB_TO_BYTES = 1048576
+    hsi_site = self.getServicesSiteProperties(services, YARNRecommender.HIVE_INTERACTIVE_SITE)
+    yarn_site = self.getServicesSiteProperties(services, "yarn-site")
+
+    # Update 'hive.llap.daemon.queue.name' prop combo entries
+    self.setLlapDaemonQueuePropAttributes(services, configurations)
+
+    if not services["changed-configurations"]:
+      read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations))
+      putHiveInteractiveSiteProperty("hive.llap.daemon.yarn.container.mb", read_llap_daemon_yarn_cont_mb)
+
+    if hsi_site and "hive.llap.daemon.queue.name" in hsi_site:
+      llap_daemon_selected_queue_name = hsi_site["hive.llap.daemon.queue.name"]
+
+    # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'.
+    capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+    if capacity_scheduler_properties:
+      # Get all leaf queues.
+      leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+      Logger.info("YARN leaf Queues = {0}".format(leafQueueNames))
+      if len(leafQueueNames) == 0:
+        Logger.error("Queue(s) couldn't be retrieved from capacity-scheduler.")
+        return
+
+      # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive).
+      changed_configs_has_enable_hive_int = self.isConfigPropertiesChanged(services, "hive-interactive-env", ['enable_hive_interactive'], False)
+      llap_named_queue_selected_in_curr_invocation = None
+      if changed_configs_has_enable_hive_int \
+          and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']:
+        if len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames):
+          llap_named_queue_selected_in_curr_invocation = True
+          putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
+          putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
+        else:
+          first_leaf_queue = list(leafQueueNames)[0]  # 1st invocation, pick the 1st leaf queue and set it as selected.
+          putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', first_leaf_queue)
+          putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', first_leaf_queue)
+          llap_named_queue_selected_in_curr_invocation = False
+      Logger.info("DBG: llap_named_queue_selected_in_curr_invocation = {0}".format(llap_named_queue_selected_in_curr_invocation))
+
+      if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name and llap_daemon_selected_queue_name == llap_queue_name) or
+            llap_named_queue_selected_in_curr_invocation) or \
+          (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation):
+        Logger.info("Setting visibility of num_llap_nodes to true.")
+        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true")
+        selected_queue_is_ambari_managed_llap = True
+        Logger.info("DBG: Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+                    "slider visibility to 'True'".format(llap_queue_name, list(leafQueueNames)))
+      else:
+        Logger.info("Setting visibility of num_llap_nodes to false.")
+        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false")
+        Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+                    "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames)))
+        selected_queue_is_ambari_managed_llap = False
+
+      if not llap_named_queue_selected_in_curr_invocation:  # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have
+        # state information pertaining to 'llap' queue.
+        # Check: State of the selected queue should not be STOPPED.
+        if llap_daemon_selected_queue_name:
+          llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
+          if llap_selected_queue_state is None or llap_selected_queue_state == "STOPPED":
+            Logger.error("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default "
+                         "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state))
+            self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+            return
+        else:
+          Logger.error("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values."
+                       .format(llap_daemon_selected_queue_name))
+          self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+          return
+    else:
+      Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive."
+                   " Not calculating LLAP configs.")
+      return
+
+    changed_configs_in_hive_int_env = None
+    llap_concurrency_in_changed_configs = None
+    llap_daemon_queue_in_changed_configs = None
+    # Calculations are triggered only if there is change in any one of the following props :
+    # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue'
+    # or 'hive.llap.daemon.queue.name' has change in value selection.
+    # OR
+    # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation)
+    if 'changed-configurations' in services.keys():
+      config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive'])
+      changed_configs_in_hive_int_env = self.isConfigPropertiesChanged(services, "hive-interactive-env", config_names_to_be_checked, False)
+
+      # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs.
+      llap_concurrency_in_changed_configs = self.isConfigPropertiesChanged(services, YARNRecommender.HIVE_INTERACTIVE_SITE, ['hive.server2.tez.sessions.per.default.queue'], False)
+      llap_daemon_queue_in_changed_configs = self.isConfigPropertiesChanged(services, YARNRecommender.HIVE_INTERACTIVE_SITE, ['hive.llap.daemon.queue.name'], False)
+
+    if not changed_configs_in_hive_int_env and not llap_concurrency_in_changed_configs and \
+        not llap_daemon_queue_in_changed_configs and services["changed-configurations"]:
+      Logger.info("DBG: LLAP parameters not modified. Not adjusting LLAP configs.")
+      Logger.info("DBG: Current 'changed-configuration' received is : {0}".format(services["changed-configurations"]))
+      return
+
+    Logger.info("\nDBG: Performing LLAP config calculations ......")
+    node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
+    node_manager_cnt = len(node_manager_host_list)
+    yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+    total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
+    Logger.info("DBG: Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
+                "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb))
+    yarn_min_container_size = float(self.get_yarn_min_container_size(services, configurations))
+    tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity))
+    normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+
+    if yarn_site and "yarn.nodemanager.resource.cpu-vcores" in yarn_site:
+      cpu_per_nm_host = float(yarn_site["yarn.nodemanager.resource.cpu-vcores"])
+    else:
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+    Logger.info("DBG Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, "
+                "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size,
+                                                      total_cluster_capacity))
+
+    # Calculate the available memory for LLAP app
+    yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size)
+    mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host)
+    Logger.info("DBG: Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, "
+                "cpu_per_nm_host : {2}".format(mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host))
+
+
+    if mem_per_thread_for_llap is None:
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+
+    mem_per_thread_for_llap = float(mem_per_thread_for_llap)
+
+    Logger.info("DBG: selected_queue_is_ambari_managed_llap = {0}".format(selected_queue_is_ambari_managed_llap))
+    if not selected_queue_is_ambari_managed_llap:
+      llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity)
+
+      if llap_daemon_selected_queue_cap <= 0:
+        Logger.warning("'{0}' queue capacity percentage retrieved = {1}. Expected > 0.".format(
+          llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap))
+        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+        return
+
+      total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size)
+      Logger.info("DBG: Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, "
+                  "yarn_min_container_size : {3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized,
+                                                         llap_daemon_selected_queue_cap, yarn_min_container_size))
+      '''Rounding up numNodes so that we run more daemons, and utilitze more CPUs. The rest of the calcaulkations will take care of cutting this down if required'''
+      num_llap_nodes_requested = ceil(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized)
+      Logger.info("DBG: Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, "
+                  "yarn_nm_mem_in_mb_normalized : {2}".format(num_llap_nodes_requested, total_llap_mem_normalized, yarn_nm_mem_in_mb_normalized))
+      queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name))
+      hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized
+      Logger.info("DBG: Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, "
+                  "total_llap_mem_normalized : {2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized))
+    else:  # Ambari managed 'llap' named queue at root level.
+      num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input
+      total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
+      Logger.info("DBG: Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, "
+                  "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized))
+      total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size))
+      Logger.info("DBG: Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, "
+                  "yarn_min_container_size : {2}".format(total_llap_mem_normalized, total_llap_mem, yarn_min_container_size))
+
+      # What percent is 'total_llap_mem' of 'total_cluster_capacity' ?
+      llap_named_queue_cap_fraction = ceil(total_llap_mem_normalized / total_cluster_capacity * 100)
+      Logger.info("DBG: Calculated '{0}' queue capacity percent = {1}.".format(llap_queue_name, llap_named_queue_cap_fraction))
+
+      if llap_named_queue_cap_fraction > 100:
+        Logger.warning("Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction))
+        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+        return
+
+      # Adjust capacity scheduler for the 'llap' named queue.
+      self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction)
+      hive_tez_am_cap_available = total_llap_mem_normalized
+      Logger.info("DBG: hive_tez_am_cap_available : {0}".format(hive_tez_am_cap_available))
+
+    # Common calculations now, irrespective of the queue selected.
+
+    # Get calculated value for Slider AM container Size
+    slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
+                                                 yarn_min_container_size)
+    Logger.info("DBG: Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : "
+                "{1}".format(slider_am_container_size, yarn_min_container_size))
+
+    llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size
+    Logger.info("DBG: Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, "
+                "slider_am_container_size : {2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, slider_am_container_size))
+
+    if llap_mem_for_tezAm_and_daemons < 2 * yarn_min_container_size:
+      Logger.warning("Not enough capacity available on the cluster to run LLAP")
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+
+    # Calculate llap concurrency (i.e. Number of Tez AM's)
+    max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
+
+    # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
+    if not llap_concurrency_in_changed_configs:
+      if max_executors_per_node <= 0:
+        Logger.warning("Calculated 'max_executors_per_node' = {0}. Expected value >= 1.".format(max_executors_per_node))
+        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+        return
+
+      Logger.info("DBG: Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+                  "mem_per_thread_for_llap: {3}".format(max_executors_per_node, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+
+      # Default 1 AM for every 20 executor threads.
+      # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM,
+      # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is
+      # instead limited by #CPUs. Use maxPerNode to factor this in.
+      llap_concurreny_limit = min(floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+      Logger.info("DBG: Calculated 'llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO "
+                  ": {3}, MAX_CONCURRENT_QUERIES : {4}".format(llap_concurreny_limit, max_executors_per_node, num_llap_nodes_requested, DEFAULT_EXECUTOR_TO_AM_RATIO, MAX_CONCURRENT_QUERIES))
+      llap_concurrency = min(llap_concurreny_limit, floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size)))
+      Logger.info("DBG: Calculated 'llap_concurrency' : {0}, using following : llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+                  "{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+                  "{5}".format(llap_concurrency, llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, DEFAULT_EXECUTOR_TO_AM_RATIO,
+                               mem_per_thread_for_llap, normalized_tez_am_container_size))
+      if llap_concurrency == 0:
+        llap_concurrency = 1
+
+      if llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available:
+        llap_concurrency = floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+
+        if llap_concurrency <= 0:
+          Logger.warning("Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency))
+          self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+          return
+        Logger.info("DBG: Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+                    "{2}".format(llap_concurrency, hive_tez_am_cap_available, normalized_tez_am_container_size))
+    else:
+      # Read current value
+      if 'hive.server2.tez.sessions.per.default.queue' in hsi_site:
+        llap_concurrency = long(hsi_site['hive.server2.tez.sessions.per.default.queue'])
+        if llap_concurrency <= 0:
+          Logger.warning("'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1".format(llap_concurrency))
+          self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+          return
+        Logger.info("DBG: Read 'llap_concurrency' : {0}".format(llap_concurrency ))
+      else:
+        llap_concurrency = 1
+        Logger.warning("Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config. Setting default value 1.")
+        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+        return
+
+    # Calculate 'Max LLAP Consurrency', irrespective of whether 'llap_concurrency' was read or calculated.
+    max_llap_concurreny_limit = min(floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+    Logger.info("DBG: Calculated 'max_llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested "
+                ": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(max_llap_concurreny_limit, max_executors_per_node,
+                                                                                             num_llap_nodes_requested, MIN_EXECUTOR_TO_AM_RATIO,
+                                                                                             MAX_CONCURRENT_QUERIES))
+    max_llap_concurreny = min(max_llap_concurreny_limit, floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO *
+                                                                                                      mem_per_thread_for_llap + normalized_tez_am_container_size)))
+    Logger.info("DBG: Calculated 'max_llap_concurreny' : {0}, using following : max_llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+                "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+                "{5}".format(max_llap_concurreny, max_llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO,
+                             mem_per_thread_for_llap, normalized_tez_am_container_size))
+    if max_llap_concurreny == 0:
+      max_llap_concurreny = 1
+      Logger.info("DBG: Adjusted 'max_llap_concurreny' : 1.")
+
+    if (max_llap_concurreny * normalized_tez_am_container_size) > hive_tez_am_cap_available:
+      max_llap_concurreny = floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+      if max_llap_concurreny <= 0:
+        Logger.warning("Calculated 'Max. LLAP Concurrent Queries' = {0}. Expected value > 1".format(max_llap_concurreny))
+        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+        return
+      Logger.info("DBG: Adjusted 'max_llap_concurreny' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+                  "{2}".format(max_llap_concurreny, hive_tez_am_cap_available, normalized_tez_am_container_size))
+
+    # Calculate value for 'num_llap_nodes', an across cluster config.
+    tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size
+    Logger.info("DBG: Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : "
+                "{2}".format(tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size))
+    llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required
+
+    if llap_mem_daemon_size < yarn_min_container_size:
+      Logger.warning("Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container Size' ({1})'".format(
+        llap_mem_daemon_size, yarn_min_container_size))
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+
+    if llap_mem_daemon_size < mem_per_thread_for_llap or llap_mem_daemon_size < yarn_min_container_size:
+      Logger.warning("Not enough memory available for executors.")
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+    Logger.info("DBG: Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : "
+                "{2}".format(llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required))
+
+    llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size)
+    Logger.info("DBG: Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, "
+                "yarn_min_container_size: {3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, num_llap_nodes_requested, yarn_min_container_size))
+    if llap_daemon_mem_per_node == 0:
+      # Small cluster. No capacity left on a node after running AMs.
+      llap_daemon_mem_per_node = mem_per_thread_for_llap
+      num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+      Logger.info("DBG: 'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, "
+                  "mem_per_thread_for_llap : {3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, mem_per_thread_for_llap))
+    elif llap_daemon_mem_per_node < mem_per_thread_for_llap:
+      # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node)
+      llap_daemon_mem_per_node = mem_per_thread_for_llap
+      num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+      Logger.info("DBG: 'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusted 'llap_daemon_mem_per_node' "
+                  ": {2}".format(llap_daemon_mem_per_node, mem_per_thread_for_llap, llap_daemon_mem_per_node))
+    else:
+      # All good. We have a proper value for memoryPerNode.
+      num_llap_nodes = num_llap_nodes_requested
+      Logger.info("DBG: num_llap_nodes : {0}".format(num_llap_nodes))
+
+    num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
+    if num_executors_per_node_max < 1:
+      Logger.warning("Calculated 'Max. Executors per Node' = {0}. Expected values >= 1.".format(num_executors_per_node_max))
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+    Logger.info("DBG: Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+                "mem_per_thread_for_llap: {3}".format(num_executors_per_node_max, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+
+    # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem.
+    num_executors_per_node = min(floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max)
+    if num_executors_per_node <= 0:
+      Logger.warning("Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node))
+      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+      return
+    Logger.info("DBG: Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, "
+                "mem_per_thread_for_llap: {3}".format(num_executors_per_node, llap_daemon_mem_per_node, num_executors_per_node_max, mem_per_thread_for_llap))
+
+    # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache.
+    total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap
+    cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node
+
+    tez_runtime_io_sort_mb = (long((0.8 * mem_per_thread_for_llap) / 3))
+    tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap)
+    # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying it by 1048576.
+    hive_auto_convert_join_noconditionaltask_size = (long((0.8 * mem_per_thread_for_llap) / 3)) * MB_TO_BYTES
+
+    # Calculate value for prop 'llap_heap_size'
+    llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations))
+    Logger.info("DBG: Calculated llap_app_heap_size : {0}, using following : total_mem_for_executors : {1}".format(llap_xmx, total_mem_for_executors_per_node))
+
+    # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI)
+    hive_server_interactive_heapsize = None
+    hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts)
+    if hive_server_interactive_hosts is None:
+      # If its None, read the base service YARN's NODEMANAGER node memory, as are host are considered homogenous.
+      hive_server_interactive_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
+    if hive_server_interactive_hosts is not None and len(hive_server_interactive_hosts) > 0:
+      host_mem = long(hive_server_interactive_hosts[0]["Hosts"]["total_mem"])
+      hive_server_interactive_heapsize = min(max(2048.0, 400.0*llap_concurrency), 3.0/8 * host_mem)
+      Logger.info("DBG: Calculated 'hive_server_interactive_heapsize' : {0}, using following : llap_concurrency : {1}, host_mem : "
+                  "{2}".format(hive_server_interactive_heapsize, llap_concurrency, host_mem))
+
+    # Done with calculations, updating calculated configs.
+    Logger.info("DBG: Applying the calculated values....")
+
+    normalized_tez_am_container_size = long(normalized_tez_am_container_size)
+    putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size)
+
+    if not llap_concurrency_in_changed_configs:
+      min_llap_concurrency = 1
+      putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency)
+      putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum",
+                                              min_llap_concurrency)
+
+    putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny)
+
+    num_llap_nodes = long(num_llap_nodes)
+    putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+    putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
+    #TODO A single value is not being set for numNodes in case of a custom queue. Also the attribute is set to non-visible, so the UI likely ends up using an old cached value
+    if (num_llap_nodes != num_llap_nodes_requested):
+      Logger.info("User requested num_llap_nodes : {0}, but used/adjusted value for calculations is : {1}".format(num_llap_nodes_requested, num_llap_nodes))
+    else:
+      Logger.info("Used num_llap_nodes for calculations : {0}".format(num_llap_nodes_requested))
+
+    llap_container_size = long(llap_daemon_mem_per_node)
+    putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size)
+
+    # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization.
+    # Else, we don't (1). Override the previous calculated value or (2). User provided value.
+    if self.get_hive_tez_container_size(services) == YARNRecommender.CONFIG_VALUE_UINITIALIZED:
+      mem_per_thread_for_llap = long(mem_per_thread_for_llap)
+      putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap)
+
+    putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb)
+    if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]:
+      if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY":
+        putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", "maximum", 1800)
+
+    putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size)
+    putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size)
+
+    num_executors_per_node = long(num_executors_per_node)
+    putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node)
+    putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
+    putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", float(num_executors_per_node_max))
+
+    # 'hive.llap.io.threadpool.size' config value is to be set same as value calculated for
+    # 'hive.llap.daemon.num.executors' at all times.
+    cache_mem_per_node = long(cache_mem_per_node)
+
+    putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node)
+    putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node)
+
+    if hive_server_interactive_heapsize is not None:
+      putHiveInteractiveEnvProperty("hive_heapsize", int(hive_server_interactive_heapsize))
+
+    llap_io_enabled = 'true' if long(cache_mem_per_node) >= 64 else 'false'
+    putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled)
+
+    putHiveInteractiveEnvProperty('llap_heap_size', long(llap_xmx))
+    putHiveInteractiveEnvProperty('slider_am_container_mb', long(slider_am_container_size))
+    Logger.info("DBG: Done putting all configs")
+
+  def recommendDefaultLlapConfiguration(self, configurations, services, hosts):
+    Logger.info("DBG: Something likely went wrong. recommendDefaultLlapConfiguration")
+    putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+    putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE)
+
+    putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+    putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+
+    yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
+    slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size))
+
+    node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
+    node_manager_cnt = len(node_manager_host_list)
+
+    putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1)
+    putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1)
+    putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1)
+    putHiveInteractiveEnvProperty('num_llap_nodes', 0)
+    putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+    putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
+    putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size)
+    putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size)
+    putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0)
+    putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
+    putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0)
+    putHiveInteractiveSiteProperty('hive.llap.io.memory.size', 0)
+    putHiveInteractiveEnvProperty('llap_heap_size', 0)
+    putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size)
+
+  def get_num_llap_nodes(self, services, configurations):
+    """
+    Returns current value of number of LLAP nodes in cluster (num_llap_nodes)
+
+    :type services: dict
+    :type configurations: dict
+    :rtype int
+    """
+    hsi_env = self.getServicesSiteProperties(services, "hive-interactive-env")
+    hsi_env_properties = self.getSiteProperties(configurations, "hive-interactive-env")
+    num_llap_nodes = 0
+
+    # Check if 'num_llap_nodes' is modified in current ST invocation.
+    if hsi_env_properties and 'num_llap_nodes' in hsi_env_properties:
+      num_llap_nodes = hsi_env_properties['num_llap_nodes']
+    elif hsi_env and 'num_llap_nodes' in hsi_env:
+      num_llap_nodes = hsi_env['num_llap_nodes']
+    else:
+      Logger.error("Couldn't retrieve Hive Server 'num_llap_nodes' config. Setting value to {0}".format(num_llap_nodes))
+
+    return float(num_llap_nodes)
+
+  def get_max_executors_per_node(self, nm_mem_per_node_normalized, nm_cpus_per_node, mem_per_thread):
+    # TODO: This potentially takes up the entire node leaving no space for AMs.
+    return min(floor(nm_mem_per_node_normalized / mem_per_thread), nm_cpus_per_node)
+
+  def calculate_mem_per_thread_for_llap(self, services, nm_mem_per_node_normalized, cpu_per_nm_host):
+    """
+    Calculates 'mem_per_thread_for_llap' for 1st time initialization. Else returns 'hive.tez.container.size' read value.
+    """
+    hive_tez_container_size = self.get_hive_tez_container_size(services)
+
+    if hive_tez_container_size == self.CONFIG_VALUE_UINITIALIZED:
+      if nm_mem_per_node_normalized <= 1024:
+        calculated_hive_tez_container_size = min(512, nm_mem_per_node_normalized)
+      elif nm_mem_per_node_normalized <= 4096:
+        calculated_hive_tez_container_size = 1024
+      elif nm_mem_per_node_normalized <= 10240:
+        calculated_hive_tez_container_size = 2048
+      elif nm_mem_per_node_normalized <= 24576:
+        calculated_hive_tez_container_size = 3072
+      else:
+        calculated_hive_tez_container_size = 4096
+
+      Logger.info("DBG: Calculated and returning 'hive_tez_container_size' : {0}".format(calculated_hive_tez_container_size))
+      return calculated_hive_tez_container_size
+    else:
+      Logger.info("DBG: Returning 'hive_tez_container_size' : {0}".format(hive_tez_container_size))
+      return hive_tez_container_size
+
+  def get_hive_tez_container_size(self, services):
+    """
+    Gets HIVE Tez container size (hive.tez.container.size).
+    """
+    hive_container_size = None
+    hsi_site = self.getServicesSiteProperties(services, YARNRecommender.HIVE_INTERACTIVE_SITE)
+    if hsi_site and 'hive.tez.container.size' in hsi_site:
+      hive_container_size = hsi_site['hive.tez.container.size']
+
+    return hive_container_size
+
+  def get_llap_headroom_space(self, services, configurations):
+    """
+    Gets HIVE Server Interactive's 'llap_headroom_space' config. (Default value set to 6144 bytes).
+    """
+    llap_headroom_space = None
+    # Check if 'llap_headroom_space' is modified in current SA invocation.
+    if 'hive-interactive-env' in configurations and 'llap_headroom_space' in configurations['hive-interactive-env']['properties']:
+      hive_container_size = float(configurations['hive-interactive-env']['properties']['llap_headroom_space'])
+      Logger.info("'llap_headroom_space' read from configurations as : {0}".format(llap_headroom_space))
+
+    if llap_headroom_space is None:
+      # Check if 'llap_headroom_space' is input in services array.
+      if 'llap_headroom_space' in services['configurations']['hive-interactive-env']['properties']:
+        llap_headroom_space = float(services['configurations']['hive-interactive-env']['properties']['llap_headroom_space'])
+        Logger.info("'llap_headroom_space' read from services as : {0}".format(llap_headroom_space))
+    if not llap_headroom_space or llap_headroom_space < 1:
+      llap_headroom_space = 6144 # 6GB
+      Logger.info("Couldn't read 'llap_headroom_space' from services or configurations. Returing default value : 6144 bytes")
+
+    return llap_headroom_space
+
+  def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name, llap_queue_cap_perc):
+    """
+    Checks and (1). Creates 'llap' queue if only 'default' queue exist at leaf level and is consuming 100% capacity OR
+               (2). Updates 'llap' queue capacity and state, if current selected queue is 'llap', and only 2 queues exist
+                    at root level : 'default' and 'llap'.
+    """
+    Logger.info("Determining creation/adjustment of 'capacity-scheduler' for 'llap' queue.")
+    putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+    putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+    putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+    putCapSchedProperty = self.putProperty(configurations, "capacity-scheduler", services)
+    leafQueueNames = None
+    hsi_site = self.getServicesSiteProperties(services, YARNRecommender.HIVE_INTERACTIVE_SITE)
+
+    capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+    if capacity_scheduler_properties:
+      leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+      cap_sched_config_keys = capacity_scheduler_properties.keys()
+
+      yarn_default_queue_capacity = -1
+      if 'yarn.scheduler.capacity.root.default.capacity' in cap_sched_config_keys:
+        yarn_default_queue_capacity = float(capacity_scheduler_properties.get('yarn.scheduler.capacity.root.default.capacity'))
+
+      # Get 'llap' queue state
+      currLlapQueueState = ''
+      if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state' in cap_sched_config_keys:
+        currLlapQueueState = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.state')
+
+      # Get 'llap' queue capacity
+      currLlapQueueCap = -1
+      if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity' in cap_sched_config_keys:
+        currLlapQueueCap = int(float(capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity')))
+
+      updated_cap_sched_configs_str = ''
+
+      enabled_hive_int_in_changed_configs = self.isConfigPropertiesChanged(services, "hive-interactive-env", ['enable_hive_interactive'], False)
+      """
+      We create OR "modify 'llap' queue 'state and/or capacity' " based on below conditions:
+       - if only 1 queue exists at root level and is 'default' queue and has 100% cap -> Create 'llap' queue,  OR
+       - if 2 queues exists at root level ('llap' and 'default') :
+           - Queue selected is 'llap' and state is STOPPED -> Modify 'llap' queue state to RUNNING, adjust capacity, OR
+           - Queue selected is 'llap', state is RUNNING and 'llap_queue_capacity' prop != 'llap' queue current running capacity ->
+              Modify 'llap' queue capacity to 'llap_queue_capacity'
+      """
+      if 'default' in leafQueueNames and \
+          ((len(leafQueueNames) == 1 and int(yarn_default_queue_capacity) == 100) or \
+               ((len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames) and \
+                    ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_queue_cap_perc)))):
+        adjusted_default_queue_cap = str(100 - llap_queue_cap_perc)
+
+        hive_user = '*'  # Open to all
+        if 'hive_user' in services['configurations']['hive-env']['properties']:
+          hive_user = services['configurations']['hive-env']['properties']['hive_user']
+
+        llap_queue_cap_perc = str(llap_queue_cap_perc)
+
+        # If capacity-scheduler configs are received as one concatenated string, we deposit the changed configs back as
+        # one concatenated string.
+        updated_cap_sched_configs_as_dict = False
+        if not received_as_key_value_pair:
+          for prop, val in capacity_scheduler_properties.items():
+            if llap_queue_name not in prop:
+              if prop == 'yarn.scheduler.capacity.root.queues':
+                updated_cap_sched_configs_str = updated_cap_sched_configs_str \
+                                                + prop + "=default,llap\n"
+              elif prop == 'yarn.scheduler.capacity.root.default.capacity':
+                updated_cap_sched_configs_str = updated_cap_sched_configs_str \
+                                                + prop + "=" + adjusted_default_queue_cap + "\n"
+              elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity':
+                updated_cap_sched_configs_str = updated_cap_sched_configs_str \
+                                                + prop + "=" + adjusted_default_queue_cap + "\n"
+              elif prop.startswith('yarn.') and '.llap.' not in prop:
+                updated_cap_sched_configs_str = updated_cap_sched_configs_str + prop + "=" + val + "\n"
+
+          # Now, append the 'llap' queue related properties
+          updated_cap_sched_configs_str += """yarn.scheduler.capacity.root.{0}.user-limit-factor=1
+yarn.scheduler.capacity.root.{0}.state=RUNNING
+yarn.scheduler.capacity.root.{0}.ordering-policy=fifo
+yarn.scheduler.capacity.root.{0}.minimum-user-limit-percent=100
+yarn.scheduler.capacity.root.{0}.maximum-capacity={1}
+yarn.scheduler.capacity.root.{0}.capacity={1}
+yarn.scheduler.capacity.root.{0}.acl_submit_applications={2}
+yarn.scheduler.capacity.root.{0}.acl_administer_queue={2}
+yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_queue_name, llap_queue_cap_perc, hive_user)
+
+          putCapSchedProperty("capacity-scheduler", updated_cap_sched_configs_str)
+          Logger.info("Updated 'capacity-scheduler' configs as one concatenated string.")
+        else:
+          # If capacity-scheduler configs are received as a  dictionary (generally 1st time), we deposit the changed
+          # values back as dictionary itself.
+          # Update existing configs in 'capacity-scheduler'.
+          for prop, val in capacity_scheduler_properties.items():
+            if llap_queue_name not in prop:
+              if prop == 'yarn.scheduler.capacity.root.queues':
+                putCapSchedProperty(prop, 'default,llap')
+              elif prop == 'yarn.scheduler.capacity.root.default.capacity':
+                putCapSchedProperty(prop, adjusted_default_queue_cap)
+              elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity':
+                putCapSchedProperty(prop, adjusted_default_queue_cap)
+              elif prop.startswith('yarn.') and '.llap.' not in prop:
+                putCapSchedProperty(prop, val)
+
+          # Add new 'llap' queue related configs.
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".user-limit-factor", "1")
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".state", "RUNNING")
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy", "fifo")
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent", "100")
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity", llap_queue_cap_perc)
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".capacity", llap_queue_cap_perc)
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications", hive_user)
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue", hive_user)
+          putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-am-resource-percent", "1")
+
+          Logger.info("Updated 'capacity-scheduler' configs as a dictionary.")
+          updated_cap_sched_configs_as_dict = True
+
+        if updated_cap_sched_configs_str or updated_cap_sched_configs_as_dict:
+          if len(leafQueueNames) == 1: # 'llap' queue didn't exist before
+            Logger.info("Created YARN Queue : '{0}' with capacity : {1}%. Adjusted 'default' queue capacity to : {2}%" \
+                        .format(llap_queue_name, llap_queue_cap_perc, adjusted_default_queue_cap))
+          else: # Queue existed, only adjustments done.
+            Logger.info("Adjusted YARN Queue : '{0}'. Current capacity : {1}%. State: RUNNING.".format(llap_queue_name, llap_queue_cap_perc))
+            Logger.info("Adjusted 'default' queue capacity to : {0}%".format(adjusted_default_queue_cap))
+
+          # Update Hive 'hive.llap.daemon.queue.name' prop to use 'llap' queue.
+          putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
+          putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
+          # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility.
+          self.setLlapDaemonQueuePropAttributes(services, configurations)
+      else:
+        Logger.debug("Not creating/adjusting {0} queue. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames)))
+    else:
+      Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive.")
+
+  def checkAndStopLlapQueue(self, services, configurations, llap_queue_name):
+    """
+    Checks and sees (1). If only two leaf queues exist at root level, namely: 'default' and 'llap',
+                and (2). 'llap' is in RUNNING state.
+
+    If yes, performs the following actions:   (1). 'llap' queue state set to STOPPED,
+                                              (2). 'llap' queue capacity set to 0 %,
+                                              (3). 'default' queue capacity set to 100 %
+    """
+    putCapSchedProperty = self.putProperty(configurations, "capacity-scheduler", services)
+    putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+    capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+    updated_default_queue_configs = ''
+    updated_llap_queue_configs = ''
+    if capacity_scheduler_properties:
+      # Get all leaf queues.
+      leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+
+      if len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames and 'default' in leafQueueNames:
+        # Get 'llap' queue state
+        currLlapQueueState = 'STOPPED'
+        if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state' in capacity_scheduler_properties.keys():
+          currLlapQueueState = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.state')
+        else:
+          Logger.error("{0} queue 'state' property not present in capacity scheduler. Skipping adjusting queues.".format(llap_queue_name))
+          return
+        if currLlapQueueState == 'RUNNING':
+          DEFAULT_MAX_CAPACITY = '100'
+          for prop, val in capacity_scheduler_properties.items():
+            # Update 'default' related configs in 'updated_default_queue_configs'
+            if llap_queue_name not in prop:
+              if prop == 'yarn.scheduler.capacity.root.default.capacity':
+                # Set 'default' capacity back to maximum val
+                updated_default_queue_configs = updated_default_queue_configs \
+                                                + prop + "="+DEFAULT_MAX_CAPACITY + "\n"
+              elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity':
+                # Set 'default' max. capacity back to maximum val
+                updated_default_queue_configs = updated_default_queue_configs \
+                                                + prop + "="+DEFAULT_MAX_CAPACITY + "\n"
+              elif prop.startswith('yarn.'):
+                updated_default_queue_configs = updated_default_queue_configs + prop + "=" + val + "\n"
+            else: # Update 'llap' related configs in 'updated_llap_queue_configs'
+              if prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state':
+                updated_llap_queue_configs = updated_llap_queue_configs \
+                                             + prop + "=STOPPED\n"
+              elif prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity':
+                updated_llap_queue_configs = updated_llap_queue_configs \
+                                             + prop + "=0\n"
+              elif prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.maximum-capacity':
+                updated_llap_queue_configs = updated_llap_queue_configs \
+                                             + prop + "=0\n"
+              elif prop.startswith('yarn.'):
+                updated_llap_queue_configs = updated_llap_queue_configs + prop + "=" + val + "\n"
+        else:
+          Logger.debug("{0} queue state is : {1}. Skipping adjusting queues.".format(llap_queue_name, currLlapQueueState))
+          return
+
+        if updated_default_queue_configs and updated_llap_queue_configs:
+          putCapSchedProperty("capacity-scheduler", updated_default_queue_configs+updated_llap_queue_configs)
+          Logger.info("Changed YARN '{0}' queue state to 'STOPPED', and capacity to 0%. Adjusted 'default' queue capacity to : {1}%" \
+                      .format(llap_queue_name, DEFAULT_MAX_CAPACITY))
+
+          # Update Hive 'hive.llap.daemon.queue.name' prop to use 'default' queue.
+          putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', YARNRecommender.YARN_ROOT_DEFAULT_QUEUE_NAME)
+          putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', YARNRecommender.YARN_ROOT_DEFAULT_QUEUE_NAME)
+      else:
+        Logger.debug("Not removing '{0}' queue as number of Queues not equal to 2. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames)))
+    else:
+      Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive.")
+
+  def setLlapDaemonQueuePropAttributes(self, services, configurations):
+    """
+    Checks and sets the 'Hive Server Interactive' 'hive.llap.daemon.queue.name' config Property Attributes.  Takes into
+    account that 'capacity-scheduler' may have changed (got updated) in current Stack Advisor invocation.
+    """
+    Logger.info("Determining 'hive.llap.daemon.queue.name' config Property Attributes.")
+    #TODO Determine if this is doing the right thing if some queue is setup with capacity=0, or is STOPPED. Maybe don't list it.
+    putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE)
+
+    capacity_scheduler_properties = dict()
+
+    # Read 'capacity-scheduler' from configurations if we modified and added recommendation to it, as part of current
+    # StackAdvisor invocation.
+    if "capacity-scheduler" in configurations:
+      cap_sched_props_as_dict = configurations["capacity-scheduler"]["properties"]
+      if 'capacity-scheduler' in cap_sched_props_as_dict:
+        cap_sched_props_as_str = configurations['capacity-scheduler']['properties']['capacity-scheduler']
+        if cap_sched_props_as_str:
+          cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n')
+          if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null':
+            # Got 'capacity-scheduler' configs as one "\n" separated string
+            for property in cap_sched_props_as_str:
+              key, sep, value = property.partition("=")
+              capacity_scheduler_properties[key] = value
+            Logger.info("'capacity-scheduler' configs is set as a single '\\n' separated string in current invocation. "
+                        "count(configurations['capacity-scheduler']['properties']['capacity-scheduler']) = "
+                        "{0}".format(len(capacity_scheduler_properties)))
+          else:
+            Logger.info("Read configurations['capacity-scheduler']['properties']['capacity-scheduler'] is : {0}".format(cap_sched_props_as_str))
+        else:
+          Logger.info("configurations['capacity-scheduler']['properties']['capacity-scheduler'] : {0}.".format(cap_sched_props_as_str))
+
+      # if 'capacity_scheduler_properties' is empty, implies we may have 'capacity-scheduler' configs as dictionary
+      # in configurations, if 'capacity-scheduler' changed in current invocation.
+      if not capacity_scheduler_properties:
+        if isinstance(cap_sched_props_as_dict, dict) and len(cap_sched_props_as_dict) > 1:
+          capacity_scheduler_properties = cap_sched_props_as_dict
+          Logger.info("'capacity-scheduler' changed in current Stack Advisor invocation. Retrieved the configs as dictionary from configurations.")
+        else:
+          Logger.info("Read configurations['capacity-scheduler']['properties'] is : {0}".format(cap_sched_props_as_dict))
+    else:
+      Logger.info("'capacity-scheduler' not modified in the current Stack Advisor invocation.")
+
+
+    # if 'capacity_scheduler_properties' is still empty, implies 'capacity_scheduler' wasn't change in current
+    # SA invocation. Thus, read it from input : 'services'.
+    if not capacity_scheduler_properties:
+      capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+      Logger.info("'capacity-scheduler' not changed in current Stack Advisor invocation. Retrieved the configs from services.")
+
+    # Get set of current YARN leaf queues.
+    leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+    if leafQueueNames:
+      leafQueues = [{"label": str(queueName), "value": queueName} for queueName in leafQueueNames]
+      leafQueues = sorted(leafQueues, key=lambda q: q['value'])
+      putHiveInteractiveSitePropertyAttribute("hive.llap.daemon.queue.name", "entries", leafQueues)
+      Logger.info("'hive.llap.daemon.queue.name' config Property Attributes set to : {0}".format(leafQueues))
+    else:
+      Logger.error("Problem retrieving YARN queues. Skipping updating HIVE Server Interactve "
+                   "'hive.server2.tez.default.queues' property attributes.")
+
+  #TODO  Convert this to a helper. It can apply to any property. Check config, or check if in the list of changed configurations and read the latest value
+  def get_yarn_min_container_size(self, services, configurations):
+    """
+    Gets YARN's minimum container size (yarn.scheduler.minimum-allocation-mb).
+    Reads from:
+      - configurations (if changed as part of current Stack Advisor invocation (output)), and services["changed-configurations"]
+        is empty, else
+      - services['configurations'] (input).
+
+    services["changed-configurations"] would be empty if Stack Advisor call is made from Blueprints (1st invocation). Subsequent
+    Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if Stack Advisor calculates this
+    value (configurations), it is finally not recommended, making 'input' value to survive.
+
+    :type services dict
+    :type configurations dict
+    :rtype str
+    """
+    yarn_min_container_size = None
+    yarn_min_allocation_property = "yarn.scheduler.minimum-allocation-mb"
+    yarn_site = self.getSiteProperties(configurations, "yarn-site")
+    yarn_site_properties = self.getServicesSiteProperties(services, "yarn-site")
+
+    # Check if services["changed-configurations"] is empty and 'yarn.scheduler.minimum-allocation-mb' is modified in current ST invocation.
+    if not services["changed-configurations"] and yarn_site and yarn_min_allocation_property in yarn_site:
+      yarn_min_container_size = yarn_site[yarn_min_allocation_property]
+      Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from configurations as : {0}".format(yarn_min_container_size))
+
+    # Check if 'yarn.scheduler.minimum-allocation-mb' is input in services array.
+    elif yarn_site_properties and yarn_min_allocation_property in yarn_site_properties:
+      yarn_min_container_size = yarn_site_properties[yarn_min_allocation_property]
+      Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from services as : {0}".format(yarn_min_container_size))
+
+    if not yarn_min_container_size:
+      Logger.error("{0} was not found in the configuration".format(yarn_min_allocation_property))
+
+    return yarn_min_container_size
+
+  def calculate_slider_am_size(self, yarn_min_container_size):
+    """
+    Calculates the Slider App Master size based on YARN's Minimum Container Size.
+
+    :type yarn_min_container_size int
+    """
+    if yarn_min_container_size > 1024:
+      return 1024
+    if yarn_min_container_size >= 256 and yarn_min_container_size <= 1024:
+      return yarn_min_container_size
+    if yarn_min_container_size < 256:
+      return 256
+
+  def calculate_slider_am_size(self, yarn_min_container_size):
+    """
+    Calculates the Slider App Master size based on YARN's Minimum Container Size.
+
+    :type yarn_min_container_size int
+    """
+    if yarn_min_container_size > 1024:
+      return 1024
+    if yarn_min_container_size >= 256 and yarn_min_container_size <= 1024:
+      return yarn_min_container_size
+    if yarn_min_container_size < 256:
+      return 256
+
+  def get_yarn_nm_mem_in_mb(self, services, configurations):
+    """
+    Gets YARN NodeManager memory in MB (yarn.nodemanager.resource.memory-mb).
+    Reads from:
+      - configurations (if changed as part of current Stack Advisor invocation (output)), and services["changed-configurations"]
+        is empty, else
+      - services['configurations'] (input).
+
+    services["changed-configurations"] would be empty is Stack Advisor call if made from Blueprints (1st invocation). Subsequent
+    Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if Stack Advsior calculates this
+    value (configurations), it is finally not recommended, making 'input' value to survive.
+    """
+    yarn_nm_mem_in_mb = None
+
+    yarn_site = self.getServicesSiteProperties(services, "yarn-site")
+    yarn_site_properties = self.getSiteProperties(configurations, "yarn-site")
+
+    # Check if services["changed-configurations"] is empty and 'yarn.nodemanager.resource.memory-mb' is modified in current ST invocation.
+    if not services["changed-configurations"] and yarn_site_properties and 'yarn.nodemanager.resource.memory-mb' in yarn_site_properties:
+      yarn_nm_mem_in_mb = float(yarn_site_properties['yarn.nodemanager.resource.memory-mb'])
+    elif yarn_site and 'yarn.nodemanager.resource.memory-mb' in yarn_site:
+      # Check if 'yarn.nodemanager.resource.memory-mb' is input in services array.
+      yarn_nm_mem_in_mb = float(yarn_site['yarn.nodemanager.resource.memory-mb'])
+
+    if yarn_nm_mem_in_mb <= 0.0:
+      Logger.warning("'yarn.nodemanager.resource.memory-mb' current value : {0}. Expected value : > 0".format(yarn_nm_mem_in_mb))
+
+    return yarn_nm_mem_in_mb
+
+  def calculate_tez_am_container_size(self, services, total_cluster_capacity):
+    """
+    Calculates Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site on initialization if values read is 0.
+    Else returns the read value.
+    """
+    tez_am_resource_memory_mb = self.get_tez_am_resource_memory_mb(services)
+    calculated_tez_am_resource_memory_mb = None
+    if tez_am_resource_memory_mb == YARNRecommender.CONFIG_VALUE_UINITIALIZED:
+      if total_cluster_capacity <= 4096:
+        calculated_tez_am_resource_memory_mb = 256
+      elif total_cluster_capacity > 4096 and total_cluster_capacity <= 73728:
+        calculated_tez_am_resource_memory_mb = 512
+      elif total_cluster_capacity > 73728:
+        calculated_tez_am_resource_memory_mb = 1536
+
+      Logger.info("DBG: Calculated and returning 'tez_am_resource_memory_mb' as : {0}".format(calculated_tez_am_resource_memory_mb))
+      return float(calculated_tez_am_resource_memory_mb)
+    else:
+      Logger.info("DBG: Returning 'tez_am_resource_memory_mb' as : {0}".format(tez_am_resource_memory_mb))
+      return float(tez_am_resource_memory_mb)
+
+  def get_tez_am_resource_memory_mb(self, services):
+    """
+    Gets Tez's AM resource memory (tez.am.resource.memory.mb) from services.
+    """
+    tez_am_resource_memory_mb = None
+    if 'tez.am.resource.memory.mb' in services['configurations']['tez-interactive-site']['properties']:
+      tez_am_resource_memory_mb = services['configurations']['tez-interactive-site']['properties']['tez.am.resource.memory.mb']
+
+    return tez_am_resource_memory_mb
+
+  def min_queue_perc_reqd_for_llap_and_hive_app(self, services, hosts, configurations):
+    """
+    Calculate minimum queue capacity required in order to get LLAP and HIVE2 app into running state.
+    """
+    # Get queue size if sized at 20%
+    node_manager_hosts = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
+    yarn_rm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+    total_cluster_cap = len(node_manager_hosts) * yarn_rm_mem_in_mb
+    total_queue_size_at_20_perc = 20.0 / 100 * total_cluster_cap
+
+    # Calculate based on minimum size required by containers.
+    yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
+    slider_am_size = self.calculate_slider_am_size(float(yarn_min_container_size))
+    hive_tez_container_size = long(self.get_hive_tez_container_size(services))
+    tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap))
+    normalized_val = self._normalizeUp(slider_am_size, yarn_min_container_size) \
+                     + self._normalizeUp(hive_tez_container_size, yarn_min_container_size) \
+                     + self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+
+    min_required = max(total_queue_size_at_20_perc, normalized_val)
+    min_required_perc = min_required * 100 / total_cluster_cap
+
+    return int(ceil(min_required_perc))
+
+  def _normalizeDown(self, val1, val2):
+    """
+    Normalize down 'val2' with respect to 'val1'.
+    """
+

<TRUNCATED>