Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/27 18:17:27 UTC
[16/49] ambari git commit: AMBARI-19098. HDP 3.0 TP - create Service
Advisor for YARN/MR (alejandro)
http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
new file mode 100644
index 0000000..cd35b68
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/YARN/3.0.0.3.0/service_advisor.py
@@ -0,0 +1,1789 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import os
+import traceback
+import inspect
+import socket
+from math import floor, ceil
+
+# Local imports
+from resource_management.core.logger import Logger
+
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
+PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
+
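+# The parent ServiceAdvisor class lives outside this package, so it is loaded
+# dynamically from the stacks directory instead of via a regular import.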
+try:
+ with open(PARENT_FILE, 'rb') as fp:
+ service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
+except Exception as e:
+ traceback.print_exc()
+ print "Failed to load parent"
+
+
+class YARNServiceAdvisor(service_advisor.ServiceAdvisor):
+
+ def __init__(self, *args, **kwargs):
+ self.as_super = super(YARNServiceAdvisor, self)
+ self.as_super.__init__(*args, **kwargs)
+
+ # Always call these methods
+ self.modifyMastersWithMultipleInstances()
+ self.modifyCardinalitiesDict()
+ self.modifyHeapSizeProperties()
+ self.modifyNotValuableComponents()
+ self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
+
+ def modifyMastersWithMultipleInstances(self):
+ """
+ Modify the set of masters with multiple instances.
+ Must be overridden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyCardinalitiesDict(self):
+ """
+ Modify the dictionary of cardinalities.
+ Must be overridden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyHeapSizeProperties(self):
+ """
+ Modify the dictionary of heap size properties.
+ Must be overridden in child class.
+ """
+ self.heap_size_properties = {}
+
+ def modifyNotValuableComponents(self):
+ """
+ Modify the set of components whose host assignment is based on other services.
+ Must be overridden in child class.
+ """
+ self.notValuableComponents.add("APP_TIMELINE_SERVER")
+
+ def modifyComponentsNotPreferableOnServer(self):
+ """
+ Modify the set of components that are not preferable on the server.
+ Must be overridden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+ The scheme dictionary maps the number of hosts to the host index
+ where the component should exist.
+ Must be overridden in child class.
+ """
+ self.componentLayoutSchemes.update({
+ 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+ })
+
+ def getServiceComponentLayoutValidations(self, services, hosts):
+ """
+ Get a list of errors.
+ Must be overridden in child class.
+ """
+ Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ return self.as_super.getServiceComponentLayoutValidations(services, hosts)
+
+ def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+ """
+ Entry point.
+ Must be overridden in child class.
+ """
+ Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ # Due to the existing stack inheritance, make it clear where each calculation came from.
+ recommender = YARNRecommender()
+
+ # YARN
+ recommender.recommendYARNConfigurationsFromHDP206(configurations, clusterData, services, hosts)
+ recommender.recommendYARNConfigurationsFromHDP22(configurations, clusterData, services, hosts)
+ recommender.recommendYARNConfigurationsFromHDP23(configurations, clusterData, services, hosts)
+ recommender.recommendYARNConfigurationsFromHDP25(configurations, clusterData, services, hosts)
+
+ def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+ """
+ Entry point.
+ Validate configurations for the service. Return a list of errors.
+ The code for this function should be the same for each Service Advisor.
+ """
+ Logger.info("Class: %s, Method: %s. Validating Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ validator = YARNValidator()
+ # Calls the methods of the validator using arguments,
+ # method(siteProperties, siteRecommendations, configurations, services, hosts)
+ return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
+
+
+class MAPREDUCE2ServiceAdvisor(service_advisor.ServiceAdvisor):
+
+ def __init__(self, *args, **kwargs):
+ self.as_super = super(MAPREDUCE2ServiceAdvisor, self)
+ self.as_super.__init__(*args, **kwargs)
+
+ # Always call these methods
+ self.modifyMastersWithMultipleInstances()
+ self.modifyCardinalitiesDict()
+ self.modifyHeapSizeProperties()
+ self.modifyNotValuableComponents()
+ self.modifyComponentsNotPreferableOnServer()
+ self.modifyComponentLayoutSchemes()
+
+ def modifyMastersWithMultipleInstances(self):
+ """
+ Modify the set of masters with multiple instances.
+ Must be overridden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyCardinalitiesDict(self):
+ """
+ Modify the dictionary of cardinalities.
+ Must be overridden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyHeapSizeProperties(self):
+ """
+ Modify the dictionary of heap size properties.
+ Must be overridden in child class.
+ """
+ self.heap_size_properties = {}
+
+ def modifyNotValuableComponents(self):
+ """
+ Modify the set of components whose host assignment is based on other services.
+ Must be overridden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyComponentsNotPreferableOnServer(self):
+ """
+ Modify the set of components that are not preferable on the server.
+ Must be overridden in child class.
+ """
+ # Nothing to do
+ pass
+
+ def modifyComponentLayoutSchemes(self):
+ """
+ Modify layout scheme dictionaries for components.
+ The scheme dictionary maps the number of hosts to the host index
+ where the component should exist.
+ Must be overridden in child class.
+ """
+ self.componentLayoutSchemes.update({
+ 'HISTORYSERVER': {31: 1, "else": 2},
+ })
+
+ def getServiceComponentLayoutValidations(self, services, hosts):
+ """
+ Get a list of errors.
+ Must be overridden in child class.
+ """
+ Logger.info("Class: %s, Method: %s. Validating Service Component Layout." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ return self.as_super.getServiceComponentLayoutValidations(services, hosts)
+
+ def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+ """
+ Entry point.
+ Must be overridden in child class.
+ """
+ Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ # Due to the existing stack inheritance, make it clear where each calculation came from.
+ recommender = MAPREDUCE2Recommender()
+ recommender.recommendMapReduce2ConfigurationsFromHDP206(configurations, clusterData, services, hosts)
+ recommender.recommendMapReduce2ConfigurationsFromHDP22(configurations, clusterData, services, hosts)
+
+ def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+ """
+ Entry point.
+ Validate configurations for the service. Return a list of errors.
+ The code for this function should be the same for each Service Advisor.
+ """
+ Logger.info("Class: %s, Method: %s. Validating Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ validator = YARNValidator()
+ # Calls the methods of the validator using arguments,
+ # method(siteProperties, siteRecommendations, configurations, services, hosts)
+ return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
+
+
+class YARNRecommender(service_advisor.ServiceAdvisor):
+ """
+ YARN Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
+ """
+
+ HIVE_INTERACTIVE_SITE = 'hive-interactive-site'
+ YARN_ROOT_DEFAULT_QUEUE_NAME = 'default'
+ CONFIG_VALUE_UINITIALIZED = 'SET_ON_FIRST_INVOCATION'
+
+ def __init__(self, *args, **kwargs):
+ self.as_super = super(YARNRecommender, self)
+ self.as_super.__init__(*args, **kwargs)
+
+ def recommendYARNConfigurationsFromHDP206(self, configurations, clusterData, services, hosts):
+ """
+ Recommend configurations for this service based on HDP 2.0.6.
+ """
+ Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+ (self.__class__.__name__, inspect.stack()[0][3]))
+
+ putYarnProperty = self.putProperty(configurations, "yarn-site", services)
+ putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site")
+ putYarnEnvProperty = self.putProperty(configurations, "yarn-env", services)
+ nodemanagerMinRam = 1048576 # 1TB in mb
+ if "referenceNodeManagerHost" in clusterData:
+ nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam)
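+ # total_mem is reported in KB, so dividing by 1024 yields MB; e.g. a NodeManager
+ # host with 256 GB reports 268435456 KB, i.e. 262144 MB.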
+
+ callContext = self.getCallContext(services)
+ # On an explicit 'recommendConfigurations' call, always recalculate; otherwise prefer the
+ # value already present in the supplied config, falling back to the calculation.
+ if callContext == 'recommendConfigurations':
+ putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam))))
+ elif "yarn-site" in services["configurations"] and "yarn.nodemanager.resource.memory-mb" in services["configurations"]["yarn-site"]["properties"]:
+ # read from the supplied config
+ putYarnProperty('yarn.nodemanager.resource.memory-mb', int(services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]))
+ else:
+ putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam))))
+
+ putYarnProperty('yarn.scheduler.minimum-allocation-mb', int(clusterData['yarnMinContainerSize']))
+ putYarnProperty('yarn.scheduler.maximum-allocation-mb', int(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"]))
+ putYarnEnvProperty('min_user_id', self.get_system_min_uid())
+
+ yarn_mount_properties = [
+ ("yarn.nodemanager.local-dirs", "NODEMANAGER", "/hadoop/yarn/local", "multi"),
+ ("yarn.nodemanager.log-dirs", "NODEMANAGER", "/hadoop/yarn/log", "multi"),
+ ("yarn.timeline-service.leveldb-timeline-store.path", "APP_TIMELINE_SERVER", "/hadoop/yarn/timeline", "single"),
+ ("yarn.timeline-service.leveldb-state-store.path", "APP_TIMELINE_SERVER", "/hadoop/yarn/timeline", "single")
+ ]
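+ # Each tuple is (property, component, default path, single|multi): "multi" properties
+ # are expanded to one path per mounted data drive (e.g. /grid/0/hadoop/yarn/local,
+ # /grid/1/hadoop/yarn/local, ...), while "single" properties keep one path.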
+
+ self.updateMountProperties("yarn-site", yarn_mount_properties, configurations, services, hosts)
+
+ sc_queue_name = self.recommendYarnQueue(services, "yarn-env", "service_check.queue.name")
+ if sc_queue_name is not None:
+ putYarnEnvProperty("service_check.queue.name", sc_queue_name)
+
+ containerExecutorGroup = 'hadoop'
+ if 'cluster-env' in services['configurations'] and 'user_group' in services['configurations']['cluster-env']['properties']:
+ containerExecutorGroup = services['configurations']['cluster-env']['properties']['user_group']
+ putYarnProperty("yarn.nodemanager.linux-container-executor.group", containerExecutorGroup)
+
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+ if "TEZ" in servicesList:
+ ambari_user = self.getAmbariUser(services)
+ ambariHostName = socket.getfqdn()
+ putYarnProperty("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(ambari_user), ambariHostName)
+ putYarnProperty("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(ambari_user), "*")
+ old_ambari_user = self.getOldAmbariUser(services)
+ if old_ambari_user is not None:
+ putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
+ putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
+
+ def recommendYARNConfigurationsFromHDP22(self, configurations, clusterData, services, hosts):
+ putYarnProperty = self.putProperty(configurations, "yarn-site", services)
+ putYarnProperty('yarn.nodemanager.resource.cpu-vcores', clusterData['cpu'])
+ putYarnProperty('yarn.scheduler.minimum-allocation-vcores', 1)
+ putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+ # Property Attributes
+ putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site")
+ nodeManagerHost = self.getHostWithComponent("YARN", "NODEMANAGER", services, hosts)
+ if (nodeManagerHost is not None):
+ cpuPercentageLimit = 0.8
+ if "yarn.nodemanager.resource.percentage-physical-cpu-limit" in configurations["yarn-site"]["properties"]:
+ cpuPercentageLimit = float(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.percentage-physical-cpu-limit"])
+ cpuLimit = max(1, int(floor(nodeManagerHost["Hosts"]["cpu_count"] * cpuPercentageLimit)))
+ putYarnProperty('yarn.nodemanager.resource.cpu-vcores', str(cpuLimit))
+ putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+ putYarnPropertyAttribute('yarn.nodemanager.resource.memory-mb', 'maximum', int(nodeManagerHost["Hosts"]["total_mem"] / 1024)) # total_mem in kb
+ putYarnPropertyAttribute('yarn.nodemanager.resource.cpu-vcores', 'maximum', nodeManagerHost["Hosts"]["cpu_count"] * 2)
+ putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+ putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+ putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+ putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+
+ kerberos_authentication_enabled = self.isSecurityEnabled(services)
+ if kerberos_authentication_enabled:
+ putYarnProperty('yarn.nodemanager.container-executor.class',
+ 'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
+
+ if "yarn-env" in services["configurations"] and "yarn_cgroups_enabled" in services["configurations"]["yarn-env"]["properties"]:
+ yarn_cgroups_enabled = services["configurations"]["yarn-env"]["properties"]["yarn_cgroups_enabled"].lower() == "true"
+ if yarn_cgroups_enabled:
+ putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
+ putYarnProperty('yarn.nodemanager.linux-container-executor.group', 'hadoop')
+ putYarnProperty('yarn.nodemanager.linux-container-executor.resources-handler.class', 'org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler')
+ putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', '/yarn')
+ putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount', 'true')
+ putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount-path', '/cgroup')
+ else:
+ if not kerberos_authentication_enabled:
+ putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor')
+ putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.resources-handler.class', 'delete', 'true')
+ putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', 'delete', 'true')
+ putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount', 'delete', 'true')
+ putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount-path', 'delete', 'true')
+ # recommend hadoop.registry.rm.enabled based on SLIDER in services
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+ if "SLIDER" in servicesList:
+ putYarnProperty('hadoop.registry.rm.enabled', 'true')
+ else:
+ putYarnProperty('hadoop.registry.rm.enabled', 'false')
+
+ def recommendYARNConfigurationsFromHDP23(self, configurations, clusterData, services, hosts):
+ putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services)
+ putYarnSitePropertyAttributes = self.putPropertyAttribute(configurations, "yarn-site")
+
+ if "tez-site" not in services["configurations"]:
+ putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', '')
+ else:
+ putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', 'org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl')
+
+ if "ranger-env" in services["configurations"] and "ranger-yarn-plugin-properties" in services["configurations"] and \
+ "ranger-yarn-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
+ putYarnRangerPluginProperty = self.putProperty(configurations, "ranger-yarn-plugin-properties", services)
+ rangerEnvYarnPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-yarn-plugin-enabled"]
+ putYarnRangerPluginProperty("ranger-yarn-plugin-enabled", rangerEnvYarnPluginProperty)
+ rangerPluginEnabled = ''
+ if 'ranger-yarn-plugin-properties' in configurations and 'ranger-yarn-plugin-enabled' in configurations['ranger-yarn-plugin-properties']['properties']:
+ rangerPluginEnabled = configurations['ranger-yarn-plugin-properties']['properties']['ranger-yarn-plugin-enabled']
+ elif 'ranger-yarn-plugin-properties' in services['configurations'] and 'ranger-yarn-plugin-enabled' in services['configurations']['ranger-yarn-plugin-properties']['properties']:
+ rangerPluginEnabled = services['configurations']['ranger-yarn-plugin-properties']['properties']['ranger-yarn-plugin-enabled']
+
+ if rangerPluginEnabled and rangerPluginEnabled.lower() == 'yes':
+ putYarnSiteProperty('yarn.acl.enable','true')
+ putYarnSiteProperty('yarn.authorization-provider','org.apache.ranger.authorization.yarn.authorizer.RangerYarnAuthorizer')
+ else:
+ putYarnSitePropertyAttributes('yarn.authorization-provider', 'delete', 'true')
+
+ def recommendYARNConfigurationsFromHDP25(self, configurations, clusterData, services, hosts):
+ hsi_env_properties = self.getServicesSiteProperties(services, "hive-interactive-env")
+ cluster_env = self.getServicesSiteProperties(services, "cluster-env")
+
+ # Queue 'llap' creation/removal logic (Used by Hive Interactive server and associated LLAP)
+ if hsi_env_properties and 'enable_hive_interactive' in hsi_env_properties:
+ enable_hive_interactive = hsi_env_properties['enable_hive_interactive']
+ LLAP_QUEUE_NAME = 'llap'
+
+ # Hive Server interactive is already added or getting added
+ if enable_hive_interactive == 'true':
+ self.updateLlapConfigs(configurations, services, hosts, LLAP_QUEUE_NAME)
+ else: # When Hive Interactive Server is in 'off/removed' state.
+ self.checkAndStopLlapQueue(services, configurations, LLAP_QUEUE_NAME)
+
+ putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services)
+ stack_root = "/usr/hdp"
+ if cluster_env and "stack_root" in cluster_env:
+ stack_root = cluster_env["stack_root"]
+
+ timeline_plugin_classes_values = []
+ timeline_plugin_classpath_values = []
+
+ if self.isServiceDeployed(services, "TEZ"):
+ timeline_plugin_classes_values.append('org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl')
+
+ if self.isServiceDeployed(services, "SPARK"):
+ timeline_plugin_classes_values.append('org.apache.spark.deploy.history.yarn.plugin.SparkATSPlugin')
+ timeline_plugin_classpath_values.append(stack_root + "/${hdp.version}/spark/hdpLib/*")
+
+ putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', ",".join(timeline_plugin_classes_values))
+ putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath', ":".join(timeline_plugin_classpath_values))
+
+ #region LLAP
+ def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
+ """
+ Entry point for updating Hive's 'LLAP app' configs, namely:
+ (1) num_llap_nodes, (2) hive.llap.daemon.yarn.container.mb, (3) hive.llap.daemon.num.executors,
+ (4) hive.llap.io.memory.size, (5) llap_heap_size, (6) slider_am_container_mb,
+ (7) hive.server2.tez.sessions.per.default.queue, (8) tez.am.resource.memory.mb, (9) hive.tez.container.size,
+ (10) tez.runtime.io.sort.mb, (11) tez.runtime.unordered.output.buffer.size-mb, (12) hive.llap.io.threadpool.size,
+ and (13) hive.llap.io.enabled.
+
+ The trigger point for updating the LLAP configs (mentioned above) is a change in the value of any of the following:
+ (1) 'enable_hive_interactive' set to 'true', (2) 'num_llap_nodes', (3) 'hive.server2.tez.sessions.per.default.queue',
+ or (4) a change in queue selection for config 'hive.llap.daemon.queue.name'.
+
+ If a change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
+ value is not calculated, but read and used in the calculation of dependent configs.
+
+ Note: All memory calculations are in MB, unless specified otherwise.
+ """
+ Logger.info("DBG: Entered updateLlapConfigs");
+ putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+ putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE)
+ putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+ putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+ putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services)
+ putTezInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, "tez-interactive-site")
+ llap_daemon_selected_queue_name = None
+ selected_queue_is_ambari_managed_llap = None # Queue named 'llap' at root level is Ambari managed.
+ llap_selected_queue_am_percent = None
+ DEFAULT_EXECUTOR_TO_AM_RATIO = 20
+ MIN_EXECUTOR_TO_AM_RATIO = 10
+ MAX_CONCURRENT_QUERIES = 32
+ leafQueueNames = None
+ MB_TO_BYTES = 1048576
+ hsi_site = self.getServicesSiteProperties(services, YARNRecommender.HIVE_INTERACTIVE_SITE)
+ yarn_site = self.getServicesSiteProperties(services, "yarn-site")
+
+ # Update 'hive.llap.daemon.queue.name' prop combo entries
+ self.setLlapDaemonQueuePropAttributes(services, configurations)
+
+ if not services["changed-configurations"]:
+ read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations))
+ putHiveInteractiveSiteProperty("hive.llap.daemon.yarn.container.mb", read_llap_daemon_yarn_cont_mb)
+
+ if hsi_site and "hive.llap.daemon.queue.name" in hsi_site:
+ llap_daemon_selected_queue_name = hsi_site["hive.llap.daemon.queue.name"]
+
+ # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'.
+ capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+ if capacity_scheduler_properties:
+ # Get all leaf queues.
+ leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+ Logger.info("YARN leaf Queues = {0}".format(leafQueueNames))
+ if len(leafQueueNames) == 0:
+ Logger.error("Queue(s) couldn't be retrieved from capacity-scheduler.")
+ return
+
+ # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive).
+ changed_configs_has_enable_hive_int = self.isConfigPropertiesChanged(services, "hive-interactive-env", ['enable_hive_interactive'], False)
+ llap_named_queue_selected_in_curr_invocation = None
+ if changed_configs_has_enable_hive_int \
+ and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']:
+ if len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames):
+ llap_named_queue_selected_in_curr_invocation = True
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
+ else:
+ first_leaf_queue = list(leafQueueNames)[0] # 1st invocation, pick the 1st leaf queue and set it as selected.
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', first_leaf_queue)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', first_leaf_queue)
+ llap_named_queue_selected_in_curr_invocation = False
+ Logger.info("DBG: llap_named_queue_selected_in_curr_invocation = {0}".format(llap_named_queue_selected_in_curr_invocation))
+
+ if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name and llap_daemon_selected_queue_name == llap_queue_name) or
+ llap_named_queue_selected_in_curr_invocation) or \
+ (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation):
+ Logger.info("Setting visibility of num_llap_nodes to true.")
+ putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true")
+ selected_queue_is_ambari_managed_llap = True
+ Logger.info("DBG: Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+ "slider visibility to 'True'".format(llap_queue_name, list(leafQueueNames)))
+ else:
+ Logger.info("Setting visibility of num_llap_nodes to false.")
+ putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false")
+ Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+ "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames)))
+ selected_queue_is_ambari_managed_llap = False
+
+ # When the 'llap' queue is being created in this invocation, capacity-scheduler has no
+ # state information for it yet, so only validate the state of a pre-existing selected queue.
+ if not llap_named_queue_selected_in_curr_invocation:
+ # Check: State of the selected queue should not be STOPPED.
+ if llap_daemon_selected_queue_name:
+ llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
+ if llap_selected_queue_state is None or llap_selected_queue_state == "STOPPED":
+ Logger.error("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default "
+ "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ else:
+ Logger.error("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values."
+ .format(llap_daemon_selected_queue_name))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ else:
+ Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive."
+ " Not calculating LLAP configs.")
+ return
+
+ changed_configs_in_hive_int_env = None
+ llap_concurrency_in_changed_configs = None
+ llap_daemon_queue_in_changed_configs = None
+ # Calculations are triggered only if there is change in any one of the following props :
+ # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue'
+ # or 'hive.llap.daemon.queue.name' has change in value selection.
+ # OR
+ # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation)
+ if 'changed-configurations' in services:
+ config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive'])
+ changed_configs_in_hive_int_env = self.isConfigPropertiesChanged(services, "hive-interactive-env", config_names_to_be_checked, False)
+
+ # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs.
+ llap_concurrency_in_changed_configs = self.isConfigPropertiesChanged(services, YARNRecommender.HIVE_INTERACTIVE_SITE, ['hive.server2.tez.sessions.per.default.queue'], False)
+ llap_daemon_queue_in_changed_configs = self.isConfigPropertiesChanged(services, YARNRecommender.HIVE_INTERACTIVE_SITE, ['hive.llap.daemon.queue.name'], False)
+
+ if not changed_configs_in_hive_int_env and not llap_concurrency_in_changed_configs and \
+ not llap_daemon_queue_in_changed_configs and services["changed-configurations"]:
+ Logger.info("DBG: LLAP parameters not modified. Not adjusting LLAP configs.")
+ Logger.info("DBG: Current 'changed-configuration' received is : {0}".format(services["changed-configurations"]))
+ return
+
+ Logger.info("\nDBG: Performing LLAP config calculations ......")
+ node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
+ node_manager_cnt = len(node_manager_host_list)
+ yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+ total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
+ Logger.info("DBG: Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
+ "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb))
+ yarn_min_container_size = float(self.get_yarn_min_container_size(services, configurations))
+ tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity))
+ normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
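+ # _normalizeUp rounds up to a multiple of the YARN minimum container size,
+ # e.g. a 1536 MB Tez AM with a 1024 MB minimum normalizes to 2048 MB.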
+
+ if yarn_site and "yarn.nodemanager.resource.cpu-vcores" in yarn_site:
+ cpu_per_nm_host = float(yarn_site["yarn.nodemanager.resource.cpu-vcores"])
+ else:
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ Logger.info("DBG Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, "
+ "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size,
+ total_cluster_capacity))
+
+ # Calculate the available memory for LLAP app
+ yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size)
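+ # _normalizeDown rounds down to a multiple of the minimum container size,
+ # e.g. 5000 MB with a 1024 MB minimum normalizes to 4096 MB.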
+ mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host)
+ Logger.info("DBG: Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, "
+ "cpu_per_nm_host : {2}".format(mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host))
+
+ if mem_per_thread_for_llap is None:
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+
+ mem_per_thread_for_llap = float(mem_per_thread_for_llap)
+
+ Logger.info("DBG: selected_queue_is_ambari_managed_llap = {0}".format(selected_queue_is_ambari_managed_llap))
+ if not selected_queue_is_ambari_managed_llap:
+ llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity)
+
+ if llap_daemon_selected_queue_cap <= 0:
+ Logger.warning("'{0}' queue capacity percentage retrieved = {1}. Expected > 0.".format(
+ llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+
+ total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size)
+ Logger.info("DBG: Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, "
+ "yarn_min_container_size : {3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized,
+ llap_daemon_selected_queue_cap, yarn_min_container_size))
+ # Round numNodes up so that we run more daemons and utilize more CPUs. The rest of the
+ # calculations will take care of cutting this down if required.
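+ # e.g. 204800 MB of normalized queue capacity across 102400 MB NodeManagers
+ # yields ceil(204800 / 102400) = 2 requested LLAP nodes.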
+ num_llap_nodes_requested = ceil(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized)
+ Logger.info("DBG: Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, "
+ "yarn_nm_mem_in_mb_normalized : {2}".format(num_llap_nodes_requested, total_llap_mem_normalized, yarn_nm_mem_in_mb_normalized))
+ queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name))
+ hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized
+ Logger.info("DBG: Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, "
+ "total_llap_mem_normalized : {2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized))
+ else: # Ambari managed 'llap' named queue at root level.
+ num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input
+ total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
+ Logger.info("DBG: Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, "
+ "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized))
+ total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size))
+ Logger.info("DBG: Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, "
+ "yarn_min_container_size : {2}".format(total_llap_mem_normalized, total_llap_mem, yarn_min_container_size))
+
+ # What percent is 'total_llap_mem' of 'total_cluster_capacity' ?
+ llap_named_queue_cap_fraction = ceil(total_llap_mem_normalized / total_cluster_capacity * 100)
+ Logger.info("DBG: Calculated '{0}' queue capacity percent = {1}.".format(llap_queue_name, llap_named_queue_cap_fraction))
+
+ if llap_named_queue_cap_fraction > 100:
+ Logger.warning("Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+
+ # Adjust capacity scheduler for the 'llap' named queue.
+ self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction)
+ hive_tez_am_cap_available = total_llap_mem_normalized
+ Logger.info("DBG: hive_tez_am_cap_available : {0}".format(hive_tez_am_cap_available))
+
+ # Common calculations now, irrespective of the queue selected.
+
+ # Get calculated value for Slider AM container Size
+ slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
+ yarn_min_container_size)
+ Logger.info("DBG: Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : "
+ "{1}".format(slider_am_container_size, yarn_min_container_size))
+
+ llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size
+ Logger.info("DBG: Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, "
+ "slider_am_container_size : {2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, slider_am_container_size))
+
+ if llap_mem_for_tezAm_and_daemons < 2 * yarn_min_container_size:
+ Logger.warning("Not enough capacity available on the cluster to run LLAP")
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+
+ # Calculate llap concurrency (i.e. Number of Tez AM's)
+ max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
+
+ # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
+ if not llap_concurrency_in_changed_configs:
+ if max_executors_per_node <= 0:
+ Logger.warning("Calculated 'max_executors_per_node' = {0}. Expected value >= 1.".format(max_executors_per_node))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+
+ Logger.info("DBG: Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+ "mem_per_thread_for_llap: {3}".format(max_executors_per_node, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+
+ # Default 1 AM for every 20 executor threads.
+ # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM,
+ # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is
+ # instead limited by #CPUs. Use maxPerNode to factor this in.
+ llap_concurreny_limit = min(floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+ Logger.info("DBG: Calculated 'llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO "
+ ": {3}, MAX_CONCURRENT_QUERIES : {4}".format(llap_concurreny_limit, max_executors_per_node, num_llap_nodes_requested, DEFAULT_EXECUTOR_TO_AM_RATIO, MAX_CONCURRENT_QUERIES))
+ llap_concurrency = min(llap_concurreny_limit, floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size)))
+ Logger.info("DBG: Calculated 'llap_concurrency' : {0}, using following : llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+ "{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+ "{5}".format(llap_concurrency, llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, DEFAULT_EXECUTOR_TO_AM_RATIO,
+ mem_per_thread_for_llap, normalized_tez_am_container_size))
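+ # e.g. with 90112 MB for AMs + daemons, 4096 MB per executor thread and a
+ # 4096 MB Tez AM: floor(90112 / (20 * 4096 + 4096)) = 1 concurrent query.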
+ if llap_concurrency == 0:
+ llap_concurrency = 1
+
+ if llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available:
+ llap_concurrency = floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+
+ if llap_concurrency <= 0:
+ Logger.warning("Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ Logger.info("DBG: Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+ "{2}".format(llap_concurrency, hive_tez_am_cap_available, normalized_tez_am_container_size))
+ else:
+ # Read current value
+ if 'hive.server2.tez.sessions.per.default.queue' in hsi_site:
+ llap_concurrency = long(hsi_site['hive.server2.tez.sessions.per.default.queue'])
+ if llap_concurrency <= 0:
+ Logger.warning("'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1".format(llap_concurrency))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ Logger.info("DBG: Read 'llap_concurrency' : {0}".format(llap_concurrency ))
+ else:
+ llap_concurrency = 1
+ Logger.warning("Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config. Setting default value 1.")
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+
+ # Calculate 'Max LLAP Concurrency', irrespective of whether 'llap_concurrency' was read or calculated.
+ max_llap_concurreny_limit = min(floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+ Logger.info("DBG: Calculated 'max_llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested "
+ ": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(max_llap_concurreny_limit, max_executors_per_node,
+ num_llap_nodes_requested, MIN_EXECUTOR_TO_AM_RATIO,
+ MAX_CONCURRENT_QUERIES))
+ max_llap_concurreny = min(max_llap_concurreny_limit, floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO *
+ mem_per_thread_for_llap + normalized_tez_am_container_size)))
+ Logger.info("DBG: Calculated 'max_llap_concurreny' : {0}, using following : max_llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+ "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+ "{5}".format(max_llap_concurreny, max_llap_concurreny_limit, llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO,
+ mem_per_thread_for_llap, normalized_tez_am_container_size))
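+ # e.g. with the same sample inputs but MIN_EXECUTOR_TO_AM_RATIO = 10:
+ # floor(90112 / (10 * 4096 + 4096)) = 2 concurrent queries at most.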
+ if max_llap_concurreny == 0:
+ max_llap_concurreny = 1
+ Logger.info("DBG: Adjusted 'max_llap_concurreny' : 1.")
+
+ if (max_llap_concurreny * normalized_tez_am_container_size) > hive_tez_am_cap_available:
+ max_llap_concurreny = floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+ if max_llap_concurreny <= 0:
+ Logger.warning("Calculated 'Max. LLAP Concurrent Queries' = {0}. Expected value > 1".format(max_llap_concurreny))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ Logger.info("DBG: Adjusted 'max_llap_concurreny' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+ "{2}".format(max_llap_concurreny, hive_tez_am_cap_available, normalized_tez_am_container_size))
+
+ # Calculate value for 'num_llap_nodes', an across cluster config.
+ tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size
+ Logger.info("DBG: Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : "
+ "{2}".format(tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size))
+ llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required
+
+ if llap_mem_daemon_size < yarn_min_container_size:
+ Logger.warning("Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container Size' ({1})'".format(
+ llap_mem_daemon_size, yarn_min_container_size))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+
+ if llap_mem_daemon_size < mem_per_thread_for_llap or llap_mem_daemon_size < yarn_min_container_size:
+ Logger.warning("Not enough memory available for executors.")
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ Logger.info("DBG: Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : "
+ "{2}".format(llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required))
+
+ llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size)
+ Logger.info("DBG: Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, "
+ "yarn_min_container_size: {3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, num_llap_nodes_requested, yarn_min_container_size))
+ if llap_daemon_mem_per_node == 0:
+ # Small cluster. No capacity left on a node after running AMs.
+ llap_daemon_mem_per_node = mem_per_thread_for_llap
+ num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+ Logger.info("DBG: 'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, "
+ "mem_per_thread_for_llap : {3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, mem_per_thread_for_llap))
+ elif llap_daemon_mem_per_node < mem_per_thread_for_llap:
+ # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node)
+ llap_daemon_mem_per_node = mem_per_thread_for_llap
+ num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+ Logger.info("DBG: 'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusted 'llap_daemon_mem_per_node' "
+ ": {2}".format(llap_daemon_mem_per_node, mem_per_thread_for_llap, llap_daemon_mem_per_node))
+ else:
+ # All good. We have a proper value for memoryPerNode.
+ num_llap_nodes = num_llap_nodes_requested
+ Logger.info("DBG: num_llap_nodes : {0}".format(num_llap_nodes))
+
+ num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
+ if num_executors_per_node_max < 1:
+ Logger.warning("Calculated 'Max. Executors per Node' = {0}. Expected values >= 1.".format(num_executors_per_node_max))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ Logger.info("DBG: Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+ "mem_per_thread_for_llap: {3}".format(num_executors_per_node_max, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+
+ # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem.
+ num_executors_per_node = min(floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max)
+ if num_executors_per_node <= 0:
+ Logger.warning("Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node))
+ self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ return
+ Logger.info("DBG: Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, "
+ "mem_per_thread_for_llap: {3}".format(num_executors_per_node, llap_daemon_mem_per_node, num_executors_per_node_max, mem_per_thread_for_llap))
+
+ # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache.
+ total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap
+ cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node
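+ # e.g. 10 executors * 4096 MB = 40960 MB for executors; a 90112 MB daemon
+ # would then keep 49152 MB of cache per node.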
+
+ tez_runtime_io_sort_mb = (long((0.8 * mem_per_thread_for_llap) / 3))
+ tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap)
+ # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying it by 1048576.
+ hive_auto_convert_join_noconditionaltask_size = (long((0.8 * mem_per_thread_for_llap) / 3)) * MB_TO_BYTES
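+ # e.g. with mem_per_thread_for_llap = 4096 MB: tez.runtime.io.sort.mb =
+ # long(0.8 * 4096 / 3) = 1092, and the unordered output buffer =
+ # long(0.8 * 0.075 * 4096) = 245 MB.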
+
+ # Calculate value for prop 'llap_heap_size'
+ llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations))
+ Logger.info("DBG: Calculated llap_app_heap_size : {0}, using following : total_mem_for_executors : {1}".format(llap_xmx, total_mem_for_executors_per_node))
+
+ # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI)
+ hive_server_interactive_heapsize = None
+ hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts)
+ if hive_server_interactive_hosts is None:
+ # If it's None, read the base service YARN's NODEMANAGER host memory, as all hosts are considered homogeneous.
+ hive_server_interactive_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
+ if hive_server_interactive_hosts is not None and len(hive_server_interactive_hosts) > 0:
+ host_mem = long(hive_server_interactive_hosts[0]["Hosts"]["total_mem"])
+ hive_server_interactive_heapsize = min(max(2048.0, 400.0*llap_concurrency), 3.0/8 * host_mem)
+ Logger.info("DBG: Calculated 'hive_server_interactive_heapsize' : {0}, using following : llap_concurrency : {1}, host_mem : "
+ "{2}".format(hive_server_interactive_heapsize, llap_concurrency, host_mem))
+
+ # Done with calculations, updating calculated configs.
+ Logger.info("DBG: Applying the calculated values....")
+
+ normalized_tez_am_container_size = long(normalized_tez_am_container_size)
+ putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size)
+
+ if not llap_concurrency_in_changed_configs:
+ min_llap_concurrency = 1
+ putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency)
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum",
+ min_llap_concurrency)
+
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny)
+
+ num_llap_nodes = long(num_llap_nodes)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
+ #TODO A single value is not being set for numNodes in case of a custom queue. Also the attribute is set to non-visible, so the UI likely ends up using an old cached value
+ if (num_llap_nodes != num_llap_nodes_requested):
+ Logger.info("User requested num_llap_nodes : {0}, but used/adjusted value for calculations is : {1}".format(num_llap_nodes_requested, num_llap_nodes))
+ else:
+ Logger.info("Used num_llap_nodes for calculations : {0}".format(num_llap_nodes_requested))
+
+ llap_container_size = long(llap_daemon_mem_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size)
+
+ # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization.
+ # Otherwise, we don't override (1) a previously calculated value or (2) a user-provided value.
+ if self.get_hive_tez_container_size(services) == YARNRecommender.CONFIG_VALUE_UINITIALIZED:
+ mem_per_thread_for_llap = long(mem_per_thread_for_llap)
+ putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap)
+
+ putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb)
+ if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]:
+ if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY":
+ putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", "maximum", 1800)
+
+ putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size)
+ putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size)
+
+ num_executors_per_node = long(num_executors_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", float(num_executors_per_node_max))
+
+ # 'hive.llap.io.threadpool.size' must always be set to the same value as calculated for
+ # 'hive.llap.daemon.num.executors'.
+ cache_mem_per_node = long(cache_mem_per_node)
+
+ putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node)
+
+ if hive_server_interactive_heapsize is not None:
+ putHiveInteractiveEnvProperty("hive_heapsize", int(hive_server_interactive_heapsize))
+
+ llap_io_enabled = 'true' if long(cache_mem_per_node) >= 64 else 'false'
+ putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled)
+
+ putHiveInteractiveEnvProperty('llap_heap_size', long(llap_xmx))
+ putHiveInteractiveEnvProperty('slider_am_container_mb', long(slider_am_container_size))
+ Logger.info("DBG: Done putting all configs")
+
+ def recommendDefaultLlapConfiguration(self, configurations, services, hosts):
+ Logger.info("DBG: Something likely went wrong. recommendDefaultLlapConfiguration")
+ putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+ putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE)
+
+ putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+ putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+
+ yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
+ slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size))
+
+ node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
+ node_manager_cnt = len(node_manager_host_list)
+
+ putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1)
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1)
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1)
+ putHiveInteractiveEnvProperty('num_llap_nodes', 0)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
+ putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0)
+ putHiveInteractiveSiteProperty('hive.llap.io.memory.size', 0)
+ putHiveInteractiveEnvProperty('llap_heap_size', 0)
+ putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size)
+
+ def get_num_llap_nodes(self, services, configurations):
+ """
+ Returns the current number of LLAP nodes in the cluster (num_llap_nodes).
+
+ :type services: dict
+ :type configurations: dict
+ :rtype: float
+ """
+ hsi_env = self.getServicesSiteProperties(services, "hive-interactive-env")
+ hsi_env_properties = self.getSiteProperties(configurations, "hive-interactive-env")
+ num_llap_nodes = 0
+
+ # Check if 'num_llap_nodes' is modified in current SA invocation.
+ if hsi_env_properties and 'num_llap_nodes' in hsi_env_properties:
+ num_llap_nodes = hsi_env_properties['num_llap_nodes']
+ elif hsi_env and 'num_llap_nodes' in hsi_env:
+ num_llap_nodes = hsi_env['num_llap_nodes']
+ else:
+ Logger.error("Couldn't retrieve Hive Server 'num_llap_nodes' config. Setting value to {0}".format(num_llap_nodes))
+
+ return float(num_llap_nodes)
+
+ def get_max_executors_per_node(self, nm_mem_per_node_normalized, nm_cpus_per_node, mem_per_thread):
+ # TODO: This potentially takes up the entire node leaving no space for AMs.
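+ # e.g. floor(102400 MB / 4096 MB per thread) = 25, capped at 16 by a 16-vcore host.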
+ return min(floor(nm_mem_per_node_normalized / mem_per_thread), nm_cpus_per_node)
+
+ def calculate_mem_per_thread_for_llap(self, services, nm_mem_per_node_normalized, cpu_per_nm_host):
+ """
+ Calculates 'mem_per_thread_for_llap' on first-time initialization; otherwise returns the value read from 'hive.tez.container.size'.
+ """
+ hive_tez_container_size = self.get_hive_tez_container_size(services)
+
+ if hive_tez_container_size == self.CONFIG_VALUE_UINITIALIZED:
+ if nm_mem_per_node_normalized <= 1024:
+ calculated_hive_tez_container_size = min(512, nm_mem_per_node_normalized)
+ elif nm_mem_per_node_normalized <= 4096:
+ calculated_hive_tez_container_size = 1024
+ elif nm_mem_per_node_normalized <= 10240:
+ calculated_hive_tez_container_size = 2048
+ elif nm_mem_per_node_normalized <= 24576:
+ calculated_hive_tez_container_size = 3072
+ else:
+ calculated_hive_tez_container_size = 4096
+
+ Logger.info("DBG: Calculated and returning 'hive_tez_container_size' : {0}".format(calculated_hive_tez_container_size))
+ return calculated_hive_tez_container_size
+ else:
+ Logger.info("DBG: Returning 'hive_tez_container_size' : {0}".format(hive_tez_container_size))
+ return hive_tez_container_size
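+
+ # Example (hypothetical inputs): with 'hive.tez.container.size' uninitialized,
+ # nm_mem_per_node_normalized=8192 falls in the (4096, 10240] bucket -> 2048 is returned;
+ # nm_mem_per_node_normalized=768 -> min(512, 768) = 512 is returned.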
+
+ def get_hive_tez_container_size(self, services):
+ """
+ Gets HIVE Tez container size (hive.tez.container.size).
+ """
+ hive_container_size = None
+ hsi_site = self.getServicesSiteProperties(services, YARNRecommender.HIVE_INTERACTIVE_SITE)
+ if hsi_site and 'hive.tez.container.size' in hsi_site:
+ hive_container_size = hsi_site['hive.tez.container.size']
+
+ return hive_container_size
+
+ def get_llap_headroom_space(self, services, configurations):
+ """
+ Gets HIVE Server Interactive's 'llap_headroom_space' config. (Defaults to 6144 MB if unset).
+ """
+ llap_headroom_space = None
+ # Check if 'llap_headroom_space' is modified in current SA invocation.
+ if 'hive-interactive-env' in configurations and 'llap_headroom_space' in configurations['hive-interactive-env']['properties']:
+ llap_headroom_space = float(configurations['hive-interactive-env']['properties']['llap_headroom_space'])
+ Logger.info("'llap_headroom_space' read from configurations as : {0}".format(llap_headroom_space))
+
+ if llap_headroom_space is None:
+ # Check if 'llap_headroom_space' is input in services array.
+ if 'llap_headroom_space' in services['configurations']['hive-interactive-env']['properties']:
+ llap_headroom_space = float(services['configurations']['hive-interactive-env']['properties']['llap_headroom_space'])
+ Logger.info("'llap_headroom_space' read from services as : {0}".format(llap_headroom_space))
+ if not llap_headroom_space or llap_headroom_space < 1:
+ llap_headroom_space = 6144 # 6GB
+ Logger.info("Couldn't read 'llap_headroom_space' from services or configurations. Returning default value : 6144 MB")
+
+ return llap_headroom_space
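+
+ # Example: if 'llap_headroom_space' is absent (or < 1) in both configurations and services,
+ # the method falls through to the 6144 MB default.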
+
+ def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name, llap_queue_cap_perc):
+ """
+ Checks and (1) creates the 'llap' queue if only the 'default' queue exists at leaf level and is consuming 100% capacity, OR
+ (2) updates the 'llap' queue capacity and state, if the currently selected queue is 'llap' and only 2 queues exist
+ at root level : 'default' and 'llap'.
+ """
+ Logger.info("Determining creation/adjustment of 'capacity-scheduler' for 'llap' queue.")
+ putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+ putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+ putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+ putCapSchedProperty = self.putProperty(configurations, "capacity-scheduler", services)
+ leafQueueNames = None
+ hsi_site = self.getServicesSiteProperties(services, YARNRecommender.HIVE_INTERACTIVE_SITE)
+
+ capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+ if capacity_scheduler_properties:
+ leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+ cap_sched_config_keys = capacity_scheduler_properties.keys()
+
+ yarn_default_queue_capacity = -1
+ if 'yarn.scheduler.capacity.root.default.capacity' in cap_sched_config_keys:
+ yarn_default_queue_capacity = float(capacity_scheduler_properties.get('yarn.scheduler.capacity.root.default.capacity'))
+
+ # Get 'llap' queue state
+ currLlapQueueState = ''
+ if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state' in cap_sched_config_keys:
+ currLlapQueueState = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.state')
+
+ # Get 'llap' queue capacity
+ currLlapQueueCap = -1
+ if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity' in cap_sched_config_keys:
+ currLlapQueueCap = int(float(capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity')))
+
+ updated_cap_sched_configs_str = ''
+
+ enabled_hive_int_in_changed_configs = self.isConfigPropertiesChanged(services, "hive-interactive-env", ['enable_hive_interactive'], False)
+ """
+ We create OR "modify 'llap' queue 'state and/or capacity' " based on below conditions:
+ - if only 1 queue exists at root level and is 'default' queue and has 100% cap -> Create 'llap' queue, OR
+ - if 2 queues exists at root level ('llap' and 'default') :
+ - Queue selected is 'llap' and state is STOPPED -> Modify 'llap' queue state to RUNNING, adjust capacity, OR
+ - Queue selected is 'llap', state is RUNNING and 'llap_queue_capacity' prop != 'llap' queue current running capacity ->
+ Modify 'llap' queue capacity to 'llap_queue_capacity'
+ """
+ if 'default' in leafQueueNames and \
+ ((len(leafQueueNames) == 1 and int(yarn_default_queue_capacity) == 100) or \
+ ((len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames) and \
+ ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_queue_cap_perc)))):
+ adjusted_default_queue_cap = str(100 - llap_queue_cap_perc)
+
+ hive_user = '*' # Open to all
+ if 'hive_user' in services['configurations']['hive-env']['properties']:
+ hive_user = services['configurations']['hive-env']['properties']['hive_user']
+
+ llap_queue_cap_perc = str(llap_queue_cap_perc)
+
+ # If capacity-scheduler configs are received as one concatenated string, we deposit the changed configs back as
+ # one concatenated string.
+ updated_cap_sched_configs_as_dict = False
+ if not received_as_key_value_pair:
+ for prop, val in capacity_scheduler_properties.items():
+ if llap_queue_name not in prop:
+ if prop == 'yarn.scheduler.capacity.root.queues':
+ updated_cap_sched_configs_str = updated_cap_sched_configs_str \
+ + prop + "=default,llap\n"
+ elif prop == 'yarn.scheduler.capacity.root.default.capacity':
+ updated_cap_sched_configs_str = updated_cap_sched_configs_str \
+ + prop + "=" + adjusted_default_queue_cap + "\n"
+ elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity':
+ updated_cap_sched_configs_str = updated_cap_sched_configs_str \
+ + prop + "=" + adjusted_default_queue_cap + "\n"
+ elif prop.startswith('yarn.') and '.llap.' not in prop:
+ updated_cap_sched_configs_str = updated_cap_sched_configs_str + prop + "=" + val + "\n"
+
+ # Now, append the 'llap' queue related properties
+ updated_cap_sched_configs_str += """yarn.scheduler.capacity.root.{0}.user-limit-factor=1
+yarn.scheduler.capacity.root.{0}.state=RUNNING
+yarn.scheduler.capacity.root.{0}.ordering-policy=fifo
+yarn.scheduler.capacity.root.{0}.minimum-user-limit-percent=100
+yarn.scheduler.capacity.root.{0}.maximum-capacity={1}
+yarn.scheduler.capacity.root.{0}.capacity={1}
+yarn.scheduler.capacity.root.{0}.acl_submit_applications={2}
+yarn.scheduler.capacity.root.{0}.acl_administer_queue={2}
+yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_queue_name, llap_queue_cap_perc, hive_user)
+
+ putCapSchedProperty("capacity-scheduler", updated_cap_sched_configs_str)
+ Logger.info("Updated 'capacity-scheduler' configs as one concatenated string.")
+ else:
+ # If capacity-scheduler configs are received as a dictionary (generally 1st time), we deposit the changed
+ # values back as dictionary itself.
+ # Update existing configs in 'capacity-scheduler'.
+ for prop, val in capacity_scheduler_properties.items():
+ if llap_queue_name not in prop:
+ if prop == 'yarn.scheduler.capacity.root.queues':
+ putCapSchedProperty(prop, 'default,llap')
+ elif prop == 'yarn.scheduler.capacity.root.default.capacity':
+ putCapSchedProperty(prop, adjusted_default_queue_cap)
+ elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity':
+ putCapSchedProperty(prop, adjusted_default_queue_cap)
+ elif prop.startswith('yarn.') and '.llap.' not in prop:
+ putCapSchedProperty(prop, val)
+
+ # Add new 'llap' queue related configs.
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".user-limit-factor", "1")
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".state", "RUNNING")
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".ordering-policy", "fifo")
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".minimum-user-limit-percent", "100")
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-capacity", llap_queue_cap_perc)
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".capacity", llap_queue_cap_perc)
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_submit_applications", hive_user)
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".acl_administer_queue", hive_user)
+ putCapSchedProperty("yarn.scheduler.capacity.root." + llap_queue_name + ".maximum-am-resource-percent", "1")
+
+ Logger.info("Updated 'capacity-scheduler' configs as a dictionary.")
+ updated_cap_sched_configs_as_dict = True
+
+ if updated_cap_sched_configs_str or updated_cap_sched_configs_as_dict:
+ if len(leafQueueNames) == 1: # 'llap' queue didn't exist before
+ Logger.info("Created YARN Queue : '{0}' with capacity : {1}%. Adjusted 'default' queue capacity to : {2}%" \
+ .format(llap_queue_name, llap_queue_cap_perc, adjusted_default_queue_cap))
+ else: # Queue existed, only adjustments done.
+ Logger.info("Adjusted YARN Queue : '{0}'. Current capacity : {1}%. State: RUNNING.".format(llap_queue_name, llap_queue_cap_perc))
+ Logger.info("Adjusted 'default' queue capacity to : {0}%".format(adjusted_default_queue_cap))
+
+ # Update Hive 'hive.llap.daemon.queue.name' prop to use 'llap' queue.
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
+ # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility.
+ self.setLlapDaemonQueuePropAttributes(services, configurations)
+ else:
+ Logger.debug("Not creating/adjusting {0} queue. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames)))
+ else:
+ Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive.")
+
+ def checkAndStopLlapQueue(self, services, configurations, llap_queue_name):
+ """
+ Checks whether (1) only two leaf queues exist at root level, namely 'default' and 'llap',
+ and (2) 'llap' is in RUNNING state.
+
+ If yes, performs the following actions: (1) sets the 'llap' queue state to STOPPED,
+ (2) sets the 'llap' queue capacity to 0%,
+ (3) sets the 'default' queue capacity to 100%.
+ """
+ putCapSchedProperty = self.putProperty(configurations, "capacity-scheduler", services)
+ putHiveInteractiveSiteProperty = self.putProperty(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services)
+ capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+ updated_default_queue_configs = ''
+ updated_llap_queue_configs = ''
+ if capacity_scheduler_properties:
+ # Get all leaf queues.
+ leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+
+ if len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames and 'default' in leafQueueNames:
+ # Get 'llap' queue state
+ currLlapQueueState = 'STOPPED'
+ if 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state' in capacity_scheduler_properties.keys():
+ currLlapQueueState = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.'+llap_queue_name+'.state')
+ else:
+ Logger.error("{0} queue 'state' property not present in capacity scheduler. Skipping adjusting queues.".format(llap_queue_name))
+ return
+ if currLlapQueueState == 'RUNNING':
+ DEFAULT_MAX_CAPACITY = '100'
+ for prop, val in capacity_scheduler_properties.items():
+ # Update 'default' related configs in 'updated_default_queue_configs'
+ if llap_queue_name not in prop:
+ if prop == 'yarn.scheduler.capacity.root.default.capacity':
+ # Set 'default' capacity back to maximum val
+ updated_default_queue_configs = updated_default_queue_configs \
+ + prop + "="+DEFAULT_MAX_CAPACITY + "\n"
+ elif prop == 'yarn.scheduler.capacity.root.default.maximum-capacity':
+ # Set 'default' max. capacity back to maximum val
+ updated_default_queue_configs = updated_default_queue_configs \
+ + prop + "="+DEFAULT_MAX_CAPACITY + "\n"
+ elif prop.startswith('yarn.'):
+ updated_default_queue_configs = updated_default_queue_configs + prop + "=" + val + "\n"
+ else: # Update 'llap' related configs in 'updated_llap_queue_configs'
+ if prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.state':
+ updated_llap_queue_configs = updated_llap_queue_configs \
+ + prop + "=STOPPED\n"
+ elif prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.capacity':
+ updated_llap_queue_configs = updated_llap_queue_configs \
+ + prop + "=0\n"
+ elif prop == 'yarn.scheduler.capacity.root.'+llap_queue_name+'.maximum-capacity':
+ updated_llap_queue_configs = updated_llap_queue_configs \
+ + prop + "=0\n"
+ elif prop.startswith('yarn.'):
+ updated_llap_queue_configs = updated_llap_queue_configs + prop + "=" + val + "\n"
+ else:
+ Logger.debug("{0} queue state is : {1}. Skipping adjusting queues.".format(llap_queue_name, currLlapQueueState))
+ return
+
+ if updated_default_queue_configs and updated_llap_queue_configs:
+ putCapSchedProperty("capacity-scheduler", updated_default_queue_configs+updated_llap_queue_configs)
+ Logger.info("Changed YARN '{0}' queue state to 'STOPPED', and capacity to 0%. Adjusted 'default' queue capacity to : {1}%" \
+ .format(llap_queue_name, DEFAULT_MAX_CAPACITY))
+
+ # Update Hive 'hive.llap.daemon.queue.name' prop to use 'default' queue.
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', YARNRecommender.YARN_ROOT_DEFAULT_QUEUE_NAME)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', YARNRecommender.YARN_ROOT_DEFAULT_QUEUE_NAME)
+ else:
+ Logger.debug("Not removing '{0}' queue as number of Queues not equal to 2. Current YARN queues : {1}".format(llap_queue_name, list(leafQueueNames)))
+ else:
+ Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive.")
+
+ def setLlapDaemonQueuePropAttributes(self, services, configurations):
+ """
+ Checks and sets the 'Hive Server Interactive' 'hive.llap.daemon.queue.name' config Property Attributes, taking into
+ account that 'capacity-scheduler' may have been updated in the current Stack Advisor invocation.
+ """
+ Logger.info("Determining 'hive.llap.daemon.queue.name' config Property Attributes.")
+ #TODO Determine if this is doing the right thing if some queue is setup with capacity=0, or is STOPPED. Maybe don't list it.
+ putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, YARNRecommender.HIVE_INTERACTIVE_SITE)
+
+ capacity_scheduler_properties = dict()
+
+ # Read 'capacity-scheduler' from configurations if we modified it and added a recommendation to it as part of the
+ # current StackAdvisor invocation.
+ if "capacity-scheduler" in configurations:
+ cap_sched_props_as_dict = configurations["capacity-scheduler"]["properties"]
+ if 'capacity-scheduler' in cap_sched_props_as_dict:
+ cap_sched_props_as_str = configurations['capacity-scheduler']['properties']['capacity-scheduler']
+ if cap_sched_props_as_str:
+ cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n')
+ if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null':
+ # Got 'capacity-scheduler' configs as one "\n" separated string
+ for property in cap_sched_props_as_str:
+ key, sep, value = property.partition("=")
+ capacity_scheduler_properties[key] = value
+ Logger.info("'capacity-scheduler' configs is set as a single '\\n' separated string in current invocation. "
+ "count(configurations['capacity-scheduler']['properties']['capacity-scheduler']) = "
+ "{0}".format(len(capacity_scheduler_properties)))
+ else:
+ Logger.info("Read configurations['capacity-scheduler']['properties']['capacity-scheduler'] is : {0}".format(cap_sched_props_as_str))
+ else:
+ Logger.info("configurations['capacity-scheduler']['properties']['capacity-scheduler'] : {0}.".format(cap_sched_props_as_str))
+
+ # If 'capacity_scheduler_properties' is empty, we may have the 'capacity-scheduler' configs as a dictionary
+ # in configurations, if 'capacity-scheduler' was changed in the current invocation.
+ if not capacity_scheduler_properties:
+ if isinstance(cap_sched_props_as_dict, dict) and len(cap_sched_props_as_dict) > 1:
+ capacity_scheduler_properties = cap_sched_props_as_dict
+ Logger.info("'capacity-scheduler' changed in current Stack Advisor invocation. Retrieved the configs as dictionary from configurations.")
+ else:
+ Logger.info("Read configurations['capacity-scheduler']['properties'] is : {0}".format(cap_sched_props_as_dict))
+ else:
+ Logger.info("'capacity-scheduler' not modified in the current Stack Advisor invocation.")
+
+
+ # If 'capacity_scheduler_properties' is still empty, 'capacity-scheduler' wasn't changed in the current
+ # SA invocation. Thus, read it from the input : 'services'.
+ if not capacity_scheduler_properties:
+ capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+ Logger.info("'capacity-scheduler' not changed in current Stack Advisor invocation. Retrieved the configs from services.")
+
+ # Get set of current YARN leaf queues.
+ leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+ if leafQueueNames:
+ leafQueues = [{"label": str(queueName), "value": queueName} for queueName in leafQueueNames]
+ leafQueues = sorted(leafQueues, key=lambda q: q['value'])
+ putHiveInteractiveSitePropertyAttribute("hive.llap.daemon.queue.name", "entries", leafQueues)
+ Logger.info("'hive.llap.daemon.queue.name' config Property Attributes set to : {0}".format(leafQueues))
+ else:
+ Logger.error("Problem retrieving YARN queues. Skipping updating HIVE Server Interactve "
+ "'hive.server2.tez.default.queues' property attributes.")
+
+ #TODO Convert this to a helper. It can apply to any property. Check config, or check if in the list of changed configurations and read the latest value
+ def get_yarn_min_container_size(self, services, configurations):
+ """
+ Gets YARN's minimum container size (yarn.scheduler.minimum-allocation-mb).
+ Reads from:
+ - configurations (if changed as part of current Stack Advisor invocation (output)), and services["changed-configurations"]
+ is empty, else
+ - services['configurations'] (input).
+
+ services["changed-configurations"] would be empty if Stack Advisor call is made from Blueprints (1st invocation). Subsequent
+ Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if Stack Advisor calculates this
+ value (configurations), it is finally not recommended, making 'input' value to survive.
+
+ :type services dict
+ :type configurations dict
+ :rtype str
+ """
+ yarn_min_container_size = None
+ yarn_min_allocation_property = "yarn.scheduler.minimum-allocation-mb"
+ yarn_site = self.getSiteProperties(configurations, "yarn-site")
+ yarn_site_properties = self.getServicesSiteProperties(services, "yarn-site")
+
+ # Check if services["changed-configurations"] is empty and 'yarn.scheduler.minimum-allocation-mb' is modified in current ST invocation.
+ if not services["changed-configurations"] and yarn_site and yarn_min_allocation_property in yarn_site:
+ yarn_min_container_size = yarn_site[yarn_min_allocation_property]
+ Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from configurations as : {0}".format(yarn_min_container_size))
+
+ # Check if 'yarn.scheduler.minimum-allocation-mb' is input in services array.
+ elif yarn_site_properties and yarn_min_allocation_property in yarn_site_properties:
+ yarn_min_container_size = yarn_site_properties[yarn_min_allocation_property]
+ Logger.info("DBG: 'yarn.scheduler.minimum-allocation-mb' read from services as : {0}".format(yarn_min_container_size))
+
+ if not yarn_min_container_size:
+ Logger.error("{0} was not found in the configuration".format(yarn_min_allocation_property))
+
+ return yarn_min_container_size
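+
+ # Example: on the 1st (Blueprints) invocation, services["changed-configurations"] is empty, so a
+ # value recommended into 'configurations' (output) is read back; on subsequent invocations the
+ # value from services['configurations'] (input) wins.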
+
+ def calculate_slider_am_size(self, yarn_min_container_size):
+ """
+ Calculates the Slider App Master size based on YARN's Minimum Container Size.
+
+ :type yarn_min_container_size int
+ """
+ if yarn_min_container_size > 1024:
+ return 1024
+ if 256 <= yarn_min_container_size <= 1024:
+ return yarn_min_container_size
+ if yarn_min_container_size < 256:
+ return 256
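+
+ # Example: calculate_slider_am_size(128) -> 256, calculate_slider_am_size(512) -> 512,
+ # calculate_slider_am_size(2048) -> 1024, i.e. the result is clamped to the [256, 1024] range (MB).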
+
+ def get_yarn_nm_mem_in_mb(self, services, configurations):
+ """
+ Gets YARN NodeManager memory in MB (yarn.nodemanager.resource.memory-mb).
+ Reads from:
+ - configurations (if changed as part of current Stack Advisor invocation (output)), and services["changed-configurations"]
+ is empty, else
+ - services['configurations'] (input).
+
+ services["changed-configurations"] would be empty is Stack Advisor call if made from Blueprints (1st invocation). Subsequent
+ Stack Advisor calls will have it non-empty. We do this because in subsequent invocations, even if Stack Advsior calculates this
+ value (configurations), it is finally not recommended, making 'input' value to survive.
+ """
+ yarn_nm_mem_in_mb = None
+
+ yarn_site = self.getSiteProperties(configurations, "yarn-site")
+ yarn_site_properties = self.getServicesSiteProperties(services, "yarn-site")
+
+ # Check if services["changed-configurations"] is empty and 'yarn.nodemanager.resource.memory-mb' is modified in the current SA invocation.
+ if not services["changed-configurations"] and yarn_site and 'yarn.nodemanager.resource.memory-mb' in yarn_site:
+ yarn_nm_mem_in_mb = float(yarn_site['yarn.nodemanager.resource.memory-mb'])
+ elif yarn_site_properties and 'yarn.nodemanager.resource.memory-mb' in yarn_site_properties:
+ # Check if 'yarn.nodemanager.resource.memory-mb' is input in the services array.
+ yarn_nm_mem_in_mb = float(yarn_site_properties['yarn.nodemanager.resource.memory-mb'])
+
+ if yarn_nm_mem_in_mb <= 0.0:
+ Logger.warning("'yarn.nodemanager.resource.memory-mb' current value : {0}. Expected value : > 0".format(yarn_nm_mem_in_mb))
+
+ return yarn_nm_mem_in_mb
+
+ def calculate_tez_am_container_size(self, services, total_cluster_capacity):
+ """
+ Calculates Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site on first initialization if the value read is uninitialized.
+ Else returns the read value.
+ """
+ tez_am_resource_memory_mb = self.get_tez_am_resource_memory_mb(services)
+ calculated_tez_am_resource_memory_mb = None
+ if tez_am_resource_memory_mb == YARNRecommender.CONFIG_VALUE_UINITIALIZED:
+ if total_cluster_capacity <= 4096:
+ calculated_tez_am_resource_memory_mb = 256
+ elif total_cluster_capacity > 4096 and total_cluster_capacity <= 73728:
+ calculated_tez_am_resource_memory_mb = 512
+ elif total_cluster_capacity > 73728:
+ calculated_tez_am_resource_memory_mb = 1536
+
+ Logger.info("DBG: Calculated and returning 'tez_am_resource_memory_mb' as : {0}".format(calculated_tez_am_resource_memory_mb))
+ return float(calculated_tez_am_resource_memory_mb)
+ else:
+ Logger.info("DBG: Returning 'tez_am_resource_memory_mb' as : {0}".format(tez_am_resource_memory_mb))
+ return float(tez_am_resource_memory_mb)
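+
+ # Example: with 'tez.am.resource.memory.mb' uninitialized, total_cluster_capacity=2048 -> 256.0,
+ # 32768 -> 512.0 and 102400 -> 1536.0 (all in MB).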
+
+ def get_tez_am_resource_memory_mb(self, services):
+ """
+ Gets Tez's AM resource memory (tez.am.resource.memory.mb) from services.
+ """
+ tez_am_resource_memory_mb = None
+ if 'tez.am.resource.memory.mb' in services['configurations']['tez-interactive-site']['properties']:
+ tez_am_resource_memory_mb = services['configurations']['tez-interactive-site']['properties']['tez.am.resource.memory.mb']
+
+ return tez_am_resource_memory_mb
+
+ def min_queue_perc_reqd_for_llap_and_hive_app(self, services, hosts, configurations):
+ """
+ Calculate minimum queue capacity required in order to get LLAP and HIVE2 app into running state.
+ """
+ # Get queue size if sized at 20%
+ node_manager_hosts = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
+ yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+ total_cluster_cap = len(node_manager_hosts) * yarn_nm_mem_in_mb
+ total_queue_size_at_20_perc = 20.0 / 100 * total_cluster_cap
+
+ # Calculate based on minimum size required by containers.
+ yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
+ slider_am_size = self.calculate_slider_am_size(float(yarn_min_container_size))
+ hive_tez_container_size = long(self.get_hive_tez_container_size(services))
+ tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap))
+ normalized_val = self._normalizeUp(slider_am_size, yarn_min_container_size) \
+ + self._normalizeUp(hive_tez_container_size, yarn_min_container_size) \
+ + self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+
+ min_required = max(total_queue_size_at_20_perc, normalized_val)
+ min_required_perc = min_required * 100 / total_cluster_cap
+
+ return int(ceil(min_required_perc))
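+
+ # Worked example (hypothetical cluster): 5 NodeManagers at 10240 MB each -> total_cluster_cap=51200 MB,
+ # so the 20% floor is 10240 MB. With yarn_min_container_size=1024, slider AM=1024, Hive Tez container=2048
+ # and Tez AM=512, the normalized sum is 1024 + 2048 + 1024 = 4096 MB. max(10240, 4096) = 10240 MB,
+ # hence ceil(10240 * 100 / 51200) = 20%.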
+
+ def _normalizeDown(self, val1, val2):
+ """
+ Normalize down 'val1' with respect to 'val2'.
+ """
+
<TRUNCATED>