Posted to commits@ambari.apache.org by sm...@apache.org on 2016/12/30 22:14:24 UTC
[1/3] ambari git commit: Revert "AMBARI-19171 Configuring YARN with custom queues leads to misleading errors in Stack Advisor (dgrinenko via dsen)"
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 7024e4ecd -> 82e44131a
http://git-wip-us.apache.org/repos/asf/ambari/blob/82e44131/ambari-server/src/main/resources/stacks/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py
index 6b29bc3..8865b70 100644
--- a/ambari-server/src/main/resources/stacks/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/stack_advisor.py
@@ -829,7 +829,7 @@ class DefaultStackAdvisor(StackAdvisor):
if len(hostsList) != 1:
scheme = self.getComponentLayoutSchemes().get(componentName, None)
if scheme is not None:
- hostIndex = next((index for key, index in scheme.iteritems() if isinstance(key, (int, long)) and len(hostsList) < key), scheme['else'])
+ hostIndex = next((index for key, index in scheme.iteritems() if isinstance(key, ( int, long )) and len(hostsList) < key), scheme['else'])
else:
hostIndex = 0
for host in hostsList[hostIndex:]:
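For reference, the scheme returned by getComponentLayoutSchemes() maps cluster-size thresholds to a preferred host index, with an 'else' fallback. A minimal sketch of the selection logic above, using a hypothetical scheme (the thresholds and indices here are illustrative, not taken from any stack definition):

    # Hypothetical scheme: clusters smaller than 3 hosts use host 0,
    # smaller than 6 hosts use host 1, anything larger falls back to "else".
    scheme = {3: 0, 6: 1, "else": 2}
    hostsList = ["host1", "host2", "host3", "host4"]

    # Pick the index paired with the first numeric threshold larger than the
    # cluster size; fall back to scheme["else"] when no threshold matches.
    # (The stack advisor uses iteritems() since it targets Python 2.)
    hostIndex = next((index for key, index in scheme.items()
                      if isinstance(key, int) and len(hostsList) < key),
                     scheme["else"])
    # len(hostsList) == 4, so the first matching threshold is 6 and hostIndex == 1.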
@@ -864,16 +864,16 @@ class DefaultStackAdvisor(StackAdvisor):
"""
return {}
+ """
+ Utility method used for validation warnings.
+ """
def getWarnItem(self, message):
- """
- Utility method used for validation warnings.
- """
return {"level": "WARN", "message": message}
+ """
+ Utility method used for validation errors.
+ """
def getErrorItem(self, message):
- """
- Utility method used for validation errors.
- """
return {"level": "ERROR", "message": message}
def getComponentHostNames(self, servicesDict, serviceName, componentName):
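The two utility methods above only wrap a message with a severity level; validators then attach each item to the config it concerns. A minimal sketch of that pattern (the messages come from validators in this patch; the list itself is illustrative):

    def getWarnItem(message):
        return {"level": "WARN", "message": message}

    def getErrorItem(message):
        return {"level": "ERROR", "message": message}

    # A validator collects per-config items before handing them to
    # toConfigurationValidationProblems(validationItems, siteName):
    validationItems = [
        {"config-name": "hive.server2.enable.doAs",
         "item": getErrorItem("Value should be set to 'false' for Hive2.")},
        {"config-name": "hive.llap.daemon.queue.name",
         "item": getWarnItem("Service checks may not run with the current queue capacity.")},
    ]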
@@ -1076,49 +1076,6 @@ class DefaultStackAdvisor(StackAdvisor):
hostNamesList = [component["hostnames"] for component in componentsList if component["component_name"] == componentName]
return hostNamesList[0] if len(hostNamesList) > 0 else []
- def getServiceComponents(self, services, serviceName):
- """
- Return list of components for serviceName service
-
- :type services dict
- :type serviceName str
- :rtype list
- """
- components = []
-
- if not services or not serviceName:
- return components
-
- for service in services["services"]:
- if service["StackServices"]["service_name"] == serviceName:
- components.extend(service["components"])
- break
-
- return components
-
- def getHostsForComponent(self, services, serviceName, componentName):
- """
- Returns the host(s) on which a requested service's component is hosted.
-
- :argument services Configuration information for the cluster
- :argument serviceName Passed-in service in consideration
- :argument componentName Passed-in component in consideration
-
- :type services dict
- :type serviceName str
- :type componentName str
- :rtype list
- """
- hosts_for_component = []
- components = self.getServiceComponents(services, serviceName)
-
- for component in components:
- if component["StackServiceComponents"]["component_name"] == componentName:
- hosts_for_component.extend(component["StackServiceComponents"]["hostnames"])
- break
-
- return hosts_for_component
-
def getMountPoints(self, hosts):
"""
Return list of mounts available on the hosts
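The removed getServiceComponents/getHostsForComponent helpers above walk the standard stack-advisor services dict. A minimal sketch of that structure and the traversal they perform (the host name is hypothetical):

    # Hypothetical excerpt of the 'services' dict handed to stack advisors.
    services = {
        "services": [
            {"StackServices": {"service_name": "HIVE"},
             "components": [
                 {"StackServiceComponents": {
                      "component_name": "HIVE_SERVER_INTERACTIVE",
                      "hostnames": ["c6401.ambari.apache.org"]}}]}]}

    def getHostsForComponent(services, serviceName, componentName):
        # Locate the service by name, then the component, and return its hosts.
        for service in services["services"]:
            if service["StackServices"]["service_name"] == serviceName:
                for component in service["components"]:
                    comp = component["StackServiceComponents"]
                    if comp["component_name"] == componentName:
                        return list(comp["hostnames"])
        return []

    print(getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE"))
    # -> ['c6401.ambari.apache.org']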
http://git-wip-us.apache.org/repos/asf/ambari/blob/82e44131/ambari-web/app/mixins/common/configs/enhanced_configs.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/mixins/common/configs/enhanced_configs.js b/ambari-web/app/mixins/common/configs/enhanced_configs.js
index aece4c2..88dac74 100644
--- a/ambari-web/app/mixins/common/configs/enhanced_configs.js
+++ b/ambari-web/app/mixins/common/configs/enhanced_configs.js
@@ -254,6 +254,9 @@ App.EnhancedConfigsMixin = Em.Mixin.create(App.ConfigWithOverrideRecommendationP
var changedConfigIds = this.get('changedProperties').map(function(changed) {
return App.config.configId(changed.propertyName, changed.propertyFileName);
});
+ if (this.get('currentlyChangedConfig')) {
+ return changedConfigIds.contains(App.config.configId(this.get('currentlyChangedConfig.name'), this.get('currentlyChangedConfig.fileName')));
+ }
return !changedConfigIds.contains(App.config.configId(c.get('name'), c.get('filename')));
}, this).length;
}, this).length;
[2/3] ambari git commit: Revert "AMBARI-19171 Configuring YARN with custom queues leads to misleading errors in Stack Advisor (dgrinenko via dsen)"
Posted by sm...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/82e44131/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
index b52d753..ddf407e 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
@@ -18,13 +18,13 @@ limitations under the License.
"""
import math
+import traceback
from ambari_commons.str_utils import string_set_equals
from resource_management.core.logger import Logger
from resource_management.core.exceptions import Fail
from resource_management.libraries.functions.get_bare_principal import get_bare_principal
-
class HDP25StackAdvisor(HDP24StackAdvisor):
def __init__(self):
@@ -94,7 +94,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
parentItems = super(HDP25StackAdvisor, self).getComponentLayoutValidations(services, hosts)
childItems = []
- hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
+ hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
if len(hsi_hosts) > 1:
message = "Only one host can install HIVE_SERVER_INTERACTIVE. "
childItems.append(
@@ -139,12 +139,14 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
"item": self.getWarnItem(
"Should be set to recommended value to report metrics to Ambari Metrics service.")})
+
return self.toConfigurationValidationProblems(validationItems, "storm-site")
def validateAtlasConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
- application_properties = self.getSiteProperties(configurations, "application-properties")
+ application_properties = getSiteProperties(configurations, "application-properties")
validationItems = []
+ #<editor-fold desc="LDAP and AD">
auth_type = application_properties['atlas.authentication.method.ldap.type']
auth_ldap_enable = application_properties['atlas.authentication.method.ldap'].lower() == 'true'
Logger.info("Validating Atlas configs, authentication type: %s" % str(auth_type))
@@ -181,6 +183,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
if prop not in application_properties or application_properties[prop] is None or application_properties[prop].strip() == "":
validationItems.append({"config-name": prop,
"item": self.getErrorItem("If authentication type is %s, this property is required." % auth_type)})
+ #</editor-fold>
if application_properties['atlas.graph.index.search.backend'] == 'solr5' and \
not application_properties['atlas.graph.index.search.solr.zookeeper-url']:
@@ -249,13 +252,13 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
def validateYarnConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
parentValidationProblems = super(HDP25StackAdvisor, self).validateYARNConfigurations(properties, recommendedDefaults, configurations, services, hosts)
- yarn_site_properties = self.getSiteProperties(configurations, "yarn-site")
+ yarn_site_properties = getSiteProperties(configurations, "yarn-site")
servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
componentsListList = [service["components"] for service in services["services"]]
componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
validationItems = []
- hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
+ hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
if len(hsi_hosts) > 0:
# HIVE_SERVER_INTERACTIVE is mapped to a host
if 'yarn.resourcemanager.work-preserving-recovery.enabled' not in yarn_site_properties or \
@@ -268,116 +271,124 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
validationProblems.extend(parentValidationProblems)
return validationProblems
+ """
+ Does the following validation checks for HIVE_SERVER_INTERACTIVE's hive-interactive-site configs.
+ 1. Queue selected in 'hive.llap.daemon.queue.name' config should be sized >= to minimum required to run LLAP
+ and Hive2 app.
+ 2. Queue selected in 'hive.llap.daemon.queue.name' config state should not be 'STOPPED'.
+ 3. 'hive.server2.enable.doAs' config should be set to 'false' for Hive2.
+ 4. 'Maximum Total Concurrent Queries' (hive.server2.tez.sessions.per.default.queue) should not consume more than 50% of the selected queue for LLAP.
+ 5. If the 'llap' queue is selected, 'remaining available capacity' in the cluster should be at least 512 MB, in order to run Service Checks.
+ """
def validateHiveInteractiveSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
- """
- Does the following validation checks for HIVE_SERVER_INTERACTIVE's hive-interactive-site configs.
- 1. Queue selected in 'hive.llap.daemon.queue.name' config should be sized >= to minimum required to run LLAP
- and Hive2 app.
- 2. Queue selected in 'hive.llap.daemon.queue.name' config state should not be 'STOPPED'.
- 3. 'hive.server2.enable.doAs' config should be set to 'false' for Hive2.
- 4. 'Maximum Total Concurrent Queries' (hive.server2.tez.sessions.per.default.queue) should not consume more than 50% of the selected queue for LLAP.
- 5. If the 'llap' queue is selected, 'remaining available capacity' in the cluster should be at least 512 MB, in order to run Service Checks.
- """
validationItems = []
- hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
- llap_queue_name = None
- llap_queue_cap_perc = None
+ hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
+ curr_selected_queue_for_llap = None
+ curr_selected_queue_for_llap_cap_perc = None
MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS = 512
- llap_queue_cap = None
- hsi_site = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE)
-
- if len(hsi_hosts) == 0:
- return []
+ current_selected_queue_for_llap_cap = None
- # Get total cluster capacity
- node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
- node_manager_cnt = len(node_manager_host_list)
- yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
- total_cluster_cap = node_manager_cnt * yarn_nm_mem_in_mb
- capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+ if len(hsi_hosts) > 0:
+ # Get total cluster capacity
+ node_manager_host_list = self.get_node_manager_hosts(services, hosts)
+ node_manager_cnt = len(node_manager_host_list)
+ yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+ total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
- if not capacity_scheduler_properties:
- Logger.warning("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.")
- return []
-
- if hsi_site:
- if "hive.llap.daemon.queue.name" in hsi_site and hsi_site['hive.llap.daemon.queue.name']:
- llap_queue_name = hsi_site['hive.llap.daemon.queue.name']
- llap_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_queue_name, total_cluster_cap)
-
- if llap_queue_cap:
- llap_queue_cap_perc = float(llap_queue_cap * 100 / total_cluster_cap)
- min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations)
-
- # Validate that the selected queue in 'hive.llap.daemon.queue.name' should be sized >= to minimum required
- # to run LLAP and Hive2 app.
- if llap_queue_cap_perc < min_reqd_queue_cap_perc:
- errMsg1 = "Selected queue '{0}' capacity ({1}%) is less than minimum required capacity ({2}%) for LLAP " \
- "app to run".format(llap_queue_name, llap_queue_cap_perc, min_reqd_queue_cap_perc)
- validationItems.append({"config-name": "hive.llap.daemon.queue.name", "item": self.getErrorItem(errMsg1)})
- else:
- Logger.error("Couldn't retrieve '{0}' queue's capacity from 'capacity-scheduler' while doing validation checks for "
- "Hive Server Interactive.".format(llap_queue_name))
-
- # Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED.
- llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_queue_name)
- if llap_selected_queue_state:
- if llap_selected_queue_state == "STOPPED":
- errMsg2 = "Selected queue '{0}' current state is : '{1}'. It is required to be in 'RUNNING' state for LLAP to run"\
- .format(llap_queue_name, llap_selected_queue_state)
- validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg2)})
+ capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+ if capacity_scheduler_properties:
+ if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \
+ 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
+ curr_selected_queue_for_llap = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
+ if curr_selected_queue_for_llap:
+ current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties,
+ curr_selected_queue_for_llap, total_cluster_capacity)
+ if current_selected_queue_for_llap_cap:
+ curr_selected_queue_for_llap_cap_perc = int(current_selected_queue_for_llap_cap * 100 / total_cluster_capacity)
+ min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations)
+
+ # Validate that the selected queue in 'hive.llap.daemon.queue.name' should be sized >= to minimum required
+ # to run LLAP and Hive2 app.
+ if curr_selected_queue_for_llap_cap_perc < min_reqd_queue_cap_perc:
+ errMsg1 = "Selected queue '{0}' capacity ({1}%) is less than minimum required capacity ({2}%) for LLAP " \
+ "app to run".format(curr_selected_queue_for_llap, curr_selected_queue_for_llap_cap_perc, min_reqd_queue_cap_perc)
+ validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg1)})
+ else:
+ Logger.error("Couldn't retrieve '{0}' queue's capacity from 'capacity-scheduler' while doing validation checks for "
+ "Hive Server Interactive.".format(curr_selected_queue_for_llap))
+
+ # Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED.
+ llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, curr_selected_queue_for_llap)
+ if llap_selected_queue_state:
+ if llap_selected_queue_state == "STOPPED":
+ errMsg2 = "Selected queue '{0}' current state is : '{1}'. It is required to be in 'RUNNING' state for LLAP to run"\
+ .format(curr_selected_queue_for_llap, llap_selected_queue_state)
+ validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg2)})
+ else:
+ Logger.error("Couldn't retrieve '{0}' queue's state from 'capacity-scheduler' while doing validation checks for "
+ "Hive Server Interactive.".format(curr_selected_queue_for_llap))
+ else:
+ Logger.error("Couldn't retrieve current selection for 'hive.llap.daemon.queue.name' while doing validation "
+ "checks for Hive Server Interactive.")
else:
- Logger.error("Couldn't retrieve '{0}' queue's state from 'capacity-scheduler' while doing validation checks for "
- "Hive Server Interactive.".format(llap_queue_name))
+ Logger.error("Couldn't retrieve 'hive.llap.daemon.queue.name' config from 'hive-interactive-site' while doing "
+ "validation checks for Hive Server Interactive.")
+ pass
else:
- Logger.error("Couldn't retrieve 'hive.llap.daemon.queue.name' config from 'hive-interactive-site' while doing "
- "validation checks for Hive Server Interactive.")
-
- # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2.
- if 'hive.server2.enable.doAs' in hsi_site and hsi_site['hive.server2.enable.doAs'] == "true":
- validationItems.append({"config-name": "hive.server2.enable.doAs", "item": self.getErrorItem("Value should be set to 'false' for Hive2.")})
-
- # Validate that 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) is not consuming more than
- # 50% of selected queue for LLAP.
- if llap_queue_cap and 'hive.server2.tez.sessions.per.default.queue' in hsi_site:
- num_tez_sessions = hsi_site['hive.server2.tez.sessions.per.default.queue']
- if num_tez_sessions:
- num_tez_sessions = long(num_tez_sessions)
- yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
- tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_cap))
- normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
- llap_selected_queue_cap_remaining = llap_queue_cap - (normalized_tez_am_container_size * num_tez_sessions)
- if llap_selected_queue_cap_remaining <= llap_queue_cap/2:
- errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \
- "'{1}' queue for LLAP.".format(num_tez_sessions, llap_queue_name)
- validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)})
-
- # Validate that 'remaining available capacity' in cluster is at least 512 MB, after 'llap' queue is selected,
- # in order to run Service Checks.
- if llap_queue_name and llap_queue_cap_perc and llap_queue_name == self.AMBARI_MANAGED_LLAP_QUEUE_NAME:
- curr_selected_queue_for_llap_cap = float(llap_queue_cap_perc) / 100 * total_cluster_cap
- available_cap_in_cluster = total_cluster_cap - curr_selected_queue_for_llap_cap
- if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS:
- errMsg4 = "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \
- "({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster)
- validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg4)})
+ Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.")
+ pass
+
+ if self.HIVE_INTERACTIVE_SITE in services['configurations']:
+ # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2.
+ if 'hive.server2.enable.doAs' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
+ hive2_enable_do_as = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.server2.enable.doAs']
+ if hive2_enable_do_as == 'true':
+ validationItems.append({"config-name": "hive.server2.enable.doAs","item": self.getErrorItem("Value should be set to 'false' for Hive2.")})
+
+ # Validate that 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) is not consuming more than
+ # 50% of selected queue for LLAP.
+ if current_selected_queue_for_llap_cap and 'hive.server2.tez.sessions.per.default.queue' in \
+ services['configurations']['hive-interactive-site']['properties']:
+ num_tez_sessions = services['configurations']['hive-interactive-site']['properties']['hive.server2.tez.sessions.per.default.queue']
+ if num_tez_sessions:
+ num_tez_sessions = long(num_tez_sessions)
+ yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
+ tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity))
+ normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+ llap_selected_queue_cap_remaining = current_selected_queue_for_llap_cap - (normalized_tez_am_container_size * num_tez_sessions)
+ if llap_selected_queue_cap_remaining <= current_selected_queue_for_llap_cap/2:
+ errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \
+ "'{1}' queue for LLAP.".format(num_tez_sessions, curr_selected_queue_for_llap)
+ validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)})
+
+ # Validate that 'remaining available capacity' in cluster is at least 512 MB, after 'llap' queue is selected,
+ # in order to run Service Checks.
+ if curr_selected_queue_for_llap and curr_selected_queue_for_llap_cap_perc and \
+ curr_selected_queue_for_llap == self.AMBARI_MANAGED_LLAP_QUEUE_NAME:
+ curr_selected_queue_for_llap_cap = float(curr_selected_queue_for_llap_cap_perc) / 100 * total_cluster_capacity
+ available_cap_in_cluster = total_cluster_capacity - curr_selected_queue_for_llap_cap
+ if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS:
+ errMsg4 = "Capacity used by '{0}' queue is '{1}'. Service checks may not run as remaining available capacity " \
+ "({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster)
+ validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getWarnItem(errMsg4)})
validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-site")
return validationProblems
def validateHiveInteractiveEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
- hive_site_env_properties = self.getSiteProperties(configurations, "hive-interactive-env")
+ hive_site_env_properties = getSiteProperties(configurations, "hive-interactive-env")
validationItems = []
- hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
+ hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
if len(hsi_hosts) > 0:
# HIVE_SERVER_INTERACTIVE is mapped to a host
if 'enable_hive_interactive' not in hive_site_env_properties or (
- 'enable_hive_interactive' in hive_site_env_properties and
- hive_site_env_properties['enable_hive_interactive'].lower() != 'true'):
-
+ 'enable_hive_interactive' in hive_site_env_properties and hive_site_env_properties[
+ 'enable_hive_interactive'].lower() != 'true'):
validationItems.append({"config-name": "enable_hive_interactive",
"item": self.getErrorItem(
"HIVE_SERVER_INTERACTIVE requires enable_hive_interactive in hive-interactive-env set to true.")})
+ pass
+
else:
# no HIVE_SERVER_INTERACTIVE
if 'enable_hive_interactive' in hive_site_env_properties and hive_site_env_properties[
@@ -385,6 +396,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
validationItems.append({"config-name": "enable_hive_interactive",
"item": self.getErrorItem(
"enable_hive_interactive in hive-interactive-env should be set to false.")})
+ pass
+ pass
validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-env")
return validationProblems
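A worked sketch of check 1 above (the selected queue must be sized at or above the minimum required for the LLAP and Hive2 apps), with made-up numbers: ten 8 GB NodeManagers and a queue holding 15% of the cluster when 20% is required:

    node_manager_cnt = 10
    yarn_nm_mem_in_mb = 8192
    total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb     # 81920 MB

    llap_queue_cap = 12288          # hypothetical queue size in MB (15% of cluster)
    min_reqd_queue_cap_perc = 20    # hypothetical result of min_queue_perc_reqd_for_llap_and_hive_app()

    llap_queue_cap_perc = float(llap_queue_cap * 100 / total_cluster_capacity)   # 15.0
    if llap_queue_cap_perc < min_reqd_queue_cap_perc:
        # Mirrors the ERROR item attached to 'hive.llap.daemon.queue.name' above.
        print("Selected queue capacity (%.1f%%) is less than minimum required (%d%%)"
              % (llap_queue_cap_perc, min_reqd_queue_cap_perc))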
@@ -424,15 +437,15 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
super(HDP25StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts)
- storm_site = self.getServicesSiteProperties(services, "storm-site")
+ storm_site = getServicesSiteProperties(services, "storm-site")
putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site")
security_enabled = (storm_site is not None and "storm.zookeeper.superACL" in storm_site)
-
+
if security_enabled:
_storm_principal_name = services['configurations']['storm-env']['properties']['storm_principal_name']
storm_bare_jaas_principal = get_bare_principal(_storm_principal_name)
- if 'nimbus.impersonation.acl' in storm_site:
+ if 'nimbus.impersonation.acl' in storm_site:
storm_nimbus_impersonation_acl = storm_site["nimbus.impersonation.acl"]
storm_nimbus_impersonation_acl.replace('{{storm_bare_jaas_principal}}', storm_bare_jaas_principal)
putStormSiteProperty('nimbus.impersonation.acl', storm_nimbus_impersonation_acl)
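One caveat worth flagging in the hunk above: Python strings are immutable, so storm_nimbus_impersonation_acl.replace(...) returns a new string and the bare call discards its result. A minimal sketch of the substitution the surrounding code appears to intend (the ACL value is a hypothetical placeholder):

    storm_bare_jaas_principal = "storm"
    storm_nimbus_impersonation_acl = "acl for {{storm_bare_jaas_principal}}"  # hypothetical value

    # replace() does not modify in place; its return value must be kept.
    storm_nimbus_impersonation_acl = storm_nimbus_impersonation_acl.replace(
        '{{storm_bare_jaas_principal}}', storm_bare_jaas_principal)
    # -> "acl for storm"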
@@ -599,7 +612,7 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
if 'atlas-env' in services['configurations']:
atlas_server_metadata_size = 50000
if 'atlas_server_metadata_size' in services['configurations']['atlas-env']['properties']:
- atlas_server_metadata_size = float(services['configurations']['atlas-env']['properties']['atlas_server_metadata_size'])
+ atlas_server_metadata_size = int(services['configurations']['atlas-env']['properties']['atlas_server_metadata_size'])
atlas_server_xmx = 2048
@@ -656,42 +669,47 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
# For 'Hive Server Interactive', if the component exists.
- hsi_hosts = self.getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
- hsi_properties = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE)
-
+ hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
if len(hsi_hosts) > 0:
+ hsi_host = hsi_hosts[0]
putHiveInteractiveEnvProperty('enable_hive_interactive', 'true')
# Update 'hive.llap.daemon.queue.name' property attributes if capacity scheduler is changed.
- if hsi_properties and 'hive.llap.daemon.queue.name' in hsi_properties:
+ if self.HIVE_INTERACTIVE_SITE in services['configurations']:
+ if 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
self.setLlapDaemonQueuePropAttributes(services, configurations)
- hsi_conf_properties = self.getSiteProperties(configurations, self.HIVE_INTERACTIVE_SITE)
-
- hive_tez_default_queue = hsi_properties["hive.llap.daemon.queue.name"]
- if hsi_conf_properties and "hive.llap.daemon.queue.name" in hsi_conf_properties:
- hive_tez_default_queue = hsi_conf_properties['hive.llap.daemon.queue.name']
-
+ # Update 'hive.server2.tez.default.queues' value
+ hive_tez_default_queue = None
+ if 'hive-interactive-site' in configurations and \
+ 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']:
+ hive_tez_default_queue = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
+ Logger.info("'hive.llap.daemon.queue.name' value from configurations : '{0}'".format(hive_tez_default_queue))
+ if not hive_tez_default_queue:
+ hive_tez_default_queue = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
+ Logger.info("'hive.llap.daemon.queue.name' value from services : '{0}'".format(hive_tez_default_queue))
if hive_tez_default_queue:
putHiveInteractiveSiteProperty("hive.server2.tez.default.queues", hive_tez_default_queue)
- Logger.debug("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue))
+ Logger.info("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue))
else:
putHiveInteractiveEnvProperty('enable_hive_interactive', 'false')
putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false")
- if hsi_properties and "hive.llap.zk.sm.connectionString" in hsi_properties:
+ if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \
+ 'hive.llap.zk.sm.connectionString' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
+ # Fill the property 'hive.llap.zk.sm.connectionString' required by Hive Server Interactive (HiveServer2)
zookeeper_host_port = self.getZKHostPortString(services)
if zookeeper_host_port:
putHiveInteractiveSiteProperty("hive.llap.zk.sm.connectionString", zookeeper_host_port)
+ pass
def recommendYARNConfigurations(self, configurations, clusterData, services, hosts):
super(HDP25StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts)
- hsi_env_poperties = self.getServicesSiteProperties(services, "hive-interactive-env")
- cluster_env = self.getServicesSiteProperties(services, "cluster-env")
# Queue 'llap' creation/removal logic (Used by Hive Interactive server and associated LLAP)
- if hsi_env_poperties and 'enable_hive_interactive' in hsi_env_poperties:
- enable_hive_interactive = hsi_env_poperties['enable_hive_interactive']
+ if 'hive-interactive-env' in services['configurations'] and \
+ 'enable_hive_interactive' in services['configurations']['hive-interactive-env']['properties']:
+ enable_hive_interactive = services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']
LLAP_QUEUE_NAME = 'llap'
# Hive Server interactive is already added or getting added
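In the recommendation code above, 'hive.llap.daemon.queue.name' is read first from the in-flight configurations dict (values recommended during the current invocation) and only then from services['configurations'] (values currently stored on the cluster). A minimal sketch of that two-level lookup, with hypothetical values:

    HIVE_INTERACTIVE_SITE = 'hive-interactive-site'

    # Values recommended so far in this stack-advisor invocation.
    configurations = {HIVE_INTERACTIVE_SITE: {'properties': {}}}
    # Values currently stored on the cluster.
    services = {'configurations': {HIVE_INTERACTIVE_SITE: {
        'properties': {'hive.llap.daemon.queue.name': 'llap'}}}}

    hive_tez_default_queue = configurations[HIVE_INTERACTIVE_SITE]['properties'].get(
        'hive.llap.daemon.queue.name')
    if not hive_tez_default_queue:
        hive_tez_default_queue = services['configurations'][HIVE_INTERACTIVE_SITE][
            'properties']['hive.llap.daemon.queue.name']
    # -> 'llap', which then becomes the value of 'hive.server2.tez.default.queues'.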
@@ -702,8 +720,8 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services)
stack_root = "/usr/hdp"
- if cluster_env and "stack_root" in cluster_env:
- stack_root = cluster_env["stack_root"]
+ if "cluster-env" in services["configurations"] and "stack_root" in services["configurations"]["cluster-env"]["properties"]:
+ stack_root = services["configurations"]["cluster-env"]["properties"]["stack_root"]
timeline_plugin_classes_values = []
timeline_plugin_classpath_values = []
@@ -718,457 +736,546 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', ",".join(timeline_plugin_classes_values))
putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath', ":".join(timeline_plugin_classpath_values))
- def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
- """
- Entry point for updating Hive's 'LLAP app' configs namely :
- (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb
- (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb,
- (7). hive.server2.tez.sessions.per.default.queue, (8). tez.am.resource.memory.mb (9). hive.tez.container.size
- (10). tez.runtime.io.sort.mb (11). tez.runtime.unordered.output.buffer.size-mb (12). hive.llap.io.threadpool.size, and
- (13). hive.llap.io.enabled.
+ """
+ Entry point for updating Hive's 'LLAP app' configs namely : (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb
+ (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb,
+ (7). hive.server2.tez.sessions.per.default.queue, (8). tez.am.resource.memory.mb (9). hive.tez.container.size
+ (10). tez.runtime.io.sort.mb (11). tez.runtime.unordered.output.buffer.size-mb (12). hive.llap.io.threadpool.size, and
+ (13). hive.llap.io.enabled.
- The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following:
- (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue'
- (4). Change in queue selection for config 'hive.llap.daemon.queue.name'.
+ The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following:
+ (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue'
+ (4). Change in queue selection for config 'hive.llap.daemon.queue.name'.
- If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
- value is not calculated, but read and used in calculations for dependent configs.
+ If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
+ value is not calculated, but read and used in calculations for dependent configs.
- Note: All memory calculations are in MB, unless specified otherwise.
- """
+ Note: All memory calculations are in MB, unless specified otherwise.
+ """
+ def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
+ Logger.info("Entered updateLlapConfigs() ..")
putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services)
putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE)
+
putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+
putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services)
+
llap_daemon_selected_queue_name = None
- selected_queue_is_ambari_managed_llap = None # Queue named 'llap' at root level is Ambari managed.
+ selected_queue_is_ambari_managed_llap = None # Queue named 'llap' at root level is Ambari managed.
llap_selected_queue_am_percent = None
DEFAULT_EXECUTOR_TO_AM_RATIO = 20
MIN_EXECUTOR_TO_AM_RATIO = 10
MAX_CONCURRENT_QUERIES = 32
leafQueueNames = None
MB_TO_BYTES = 1048576
- hsi_site = self.getServicesSiteProperties(services, self.HIVE_INTERACTIVE_SITE)
- yarn_site = self.getServicesSiteProperties(services, "yarn-site")
# Update 'hive.llap.daemon.queue.name' prop combo entries
self.setLlapDaemonQueuePropAttributes(services, configurations)
if not services["changed-configurations"]:
read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations))
- putHiveInteractiveSiteProperty("hive.llap.daemon.yarn.container.mb", read_llap_daemon_yarn_cont_mb)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', read_llap_daemon_yarn_cont_mb)
+ # initial memory setting to make sure hive.llap.daemon.yarn.container.mb >= yarn.scheduler.minimum-allocation-mb
+ Logger.info("Adjusted 'hive.llap.daemon.yarn.container.mb' to yarn min container size as initial size "
+ "(" + str(self.get_yarn_min_container_size(services, configurations)) + " MB).")
- if hsi_site and "hive.llap.daemon.queue.name" in hsi_site:
- llap_daemon_selected_queue_name = hsi_site["hive.llap.daemon.queue.name"]
+ try:
+ if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \
+ 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
+ llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
- # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'.
- capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
- if capacity_scheduler_properties:
- # Get all leaf queues.
- leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
- Logger.info("YARN leaf Queues = {0}".format(leafQueueNames))
- if len(leafQueueNames) == 0:
- Logger.error("Queue(s) couldn't be retrieved from capacity-scheduler.")
- return
-
- # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive).
- changed_configs_has_enable_hive_int = self.isConfigPropertiesChanged(services, "hive-interactive-env", ['enable_hive_interactive'], False)
- llap_named_queue_selected_in_curr_invocation = None
- if changed_configs_has_enable_hive_int \
- and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']:
- if len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames):
- llap_named_queue_selected_in_curr_invocation = True
- putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
- putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
+ # Update Visibility of 'num_llap_nodes' slider. Visible only if selected queue is Ambari created 'llap'.
+ capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+ if capacity_scheduler_properties:
+ # Get all leaf queues.
+ leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+ Logger.info("YARN leaf Queues = {0}".format(leafQueueNames))
+ if len(leafQueueNames) == 0:
+ raise Fail("Queue(s) couldn't be retrieved from capacity-scheduler.")
+
+ # Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive).
+ changed_configs_has_enable_hive_int = self.are_config_props_in_changed_configs(services, "hive-interactive-env",
+ set(['enable_hive_interactive']), False)
+ llap_named_queue_selected_in_curr_invocation = None
+ if changed_configs_has_enable_hive_int \
+ and services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']:
+ if (len(leafQueueNames) == 1 or (len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames)):
+ llap_named_queue_selected_in_curr_invocation = True
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', llap_queue_name)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', llap_queue_name)
+ Logger.info("'hive.llap.daemon.queue.name' and 'hive.server2.tez.default.queues' values set as : {0}".format(llap_queue_name))
+ else:
+ first_leaf_queue = list(leafQueueNames)[0] # 1st invocation, pick the 1st leaf queue and set it as selected.
+ putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', first_leaf_queue)
+ putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', first_leaf_queue)
+ llap_named_queue_selected_in_curr_invocation = False
+ Logger.info("'hive.llap.daemon.queue.name' and 'hive.server2.tez.default.queues' values set as : {0}".format(first_leaf_queue))
+ Logger.info("llap_named_queue_selected_in_curr_invocation = {0}".format(llap_named_queue_selected_in_curr_invocation))
+
+ if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == llap_queue_name) or \
+ llap_named_queue_selected_in_curr_invocation) or \
+ (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation):
+ putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true")
+ Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+ "slider visibility to 'True'".format(llap_queue_name, list(leafQueueNames)))
+ selected_queue_is_ambari_managed_llap = True
else:
- first_leaf_queue = list(leafQueueNames)[0] # 1st invocation, pick the 1st leaf queue and set it as selected.
- putHiveInteractiveSiteProperty('hive.llap.daemon.queue.name', first_leaf_queue)
- putHiveInteractiveSiteProperty('hive.server2.tez.default.queues', first_leaf_queue)
- llap_named_queue_selected_in_curr_invocation = False
-
- if (len(leafQueueNames) == 2 and (llap_daemon_selected_queue_name and llap_daemon_selected_queue_name == llap_queue_name) or
- llap_named_queue_selected_in_curr_invocation) or \
- (len(leafQueueNames) == 1 and llap_daemon_selected_queue_name == 'default' and llap_named_queue_selected_in_curr_invocation):
- putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "true")
- selected_queue_is_ambari_managed_llap = True
+ putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false")
+ Logger.info("Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
+ "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames)))
+ selected_queue_is_ambari_managed_llap = False
+
+ if not llap_named_queue_selected_in_curr_invocation: # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have
+ # state information pertaining to 'llap' queue.
+ # Check: State of the selected queue should not be STOPPED.
+ if llap_daemon_selected_queue_name:
+ llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
+ if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED":
+ raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default "
+ "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state))
+ else:
+ raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values."
+ .format(llap_daemon_selected_queue_name))
else:
- putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "visible", "false")
- selected_queue_is_ambari_managed_llap = False
-
- if not llap_named_queue_selected_in_curr_invocation: # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have
- # state information pertaining to 'llap' queue.
- # Check: State of the selected queue should not be STOPPED.
- if llap_daemon_selected_queue_name:
- llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name)
- if llap_selected_queue_state is None or llap_selected_queue_state == "STOPPED":
- Logger.error("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default "
- "values.".format(llap_daemon_selected_queue_name, llap_selected_queue_state))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
- else:
- Logger.error("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values."
- .format(llap_daemon_selected_queue_name))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
- else:
- Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive."
- " Not calculating LLAP configs.")
- return
-
- changed_configs_in_hive_int_env = None
- llap_concurrency_in_changed_configs = None
- llap_daemon_queue_in_changed_configs = None
- # Calculations are triggered only if there is change in any one of the following props :
- # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue'
- # or 'hive.llap.daemon.queue.name' has change in value selection.
- # OR
- # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation)
- if 'changed-configurations' in services.keys():
- config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive'])
- changed_configs_in_hive_int_env = self.isConfigPropertiesChanged(services, "hive-interactive-env", config_names_to_be_checked, False)
-
- # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs.
- llap_concurrency_in_changed_configs = self.isConfigPropertiesChanged(services, self.HIVE_INTERACTIVE_SITE, ['hive.server2.tez.sessions.per.default.queue'], False)
- llap_daemon_queue_in_changed_configs = self.isConfigPropertiesChanged(services, self.HIVE_INTERACTIVE_SITE, ['hive.llap.daemon.queue.name'], False)
-
- if not changed_configs_in_hive_int_env and not llap_concurrency_in_changed_configs and \
- not llap_daemon_queue_in_changed_configs and services["changed-configurations"]:
-
- return
-
- node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
- node_manager_cnt = len(node_manager_host_list)
- yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
- total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
- yarn_min_container_size = float(self.get_yarn_min_container_size(services, configurations))
- tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity))
- normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
-
- if yarn_site and "yarn.nodemanager.resource.cpu-vcores" in yarn_site:
- cpu_per_nm_host = float(yarn_site["yarn.nodemanager.resource.cpu-vcores"])
- else:
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
-
- # Calculate the available memory for LLAP app
- yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size)
- mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host)
-
- if mem_per_thread_for_llap is None:
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
-
- mem_per_thread_for_llap = float(mem_per_thread_for_llap)
-
- if not selected_queue_is_ambari_managed_llap:
- llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity)
-
- if llap_daemon_selected_queue_cap <= 0:
- Logger.warning("'{0}' queue capacity percentage retrieved = {1}. Expected > 0.".format(
- llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive."
+ " Not calculating LLAP configs.")
return
- total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size)
- num_llap_nodes_requested = math.floor(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized)
- queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name))
- hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized
- else: # Ambari managed 'llap' named queue at root level.
- num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input
- total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
- total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size))
-
- # What percent is 'total_llap_mem' of 'total_cluster_capacity' ?
- llap_named_queue_cap_fraction = math.ceil(total_llap_mem_normalized / total_cluster_capacity * 100)
-
- if llap_named_queue_cap_fraction > 100:
- Logger.warning("Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
+ changed_configs_in_hive_int_env = None
+ llap_concurrency_in_changed_configs = None
+ llap_daemon_queue_in_changed_configs = None
+ # Calculations are triggered only if there is change in any one of the following props :
+ # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue'
+ # or 'hive.llap.daemon.queue.name' has change in value selection.
+ # OR
+ # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation)
+ if 'changed-configurations' in services.keys():
+ config_names_to_be_checked = set(['num_llap_nodes', 'enable_hive_interactive'])
+ changed_configs_in_hive_int_env = self.are_config_props_in_changed_configs(services, "hive-interactive-env",
+ config_names_to_be_checked, False)
+
+ # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs.
+ llap_concurrency_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site",
+ set(['hive.server2.tez.sessions.per.default.queue']), False)
+ llap_daemon_queue_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site",
+ set(['hive.llap.daemon.queue.name']), False)
+
+ if not changed_configs_in_hive_int_env and \
+ not llap_concurrency_in_changed_configs and \
+ not llap_daemon_queue_in_changed_configs and \
+ services["changed-configurations"]:
+ Logger.info("LLAP parameters not modified. Not adjusting LLAP configs.")
+ Logger.info("Current 'changed-configuration' received is : {0}".format(services["changed-configurations"]))
return
- # Adjust capacity scheduler for the 'llap' named queue.
- self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction)
- hive_tez_am_cap_available = total_llap_mem_normalized
-
- # Common calculations now, irrespective of the queue selected.
-
- # Get calculated value for Slider AM container Size
- slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
- yarn_min_container_size)
-
- llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size
-
- if llap_mem_for_tezAm_and_daemons < 2 * yarn_min_container_size:
- Logger.warning("Not enough capacity available on the cluster to run LLAP")
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
-
- # Calculate llap concurrency (i.e. Number of Tez AM's)
- max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
-
- # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
- if not llap_concurrency_in_changed_configs:
- if max_executors_per_node <= 0:
- Logger.warning("Calculated 'max_executors_per_node' = {0}. Expected value >= 1.".format(max_executors_per_node))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
-
- # Default 1 AM for every 20 executor threads.
- # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM,
- # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is
- # instead limited by #CPUs. Use maxPerNode to factor this in.
- llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
- llap_concurrency = min(llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size)))
- if llap_concurrency == 0:
- llap_concurrency = 1
-
- if llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available:
- llap_concurrency = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
-
- if llap_concurrency <= 0:
- Logger.warning("Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
- else:
- # Read current value
- if 'hive.server2.tez.sessions.per.default.queue' in hsi_site:
- llap_concurrency = long(hsi_site['hive.server2.tez.sessions.per.default.queue'])
- if llap_concurrency <= 0:
- Logger.warning("'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1".format(llap_concurrency))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
+ Logger.info("\nPerforming LLAP config calculations ......")
+ node_manager_host_list = self.get_node_manager_hosts(services, hosts)
+ node_manager_cnt = len(node_manager_host_list)
+ yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+ total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
+ Logger.info("Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
+ "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb))
+
+ yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
+
+ tez_am_container_size = self.calculate_tez_am_container_size(services, long(total_cluster_capacity))
+ normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+ cpu_per_nm_host = self.get_cpu_per_nm_host(services)
+ Logger.info("Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, "
+ "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size,
+ total_cluster_capacity))
+
+ # Calculate the available memory for LLAP app
+ yarn_nm_mem_in_mb_normalized = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size)
+ mem_per_thread_for_llap = self.calculate_mem_per_thread_for_llap(services, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host)
+ Logger.info("Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, "
+ "cpu_per_nm_host : {2}".format(mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host))
+
+ Logger.info("selected_queue_is_ambari_managed_llap = {0}".format(selected_queue_is_ambari_managed_llap))
+ if not selected_queue_is_ambari_managed_llap:
+ llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, llap_daemon_selected_queue_name, total_cluster_capacity)
+ assert(llap_daemon_selected_queue_cap > 0, "'{0}' queue capacity percentage retrieved = {1}. "
+ "Expected > 0.".format(llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap))
+ total_llap_mem_normalized = self._normalizeDown(llap_daemon_selected_queue_cap, yarn_min_container_size)
+ Logger.info("Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, "
+ "yarn_min_container_size : {3}".format(llap_daemon_selected_queue_name, total_llap_mem_normalized,
+ llap_daemon_selected_queue_cap, yarn_min_container_size))
+ num_llap_nodes_requested = math.floor(total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized)
+ Logger.info("Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, "
+ "yarn_nm_mem_in_mb_normalized : {2}".format(num_llap_nodes_requested, total_llap_mem_normalized, yarn_nm_mem_in_mb_normalized))
+ queue_am_fraction_perc = float(self.__getQueueAmFractionFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name))
+ hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized
+ Logger.info("Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, "
+ "total_llap_mem_normalized : {2}".format(hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized))
+ else: # Ambari managed 'llap' named queue at root level.
+ num_llap_nodes_requested = self.get_num_llap_nodes(services, configurations) #Input
+ total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
+ Logger.info("Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, "
+ "yarn_nm_mem_in_mb_normalized : {2}".format(total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized))
+ total_llap_mem_normalized = float(self._normalizeDown(total_llap_mem, yarn_min_container_size))
+ Logger.info("Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, "
+ "yarn_min_container_size : {2}".format(total_llap_mem_normalized, total_llap_mem, yarn_min_container_size))
+ # What percent is 'total_llap_mem' of 'total_cluster_capacity' ?
+ llap_named_queue_cap_fraction = math.ceil(total_llap_mem_normalized / total_cluster_capacity * 100)
+ assert(llap_named_queue_cap_fraction <= 100), "Calculated '{0}' queue size = {1}. Cannot be > 100.".format(llap_queue_name, llap_named_queue_cap_fraction)
+ Logger.info("Calculated '{0}' queue capacity percent = {1}.".format(llap_queue_name, llap_named_queue_cap_fraction))
+ # Adjust capacity scheduler for the 'llap' named queue.
+ self.checkAndManageLlapQueue(services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction)
+ hive_tez_am_cap_available = total_llap_mem_normalized
+ Logger.info("hive_tez_am_cap_available : {0}".format(hive_tez_am_cap_available))
+
+ #Common calculations now, irrespective of the queue selected.
+
+ # Get calculated value for Slider AM container Size
+ slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size),
+ yarn_min_container_size)
+ Logger.info("Calculated 'slider_am_container_size' : {0}, using following: yarn_min_container_size : "
+ "{1}".format(slider_am_container_size, yarn_min_container_size))
+
+ llap_mem_for_tezAm_and_daemons = total_llap_mem_normalized - slider_am_container_size
+ assert (llap_mem_for_tezAm_and_daemons >= 2 * yarn_min_container_size), "Not enough capacity available on the cluster to run LLAP"
+ Logger.info("Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, "
+ "slider_am_container_size : {2}".format(llap_mem_for_tezAm_and_daemons, total_llap_mem_normalized, slider_am_container_size))
+
+
+ # Calculate llap concurrency (i.e. Number of Tez AM's)
+ max_executors_per_node = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
+
+ # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
+ if not llap_concurrency_in_changed_configs:
+ assert(max_executors_per_node > 0), "Calculated 'max_executors_per_node' = {0}. Expected value >= 1.".format(max_executors_per_node)
+ Logger.info("Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+ "mem_per_thread_for_llap: {3}".format(max_executors_per_node, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+ # Default 1 AM for every 20 executor threads.
+ # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM,
+ # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is
+ # instead limited by #CPUs. Use maxPerNode to factor this in.
+ llap_concurrency_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+ Logger.info("Calculated 'llap_concurrency_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO "
+ ": {3}, MAX_CONCURRENT_QUERIES : {4}".format(llap_concurrency_limit, max_executors_per_node, num_llap_nodes_requested, DEFAULT_EXECUTOR_TO_AM_RATIO, MAX_CONCURRENT_QUERIES))
+ llap_concurrency = min(llap_concurrency_limit, math.floor(llap_mem_for_tezAm_and_daemons / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap + normalized_tez_am_container_size)))
+ Logger.info("Calculated 'llap_concurrency' : {0}, using following : llap_concurrency_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+ "{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+ "{5}".format(llap_concurrency, llap_concurrency_limit, llap_mem_for_tezAm_and_daemons, DEFAULT_EXECUTOR_TO_AM_RATIO,
+ mem_per_thread_for_llap, normalized_tez_am_container_size))
+ if (llap_concurrency == 0):
+ llap_concurrency = 1
+ Logger.info("Adjusted 'llap_concurrency' : 1.")
+
+ if (llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available):
+ llap_concurrency = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+ assert(llap_concurrency > 0), "Calculated 'LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(llap_concurrency)
+ Logger.info("Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+ "{2}".format(llap_concurrency, hive_tez_am_cap_available, normalized_tez_am_container_size))
else:
- llap_concurrency = 1
- Logger.warning("Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config. Setting default value 1.")
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
-
- # Calculate 'Max LLAP Consurrency', irrespective of whether 'llap_concurrency' was read or calculated.
- max_llap_concurreny_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
- max_llap_concurreny = min(max_llap_concurreny_limit, math.floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO *
- mem_per_thread_for_llap + normalized_tez_am_container_size)))
- if max_llap_concurreny == 0:
- max_llap_concurreny = 1
-
- if (max_llap_concurreny * normalized_tez_am_container_size) > hive_tez_am_cap_available:
- max_llap_concurreny = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
- if max_llap_concurreny <= 0:
- Logger.warning("Calculated 'Max. LLAP Concurrent Queries' = {0}. Expected value > 1".format(max_llap_concurreny))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
-
- # Calculate value for 'num_llap_nodes', an across cluster config.
- tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size
- llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required
-
- if llap_mem_daemon_size < yarn_min_container_size:
- Logger.warning("Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container Size' ({1})'".format(
- llap_mem_daemon_size, yarn_min_container_size))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
+ # Read current value
+ if 'hive.server2.tez.sessions.per.default.queue' in services['configurations'][self.HIVE_INTERACTIVE_SITE][
+ 'properties']:
+ llap_concurrency = long(services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties'][
+ 'hive.server2.tez.sessions.per.default.queue'])
+ assert (llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \
+ .format(llap_concurrency)
+ Logger.info("Read 'llap_concurrency' : {0}".format(llap_concurrency ))
+ else:
+ raise Fail(
+ "Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config.")
+
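
Whichever way llap_concurrency was obtained, the calculation path above bounds it twice: by executor count (one AM per DEFAULT_EXECUTOR_TO_AM_RATIO executors, never more than MAX_CONCURRENT_QUERIES) and by memory. A worked example; the constants and inputs below are illustrative stand-ins for the advisor's module-level values:

    import math

    DEFAULT_EXECUTOR_TO_AM_RATIO = 20   # illustrative
    MAX_CONCURRENT_QUERIES = 32         # illustrative
    max_executors_per_node = 12
    num_llap_nodes_requested = 10
    llap_mem_for_tezAm_and_daemons = 37376   # MB
    mem_per_thread_for_llap = 4096           # MB
    normalized_tez_am_container_size = 1024  # MB

    # Cap 1: executor count. min(floor(12 * 10 / 20), 32) = 6
    limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested
                           / DEFAULT_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
    # Cap 2: memory for 20 executors + 1 AM per query. floor(37376 / 82944) = 0
    by_memory = math.floor(llap_mem_for_tezAm_and_daemons
                           / (DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap
                              + normalized_tez_am_container_size))
    # The code above bumps a memory-bound result of 0 back to 1.
    llap_concurrency = max(min(limit, by_memory), 1)
    print(llap_concurrency)  # 1
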
+ # Calculate 'Max LLAP Concurrency', irrespective of whether 'llap_concurrency' was read or calculated.
+ max_llap_concurrency_limit = min(math.floor(max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO), MAX_CONCURRENT_QUERIES)
+ Logger.info("Calculated 'max_llap_concurrency_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested "
+ ": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(max_llap_concurrency_limit, max_executors_per_node,
+ num_llap_nodes_requested, MIN_EXECUTOR_TO_AM_RATIO,
+ MAX_CONCURRENT_QUERIES))
+ max_llap_concurrency = min(max_llap_concurrency_limit, math.floor(llap_mem_for_tezAm_and_daemons / (MIN_EXECUTOR_TO_AM_RATIO *
+ mem_per_thread_for_llap + normalized_tez_am_container_size)))
+ Logger.info("Calculated 'max_llap_concurrency' : {0}, using following : max_llap_concurrency_limit : {1}, llap_mem_for_tezAm_and_daemons : "
+ "{2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
+ "{5}".format(max_llap_concurrency, max_llap_concurrency_limit, llap_mem_for_tezAm_and_daemons, MIN_EXECUTOR_TO_AM_RATIO,
+ mem_per_thread_for_llap, normalized_tez_am_container_size))
+ if (max_llap_concurrency == 0):
+ max_llap_concurrency = 1
+ Logger.info("Adjusted 'max_llap_concurrency' : 1.")
+
+ if (max_llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available):
+ max_llap_concurrency = math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
+ assert(max_llap_concurrency > 0), "Calculated 'Max. LLAP Concurrent Queries' = {0}. Expected value >= 1.".format(max_llap_concurrency)
+ Logger.info("Adjusted 'max_llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
+ "{2}".format(max_llap_concurrency, hive_tez_am_cap_available, normalized_tez_am_container_size))
+
+
+ # Calculate value for 'num_llap_nodes', an across cluster config.
+ tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size
+ Logger.info("Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : "
+ "{2}".format(tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size))
+ llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required
+ assert (llap_mem_daemon_size >= yarn_min_container_size), "Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container " \
+ "Size' ({1})'".format(llap_mem_daemon_size, yarn_min_container_size)
+ assert(llap_mem_daemon_size >= mem_per_thread_for_llap and llap_mem_daemon_size >= yarn_min_container_size), "Not enough memory available for executors."
+ Logger.info("Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : "
+ "{2}".format(llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required))
+
+ llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size)
+ Logger.info("Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, "
+ "yarn_min_container_size: {3}".format(llap_daemon_mem_per_node, llap_mem_daemon_size, num_llap_nodes_requested, yarn_min_container_size))
+ if (llap_daemon_mem_per_node == 0):
+ # Small cluster. No capacity left on a node after running AMs.
+ llap_daemon_mem_per_node = mem_per_thread_for_llap
+ num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+ Logger.info("'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, "
+ "mem_per_thread_for_llap : {3}".format(llap_daemon_mem_per_node, num_llap_nodes, llap_mem_daemon_size, mem_per_thread_for_llap))
+ elif (llap_daemon_mem_per_node < mem_per_thread_for_llap):
+ # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node)
+ Logger.info("'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusting 'llap_daemon_mem_per_node' "
+ "to {1}".format(llap_daemon_mem_per_node, mem_per_thread_for_llap))
+ llap_daemon_mem_per_node = mem_per_thread_for_llap
+ num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap)
+ else:
+ # All good. We have a proper value for memoryPerNode.
+ num_llap_nodes = num_llap_nodes_requested
+ Logger.info("num_llap_nodes : {0}".format(num_llap_nodes))
+
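
The three-way branch above handles the degenerate cases where the per-node daemon share normalizes down to zero, or lands below the size of a single executor thread; in both cases the daemon shrinks to one executor's memory and the node count is cut instead. A compact sketch of the same decision (illustrative figures):

    import math

    def pick_daemon_mem_and_nodes(llap_mem_daemon_size, num_llap_nodes_requested,
                                  mem_per_thread_for_llap, yarn_min_container_size):
        # Normalize the per-node share down to a container multiple, then fall
        # back to one-executor-sized daemons when the share is too small.
        per_node = (llap_mem_daemon_size // num_llap_nodes_requested
                    // yarn_min_container_size) * yarn_min_container_size
        if per_node < mem_per_thread_for_llap:  # also covers the == 0 case
            per_node = mem_per_thread_for_llap
            nodes = int(math.floor(float(llap_mem_daemon_size) / mem_per_thread_for_llap))
        else:
            nodes = num_llap_nodes_requested
        return per_node, nodes

    # Ten nodes requested, but only 20480 MB left for daemons with 4096 MB threads:
    print(pick_daemon_mem_and_nodes(20480, 10, 4096, 1024))  # (4096, 5)
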
+ num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
+ assert(num_executors_per_node_max >= 1), "Calculated 'Max. Executors per Node' = {0}. Expected value >= 1.".format(num_executors_per_node_max)
+ Logger.info("Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
+ "mem_per_thread_for_llap: {3}".format(num_executors_per_node_max, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap))
+
+ # num_executors_per_node is not necessarily the max, since some capacity may have been reserved for AMs when this value is memory-bound.
+ num_executors_per_node = min(math.floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max)
+ assert(num_executors_per_node > 0), "Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node)
+ Logger.info("Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, "
+ "mem_per_thread_for_llap: {3}".format(num_executors_per_node, llap_daemon_mem_per_node, num_executors_per_node_max, mem_per_thread_for_llap))
+
+ # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache.
+ total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap
+ cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node
+
+ tez_runtime_io_sort_mb = long((0.8 * mem_per_thread_for_llap) / 3)
+ tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap)
+ # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying it by 1048576.
+ hive_auto_convert_join_noconditionaltask_size = long((0.8 * mem_per_thread_for_llap) / 3) * MB_TO_BYTES
+
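
These constants encode a common Tez sizing heuristic: roughly 80% of an executor's memory is treated as usable, a third of that goes to the sort buffer and to the map-join threshold (the latter converted to bytes via MB_TO_BYTES), and 7.5% of it to the unordered output buffer. With a 4096 MB executor, illustratively:

    MB_TO_BYTES = 1048576
    mem_per_thread_for_llap = 4096  # MB, illustrative

    tez_runtime_io_sort_mb = long((0.8 * mem_per_thread_for_llap) / 3)                     # 1092 MB
    tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap) # 245 MB
    hive_auto_convert_join_noconditionaltask_size = tez_runtime_io_sort_mb * MB_TO_BYTES   # 1145044992 bytes
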
+ # Calculate value for prop 'llap_heap_size'
+ llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations))
+ Logger.info("Calculated 'llap_heap_size' (llap_xmx) : {0}, using following : total_mem_for_executors_per_node : {1}".format(llap_xmx, total_mem_for_executors_per_node))
+
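
The heap rule above takes the larger of an 80% share and "everything minus headroom", so small daemons keep a proportional margin while large daemons are not taxed a flat percentage. Illustratively (the 1024 MB headroom stands in for whatever get_llap_headroom_space returns):

    def llap_heap(total_mem_for_executors_per_node, headroom_mb):
        # max() of the percentage rule and the fixed-headroom rule.
        return max(total_mem_for_executors_per_node * 0.8,
                   total_mem_for_executors_per_node - headroom_mb)

    print(llap_heap(4096, 1024))   # 3276.8 -> the 80% rule wins on small daemons
    print(llap_heap(16384, 1024))  # 15360  -> the headroom rule wins on large daemons
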
+ # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI)
+ hive_server_interactive_heapsize = None
+ hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts)
+ if hive_server_interactive_hosts is None:
+ # If it is None, read the base service YARN's NODEMANAGER host memory, as all hosts are assumed to be homogeneous.
+ hive_server_interactive_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
+ if hive_server_interactive_hosts is not None and len(hive_server_interactive_hosts) > 0:
+ host_mem = long(hive_server_interactive_hosts[0]["Hosts"]["total_mem"])
+ hive_server_interactive_heapsize = min(max(2048.0, 400.0*llap_concurrency), 3.0/8 * host_mem)
+ Logger.info("Calculated 'hive_server_interactive_heapsize' : {0}, using following : llap_concurrency : {1}, host_mem : "
+ "{2}".format(hive_server_interactive_heapsize, llap_concurrency, host_mem))
+
+
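
The HiveServer2 Interactive heap formula grows with concurrency at 400 MB per concurrent query, with a 2 GB floor, and is capped at three-eighths of the host's memory. For example:

    def hsi_heapsize(llap_concurrency, host_mem_mb):
        # min(max(2048, 400 * concurrency), 3/8 of host memory), as above.
        return min(max(2048.0, 400.0 * llap_concurrency), 3.0 / 8 * host_mem_mb)

    print(hsi_heapsize(1, 65536))   # 2048.0 (the floor dominates at low concurrency)
    print(hsi_heapsize(10, 65536))  # 4000.0 (400 MB per query)
    print(hsi_heapsize(10, 8192))   # 3072.0 (capped at 3/8 * 8192)
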
+ Logger.info("Updating the calculations....")
+
+ # Done with calculations, updating calculated configs.
+
+ normalized_tez_am_container_size = long(normalized_tez_am_container_size)
+ putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size)
+ Logger.info("'Tez for Hive2' config 'tez.am.resource.memory.mb' updated. Current: {0}".format(normalized_tez_am_container_size))
+
+ if not llap_concurrency_in_changed_configs:
+ min_llap_concurrency = 1
+ putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency)
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum",
+ min_llap_concurrency)
+
+ Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Min : {0}, Current: {1}" \
+ .format(min_llap_concurrency, llap_concurrency))
+
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurrency)
+ Logger.info("Hive2 config 'hive.server2.tez.sessions.per.default.queue' updated. Max : {0}".format(max_llap_concurrency))
+
+ num_llap_nodes = long(num_llap_nodes)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
+ if (num_llap_nodes != num_llap_nodes_requested):
+ Logger.info("User requested num_llap_nodes : {0}, but used/adjusted value for calculations is : {1}".format(num_llap_nodes_requested, num_llap_nodes))
+ else:
+ Logger.info("Used num_llap_nodes for calculations : {0}".format(num_llap_nodes_requested))
+ Logger.info("LLAP config 'num_llap_nodes' updated. Min: 1, Max: {0}".format(node_manager_cnt))
- if llap_mem_daemon_size < mem_per_thread_for_llap or llap_mem_daemon_size < yarn_min_container_size:
- Logger.warning("Not enough memory available for executors.")
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
+ llap_container_size = long(llap_daemon_mem_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size)
+ Logger.info("LLAP config 'hive.llap.daemon.yarn.container.mb' updated. Current: {0}".format(llap_container_size))
- llap_daemon_mem_per_node = self._normalizeDown(llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size)
- if llap_daemon_mem_per_node == 0:
- # Small cluster. No capacity left on a node after running AMs.
- llap_daemon_mem_per_node = mem_per_thread_for_llap
- num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap)
- elif llap_daemon_mem_per_node < mem_per_thread_for_llap:
- # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node)
- llap_daemon_mem_per_node = mem_per_thread_for_llap
- num_llap_nodes = math.floor(llap_mem_daemon_size / mem_per_thread_for_llap)
- else:
- # All good. We have a proper value for memoryPerNode.
- num_llap_nodes = num_llap_nodes_requested
+ # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization.
+ # Otherwise, we do not override either (1) the previously calculated value or (2) a user-provided value.
+ if self.get_hive_tez_container_size(services) == self.CONFIG_VALUE_UINITIALIZED:
+ mem_per_thread_for_llap = long(mem_per_thread_for_llap)
+ putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap)
+ Logger.info("LLAP config 'hive.tez.container.size' updated. Current: {0}".format(mem_per_thread_for_llap))
- num_executors_per_node_max = self.get_max_executors_per_node(yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap)
- if num_executors_per_node_max < 1:
- Logger.warning("Calculated 'Max. Executors per Node' = {0}. Expected values >= 1.".format(num_executors_per_node_max))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
+ putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb)
+ if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]:
+ if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY":
+ putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", "maximum", 1800)
+ Logger.info("'Tez for Hive2' config 'tez.runtime.io.sort.mb' updated. Current: {0}".format(tez_runtime_io_sort_mb))
- # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem.
- num_executors_per_node = min(math.floor(llap_daemon_mem_per_node / mem_per_thread_for_llap), num_executors_per_node_max)
- if num_executors_per_node <= 0:
- Logger.warning("Calculated 'Number of Executors Per Node' = {0}. Expected value >= 1".format(num_executors_per_node))
- self.recommendDefaultLlapConfiguration(configurations, services, hosts)
- return
+ putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size)
+ Logger.info("'Tez for Hive2' config 'tez.runtime.unordered.output.buffer.size-mb' updated. Current: {0}".format(tez_runtime_unordered_output_buffer_size))
- # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache.
- total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap
- cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node
+ putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size)
+ Logger.info("HIVE2 config 'hive.auto.convert.join.noconditionaltask.size' updated. Current: {0}".format(hive_auto_convert_join_noconditionaltask_size))
- tez_runtime_io_sort_mb = (long((0.8 * mem_per_thread_for_llap) / 3))
- tez_runtime_unordered_output_buffer_size = long(0.8 * 0.075 * mem_per_thread_for_llap)
- # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiplying it by 1048576.
- hive_auto_convert_join_noconditionaltask_size = (long((0.8 * mem_per_thread_for_llap) / 3)) * MB_TO_BYTES
- # Calculate value for prop 'llap_heap_size'
- llap_xmx = max(total_mem_for_executors_per_node * 0.8, total_mem_for_executors_per_node - self.get_llap_headroom_space(services, configurations))
+ num_executors_per_node = long(num_executors_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", int(num_executors_per_node_max))
+ Logger.info("LLAP config 'hive.llap.daemon.num.executors' updated. Current: {0}, Min: 1, "
+ "Max: {1}".format(num_executors_per_node, int(num_executors_per_node_max)))
+ # 'hive.llap.io.threadpool.size' must always be set to the same value as that
+ # calculated for 'hive.llap.daemon.num.executors'.
+ putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node)
+ Logger.info("LLAP config 'hive.llap.io.threadpool.size' updated. Current: {0}".format(num_executors_per_node))
- # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI)
- hive_server_interactive_heapsize = None
- hive_server_interactive_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts)
- if hive_server_interactive_hosts is None:
- # If its None, read the base service YARN's NODEMANAGER node memory, as are host are considered homogenous.
- hive_server_interactive_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
- if hive_server_interactive_hosts is not None and len(hive_server_interactive_hosts) > 0:
- host_mem = long(hive_server_interactive_hosts[0]["Hosts"]["total_mem"])
- hive_server_interactive_heapsize = min(max(2048.0, 400.0*llap_concurrency), 3.0/8 * host_mem)
+ cache_mem_per_node = long(cache_mem_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node)
+ Logger.info("LLAP config 'hive.llap.io.memory.size' updated. Current: {0}".format(cache_mem_per_node))
+ llap_io_enabled = 'false'
+ if cache_mem_per_node >= 64:
+ llap_io_enabled = 'true'
- # Done with calculations, updating calculated configs.
+ if hive_server_interactive_heapsize is not None:
+ putHiveInteractiveEnvProperty("hive_heapsize", int(hive_server_interactive_heapsize))
+ Logger.info("Hive2 config 'hive_heapsize' updated. Current : {0}".format(int(hive_server_interactive_heapsize)))
- normalized_tez_am_container_size = long(normalized_tez_am_container_size)
- putTezInteractiveSiteProperty('tez.am.resource.memory.mb', normalized_tez_am_container_size)
+ putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled)
+ Logger.info("Hive2 config 'hive.llap.io.enabled' updated to '{0}' as part of "
+ "'hive.llap.io.memory.size' calculation.".format(llap_io_enabled))
- if not llap_concurrency_in_changed_configs:
- min_llap_concurrency = 1
- putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', llap_concurrency)
- putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum",
- min_llap_concurrency)
+ llap_xmx = long(llap_xmx)
+ putHiveInteractiveEnvProperty('llap_heap_size', llap_xmx)
+ Logger.info("LLAP config 'llap_heap_size' updated. Current: {0}".format(llap_xmx))
- putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", max_llap_concurreny)
+ slider_am_container_size = long(slider_am_container_size)
+ putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size)
+ Logger.info("LLAP config 'slider_am_container_mb' updated. Current: {0}".format(slider_am_container_size))
- num_llap_nodes = long(num_llap_nodes)
- putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
- putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
+ except Exception as e:
+ # Set default values if an Exception was caught. The 'llap queue capacity' is left untouched, as it can be increased,
+ # triggering recalculation.
+ Logger.info(str(e) + " Skipping LLAP config calculations; setting them to minimum values.")
+ traceback.print_exc()
- llap_container_size = long(llap_daemon_mem_per_node)
- putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', llap_container_size)
+ try:
+ yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
+ slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size))
- # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization.
- # Else, we don't (1). Override the previous calculated value or (2). User provided value.
- if self.get_hive_tez_container_size(services) == self.CONFIG_VALUE_UINITIALIZED:
- mem_per_thread_for_llap = long(mem_per_thread_for_llap)
- putHiveInteractiveSiteProperty('hive.tez.container.size', mem_per_thread_for_llap)
+ node_manager_host_list = self.get_node_manager_hosts(services, hosts)
+ node_manager_cnt = len(node_manager_host_list)
- putTezInteractiveSiteProperty('tez.runtime.io.sort.mb', tez_runtime_io_sort_mb)
- if "tez-site" in services["configurations"] and "tez.runtime.sorter.class" in services["configurations"]["tez-site"]["properties"]:
- if services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"] == "LEGACY":
- putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", "maximum", 1800)
+ putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1)
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1)
+ putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1)
- putTezInteractiveSiteProperty('tez.runtime.unordered.output.buffer.size-mb', tez_runtime_unordered_output_buffer_size)
- putHiveInteractiveSiteProperty('hive.auto.convert.join.noconditionaltask.size', hive_auto_convert_join_noconditionaltask_size)
+ putHiveInteractiveEnvProperty('num_llap_nodes', 0)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
+ putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
- num_executors_per_node = long(num_executors_per_node)
- putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', num_executors_per_node)
- putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
- putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "maximum", float(num_executors_per_node_max))
+ putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size)
- # 'hive.llap.io.threadpool.size' config value is to be set same as value calculated for
- # 'hive.llap.daemon.num.executors' at all times.
- cache_mem_per_node = long(cache_mem_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0)
+ putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1)
- putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', num_executors_per_node)
- putHiveInteractiveSiteProperty('hive.llap.io.memory.size', cache_mem_per_node)
+ putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0)
- if hive_server_interactive_heapsize is not None:
- putHiveInteractiveEnvProperty("hive_heapsize", int(hive_server_interactive_heapsize))
- llap_io_enabled = 'true' if long(cache_mem_per_node) >= 64 else 'false'
- putHiveInteractiveSiteProperty('hive.llap.io.enabled', llap_io_enabled)
+ putHiveInteractiveSiteProperty('hive.llap.io.memory.size', 0)
- putHiveInteractiveEnvProperty('llap_heap_size', long(llap_xmx))
- putHiveInteractiveEnvProperty('slider_am_container_mb', long(slider_am_container_size))
+ putHiveInteractiveEnvProperty('llap_heap_size', 0)
- def recommendDefaultLlapConfiguration(self, configurations, services, hosts):
- putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services)
- putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE)
+ putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size)
- putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
- putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+ except Exception as e:
+ Logger.info("Problem setting minimum values for LLAP configs in Exception code.")
+ traceback.print_exc()
- yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations))
- slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size))
-
- node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
- node_manager_cnt = len(node_manager_host_list)
-
- putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1)
- putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1)
- putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 1)
- putHiveInteractiveEnvProperty('num_llap_nodes', 0)
- putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1)
- putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt)
- putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size)
- putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size)
- putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0)
- putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors
<TRUNCATED>
[3/3] ambari git commit: Revert "AMBARI-19171 Configuring YARN with
custom queues leads to misleading errors in Stack Advisor (dgrinenko via
dsen)"
Posted by sm...@apache.org.
Revert "AMBARI-19171 Configuring YARN with custom queues leads to misleading errors in Stack Advisor (dgrinenko via dsen)"
This reverts commit df52e537c11c60363dc80bef778c5016d5b811de.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/82e44131
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/82e44131
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/82e44131
Branch: refs/heads/branch-2.5
Commit: 82e44131a242f2e3902f174c183098e46d7aabc3
Parents: 7024e4e
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Fri Dec 30 13:53:45 2016 -0800
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Fri Dec 30 13:53:45 2016 -0800
----------------------------------------------------------------------
.../stacks/HDP/2.5/services/stack_advisor.py | 1447 ++++++++++--------
.../src/main/resources/stacks/stack_advisor.py | 57 +-
.../mixins/common/configs/enhanced_configs.js | 3 +
3 files changed, 822 insertions(+), 685 deletions(-)
----------------------------------------------------------------------