You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by vb...@apache.org on 2017/10/06 07:41:43 UTC
[2/2] ambari git commit: AMBARI-22124. Refactor AMS logic in stack
advisors to service advisors.(vbrodetskyi)
AMBARI-22124. Refactor AMS logic in stack advisors to service advisors.(vbrodetskyi)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0f32765d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0f32765d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0f32765d
Branch: refs/heads/trunk
Commit: 0f32765dc2250044c7925f4e68e6f61b7a77d8f8
Parents: 9adfcdc
Author: Vitaly Brodetskyi <vb...@hortonworks.com>
Authored: Fri Oct 6 10:40:33 2017 +0300
Committer: Vitaly Brodetskyi <vb...@hortonworks.com>
Committed: Fri Oct 6 10:40:33 2017 +0300
----------------------------------------------------------------------
.../AMBARI_METRICS/0.1.0/service_advisor.py | 787 +++++++++++++++++++
.../ATLAS/0.7.0.3.0/service_advisor.py | 5 +-
.../stacks/HDP/2.0.6/services/stack_advisor.py | 542 +------------
.../stacks/HDP/2.2/services/stack_advisor.py | 1 -
.../AMBARI_METRICS/test_service_advisor.py | 596 ++++++++++++++
.../stacks/2.0.6/common/test_stack_advisor.py | 576 --------------
.../stacks/2.2/common/test_stack_advisor.py | 511 ------------
7 files changed, 1388 insertions(+), 1630 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/0f32765d/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py
new file mode 100644
index 0000000..eae98bf
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py
@@ -0,0 +1,787 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import re
+import os
+import sys
+import socket
+import traceback
+from math import ceil, floor, log
+
+
+from resource_management.core.logger import Logger
+
+# All paths below are resolved relative to the directory that holds this script.
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
+# Source file of the parent module that is loaded as `service_advisor` further down.
+PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
+
+# Locations used for the AMS region split point calculation.
+metricsDir = os.path.join(SCRIPT_DIR, 'package')
+# NOTE(review): Python 2 print statement executed at import time; looks like
+# leftover debug output - confirm it is wanted.
+print "METRICS_DIR=>" + str(metricsDir)
+serviceMetricsDir = os.path.join(metricsDir, 'files', 'service-metrics')
+customServiceMetricsDir = os.path.join(SCRIPT_DIR, '../../../dashboards/service-metrics')
+# Must run before the import below: it puts package/scripts on sys.path so that
+# `split_points` can be found.
+sys.path.append(os.path.join(metricsDir, 'scripts'))
+
+from split_points import FindSplitPointsForAMSRegions
+
+# Load the parent advisor module straight from its source file.
+# NOTE(review): a load failure is only printed; `service_advisor` then stays
+# unbound and the class definitions that subclass service_advisor.ServiceAdvisor
+# would fail with a NameError - confirm that this is acceptable.
+try:
+ with open(PARENT_FILE, 'rb') as fp:
+ service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
+except Exception as e:
+ traceback.print_exc()
+ print "Failed to load parent"
+
+class AMBARI_METRICSServiceAdvisor(service_advisor.ServiceAdvisor):
+  """
+  Service advisor for the AMBARI_METRICS service.
+
+  On construction it registers the AMS layout hints (cardinality, heap size
+  properties, layout scheme). It also computes the AMS heap size
+  recommendation and delegates configuration recommendation and validation
+  to AMBARI_METRICSRecommender and AMBARI_METRICSValidator.
+  """
+
+  def __init__(self, *args, **kwargs):
+    parent = super(AMBARI_METRICSServiceAdvisor, self)
+    self.as_super = parent
+    parent.__init__(*args, **kwargs)
+
+    # These hooks are applied on every construction, in this order.
+    hooks = (self.modifyMastersWithMultipleInstances,
+             self.modifyCardinalitiesDict,
+             self.modifyHeapSizeProperties,
+             self.modifyNotValuableComponents,
+             self.modifyComponentsNotPreferableOnServer,
+             self.modifyComponentLayoutSchemes)
+    for hook in hooks:
+      hook()
+
+  def modifyMastersWithMultipleInstances(self):
+    """
+    Modify the set of masters with multiple instances.
+    AMS adds nothing here.
+    """
+    return None
+
+  def modifyCardinalitiesDict(self):
+    """
+    Register a minimum cardinality of 1 for METRICS_COLLECTOR.
+    """
+    collector_cardinality = {"min": 1}
+    self.cardinalitiesDict.update({'METRICS_COLLECTOR': collector_cardinality})
+
+  def modifyHeapSizeProperties(self):
+    """
+    Set the heap size property table to the three METRICS_COLLECTOR entries.
+    """
+    def heap_entry(config_name, property_name, default):
+      return {"config-name": config_name,
+              "property": property_name,
+              "default": default}
+
+    self.heap_size_properties = {
+      "METRICS_COLLECTOR": [
+        heap_entry("ams-hbase-env", "hbase_master_heapsize", "1024m"),
+        heap_entry("ams-hbase-env", "hbase_regionserver_heapsize", "1024m"),
+        heap_entry("ams-env", "metrics_collector_heapsize", "512m"),
+      ]
+    }
+
+  def modifyNotValuableComponents(self):
+    """
+    Add METRICS_MONITOR to the set of components whose host assignment is
+    based on other services.
+    """
+    monitor_only = set(['METRICS_MONITOR'])
+    self.notValuableComponents |= monitor_only
+
+  def modifyComponentsNotPreferableOnServer(self):
+    """
+    Add METRICS_COLLECTOR to the set of components that are not preferable
+    on the server.
+    """
+    collector_only = set(['METRICS_COLLECTOR'])
+    self.notPreferableOnServerComponents |= collector_only
+
+  def modifyComponentLayoutSchemes(self):
+    """
+    Register the layout scheme for METRICS_COLLECTOR.
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+    """
+    collector_scheme = {3: 2, 6: 2, 31: 3, "else": 5}
+    self.componentLayoutSchemes.update({'METRICS_COLLECTOR': collector_scheme})
+
+  def getServiceComponentLayoutValidations(self, services, hosts):
+    """
+    Get a list of layout errors. AMS reports none.
+    """
+    no_errors = []
+    return no_errors
+
+  def getAmsMemoryRecommendation(self, services, hosts):
+    """
+    Derive AMS heap sizes from the number of hosts that run metric sinks.
+
+    Returns a tuple (collector_heapsize, hbase_heapsize, total_sinks_count);
+    both heap sizes are passed through round_to_n before being returned.
+    """
+    # MB per sink in hbase heapsize
+    mb_per_master = 50
+    mb_per_slave = 10
+
+    sink_weights = {
+      "HDFS": {
+        "NAMENODE": mb_per_master,
+        "SECONDARY_NAMENODE": mb_per_master,
+        "DATANODE": mb_per_slave
+      },
+      "YARN": {
+        "RESOURCEMANAGER": mb_per_master,
+        "NODEMANAGER": mb_per_slave,
+        "HISTORYSERVER" : mb_per_master,
+        "APP_TIMELINE_SERVER": mb_per_master
+      },
+      "HBASE": {
+        "HBASE_MASTER": mb_per_master,
+        "HBASE_REGIONSERVER": mb_per_slave
+      },
+      "HIVE": {
+        "HIVE_METASTORE": mb_per_master,
+        "HIVE_SERVER": mb_per_master
+      },
+      "KAFKA": {
+        "KAFKA_BROKER": mb_per_master
+      },
+      "FLUME": {
+        "FLUME_HANDLER": mb_per_slave
+      },
+      "STORM": {
+        "NIMBUS": mb_per_master,
+      },
+      "AMBARI_METRICS": {
+        "METRICS_COLLECTOR": mb_per_master,
+        "METRICS_MONITOR": mb_per_slave
+      },
+      "ACCUMULO": {
+        "ACCUMULO_MASTER": mb_per_master,
+        "ACCUMULO_TSERVER": mb_per_slave
+      },
+      "LOGSEARCH": {
+        "LOGSEARCH_LOGFEEDER" : mb_per_slave
+      }
+    }
+
+    sinks_seen = 0
+    # minimum heap size
+    hbase_mb = 500
+    for service_name, component_weights in sink_weights.items():
+      for component_name, weight in component_weights.items():
+        sink_hosts = self.getHostsWithComponent(service_name, component_name,
+                                                services, hosts)
+        host_count = len(sink_hosts)
+        hbase_mb += int((host_count * weight))
+        sinks_seen += host_count
+
+    # The collector heap is derived from the uncapped hbase figure.
+    if hbase_mb > 2048:
+      collector_mb = int(hbase_mb/3)
+    else:
+      collector_mb = 512
+    hbase_mb = min(hbase_mb, 32768)
+
+    return self.round_to_n(collector_mb), self.round_to_n(hbase_mb), sinks_seen
+
+  def round_to_n(self, mem_size, n=128):
+    """
+    Round mem_size to the nearest multiple of n and return it as an int.
+    """
+    multiples = round(float(mem_size) / float(n))
+    return int(multiples) * int(n)
+
+  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+    """
+    Entry point.
+    Hands the recommendation work over to a fresh AMBARI_METRICSRecommender.
+    """
+    ams_recommender = AMBARI_METRICSRecommender()
+    ams_recommender.recommendAmsConfigurationsFromHDP206(configurations, clusterData, services, hosts)
+
+  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+    """
+    Entry point.
+    Validate configurations for the service. Return a list of errors.
+    The code for this function should be the same for each Service Advisor.
+    """
+    ams_validator = self.getAMBARI_METRICSValidator()
+    # Calls the methods of the validator using arguments,
+    # method(siteProperties, siteRecommendations, configurations, services, hosts)
+    checks = ams_validator.validators
+    return ams_validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, checks)
+
+  def getAMBARI_METRICSValidator(self):
+    """
+    Return a new AMBARI_METRICSValidator instance.
+    """
+    return AMBARI_METRICSValidator()
+
+
+class AMBARI_METRICSRecommender(service_advisor.ServiceAdvisor):
+  """
+  AMS Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(AMBARI_METRICSRecommender, self)
+    self.as_super.__init__(*args, **kwargs)
+
+  def getPreferredMountPoints(self, hostInfo):
+    """
+    Return the usable mount points of a host, with "/" always appended last.
+
+    Entries of hostInfo["disk_info"] are dropped when the mount point is in
+    the undesirable list, starts with /boot or /mnt, has an undesirable
+    file system type or reports "0" available. The remaining mount points
+    are sorted by self.to_number(available), largest first.
+    """
+
+    # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points
+    undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts",
+                              "/etc/hostname", "/tmp"]
+    undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"]
+    mountPoints = []
+    if hostInfo and "disk_info" in hostInfo:
+      mountPointsDict = {}
+      for mountpoint in hostInfo["disk_info"]:
+        if not (mountpoint["mountpoint"] in undesirableMountPoints or
+                mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or
+                mountpoint["type"] in undesirableFsTypes or
+                mountpoint["available"] == str(0)):
+          mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"])
+      if mountPointsDict:
+        mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True)
+    mountPoints.append("/")
+    return mountPoints
+
+  def recommendAmsConfigurationsFromHDP206(self, configurations, clusterData, services, hosts):
+    """
+    Put the recommended AMS values into `configurations`.
+
+    Covers ams-env, ams-site, ams-hbase-site, ams-hbase-env and the
+    visibility attribute of the Grafana password in ams-grafana-env.
+    Current values are read from services["configurations"], host data from
+    hosts["items"]. Nothing is returned.
+    """
+    putAmsEnvProperty = self.putProperty(configurations, "ams-env", services)
+    putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services)
+    putAmsSiteProperty = self.putProperty(configurations, "ams-site", services)
+    putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services)
+    putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services)
+    putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env")
+
+    amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
+
+    # A single advisor instance serves both the memory recommendation and round_to_n below.
+    serviceAdvisor = AMBARI_METRICSServiceAdvisor()
+
+    # TODO set "timeline.metrics.service.webapp.address" to 0.0.0.0:port in upgrade catalog
+    timeline_metrics_service_webapp_address = '0.0.0.0'
+
+    putAmsSiteProperty("timeline.metrics.service.webapp.address", str(timeline_metrics_service_webapp_address) + ":6188")
+
+    log_dir = "/var/log/ambari-metrics-collector"
+    if "ams-env" in services["configurations"]:
+      if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]:
+        log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"]
+      putHbaseEnvProperty("hbase_log_dir", log_dir)
+
+    defaultFs = 'file:///'
+    if "core-site" in services["configurations"] and \
+        "fs.defaultFS" in services["configurations"]["core-site"]["properties"]:
+      defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"]
+
+    operatingMode = "embedded"
+    if "ams-site" in services["configurations"]:
+      if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]:
+        operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"]
+
+    # More than one collector forces distributed mode.
+    if len(amsCollectorHosts) > 1:
+      operatingMode = "distributed"
+      putAmsSiteProperty("timeline.metrics.service.operation.mode", operatingMode)
+
+    if operatingMode == "distributed":
+      putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true')
+      putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true')
+    else:
+      putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false')
+      putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false')
+
+    rootDir = "file:///var/lib/ambari-metrics-collector/hbase"
+    tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp"
+    zk_port_default = []
+    if "ams-hbase-site" in services["configurations"]:
+      if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]:
+        rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"]
+      if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]:
+        tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"]
+      if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]:
+        zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"]
+
+    # Skip recommendation item if default value is present
+    if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default:
+      zkPort = self.getZKPort(services)
+      putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort)
+    elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default:
+      putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181")
+
+    # Mount points of the collector host; the inner break only leaves the host
+    # loop, so with several collectors the last matching one decides.
+    mountpoints = ["/"]
+    for collectorHostName in amsCollectorHosts:
+      for host in hosts["items"]:
+        if host["Hosts"]["host_name"] == collectorHostName:
+          mountpoints = self.getPreferredMountPoints(host["Hosts"])
+          break
+    isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/"))
+    if isLocalRootDir:
+      # Strip the leading "file:///" (or the first "/") so the path can be
+      # re-rooted on the preferred mount point.
+      rootDir = re.sub("^file:///|/", "", rootDir, count=1)
+      rootDir = "file://" + os.path.join(mountpoints[0], rootDir)
+    tmpDir = re.sub("^file:///|/", "", tmpDir, count=1)
+    # Put hbase.tmp.dir on a different mount point than a local hbase.rootdir
+    # when a second mount point is available.
+    if len(mountpoints) > 1 and isLocalRootDir:
+      tmpDir = os.path.join(mountpoints[1], tmpDir)
+    else:
+      tmpDir = os.path.join(mountpoints[0], tmpDir)
+    putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir)
+
+    if operatingMode == "distributed":
+      putAmsHbaseSiteProperty("hbase.rootdir", "/user/ams/hbase")
+
+    if operatingMode == "embedded":
+      if isLocalRootDir:
+        putAmsHbaseSiteProperty("hbase.rootdir", rootDir)
+      else:
+        putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase")
+
+    collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts)
+
+    putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize)
+
+    # math.log() raises ValueError for 0, which is what the sink count is when
+    # no sink component has a host assigned yet; clamp to 1 so the smallest
+    # settings are recommended instead of aborting the whole recommendation.
+    log_sinks = int(log(max(total_sinks_count, 1)))
+    putAmsSiteProperty("timeline.metrics.cache.size", max(100, log_sinks * 100))
+    putAmsSiteProperty("timeline.metrics.cache.commit.interval", min(10, max(12 - log_sinks, 2)))
+
+    # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25
+    putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3)
+    putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728)
+    putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35)
+    putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3)
+
+    # The size based tuning below applies to a single collector only.
+    if len(amsCollectorHosts) <= 1:
+      # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3
+      if total_sinks_count >= 2000:
+        putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
+        putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
+        putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
+        putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
+        putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3)
+        putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25)
+        putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20)
+        putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000)
+        putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30)
+        putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000)
+      elif total_sinks_count >= 1000:
+        putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
+        putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
+        putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
+        putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
+        putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000)
+        putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000)
+      else:
+        putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000)
+
+    metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100)))
+    putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers)
+
+    # Distributed mode heap size
+    if operatingMode == "distributed":
+      hbase_heapsize = max(hbase_heapsize, 1024)
+      putHbaseEnvProperty("hbase_master_heapsize", "512")
+      putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% of 512 heap size
+      putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize)
+      putHbaseEnvProperty("regionserver_xmn_size", serviceAdvisor.round_to_n(0.15 * hbase_heapsize,64))
+    else:
+      # Embedded mode heap size : master + regionserver
+      hbase_rs_heapsize = 512
+      putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize)
+      putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize)
+      putHbaseEnvProperty("hbase_master_xmn_size", serviceAdvisor.round_to_n(0.15*(hbase_heapsize + hbase_rs_heapsize),64))
+
+    # If no local DN in distributed mode
+    if operatingMode == "distributed":
+      dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
+      # call by Kerberos wizard sends only the service being affected
+      # so it is possible for dn_hosts to be None but not amsCollectorHosts
+      if dn_hosts and len(dn_hosts) > 0:
+        if set(amsCollectorHosts).intersection(dn_hosts):
+          collector_cohosted_with_dn = "true"
+        else:
+          collector_cohosted_with_dn = "false"
+        putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn)
+
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+
+    ams_hbase_site = None
+    ams_hbase_env = None
+
+    # Overridden properties from the UI
+    if "ams-hbase-site" in services["configurations"]:
+      ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"]
+    if "ams-hbase-env" in services["configurations"]:
+      ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"]
+
+    # Fall back to the values recommended above
+    if not ams_hbase_site:
+      ams_hbase_site = configurations["ams-hbase-site"]["properties"]
+    if not ams_hbase_env:
+      ams_hbase_env = configurations["ams-hbase-env"]["properties"]
+
+    split_point_finder = FindSplitPointsForAMSRegions(
+      ams_hbase_site, ams_hbase_env, serviceMetricsDir, customServiceMetricsDir, operatingMode, servicesList)
+
+    result = split_point_finder.get_split_points()
+    # A single blank is recommended when no split points were found.
+    precision_splits = ' '
+    aggregate_splits = ' '
+    if result.precision:
+      precision_splits = result.precision
+    if result.aggregate:
+      aggregate_splits = result.aggregate
+    putAmsSiteProperty("timeline.metrics.host.aggregate.splitpoints", ','.join(precision_splits))
+    putAmsSiteProperty("timeline.metrics.cluster.aggregate.splitpoints", ','.join(aggregate_splits))
+
+    component_grafana_exists = False
+    for service in services['services']:
+      if 'components' in service:
+        for component in service['components']:
+          if 'StackServiceComponents' in component:
+            # If Grafana is installed the hostnames would indicate its location
+            if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\
+                len(component['StackServiceComponents']['hostnames']) != 0:
+              component_grafana_exists = True
+              break
+
+    # Hide the Grafana password field while Grafana is not installed.
+    if not component_grafana_exists:
+      putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false")
+
+
+
+
+class AMBARI_METRICSValidator(service_advisor.ServiceAdvisor):
+ """
+ AMS Validator checks the correctness of properties whenever the service is first added or the user attempts to
+ change configs via the UI.
+ """
+
+ def __init__(self, *args, **kwargs):
+   # Keep a handle on the parent proxy, then run the parent initialisation.
+   parent = super(AMBARI_METRICSValidator, self)
+   self.as_super = parent
+   parent.__init__(*args, **kwargs)
+
+   # (config type, validation method) pairs, in the order they are declared.
+   hbase_site_check = ("ams-hbase-site", self.validateAmsHbaseSiteConfigurationsFromHDP206)
+   hbase_env_check = ("ams-hbase-env", self.validateAmsHbaseEnvConfigurationsFromHDP206)
+   site_check = ("ams-site", self.validateAmsSiteConfigurationsFromHDP206)
+   env_check = ("ams-env", self.validateAmsEnvConfigurationsFromHDP206)
+   grafana_env_check = ("ams-grafana-env", self.validateGrafanaEnvConfigurationsFromHDP206)
+   self.validators = [hbase_site_check, hbase_env_check, site_check,
+                      env_check, grafana_env_check]
+
+
+
+ def getPreferredMountPoints(self, hostInfo):
+   """
+   Return the usable mount points of a host, with "/" always appended last.
+
+   A disk_info entry is ignored when its mount point is undesirable, starts
+   with /boot or /mnt, has an undesirable file system type or reports "0"
+   available. The others are sorted by self.to_number(available), largest
+   first.
+   """
+   # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points
+   skipped_paths = ["/", "/home", "/etc/resolv.conf", "/etc/hosts",
+                    "/etc/hostname", "/tmp"]
+   skipped_fs_types = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"]
+   skipped_prefixes = ("/boot", "/mnt")
+
+   available_by_path = {}
+   if hostInfo and "disk_info" in hostInfo:
+     for disk in hostInfo["disk_info"]:
+       path = disk["mountpoint"]
+       if path in skipped_paths:
+         continue
+       if path.startswith(skipped_prefixes):
+         continue
+       if disk["type"] in skipped_fs_types:
+         continue
+       if disk["available"] == str(0):
+         continue
+       available_by_path[path] = self.to_number(disk["available"])
+
+   # Sorting an empty mapping yields an empty list, so "/" is then the only entry.
+   ranked = sorted(available_by_path, key=available_by_path.get, reverse=True)
+   ranked.append("/")
+   return ranked
+
+ # Validates the ams-hbase-site properties: hbase.rootdir and
+ # hbase.cluster.distributed against the AMS operation mode, the zookeeper
+ # client port, and (per collector host) disk space / partition usage.
+ # Returns the result of toConfigurationValidationProblems(..., "ams-hbase-site").
+ # NOTE(review): leading whitespace is collapsed in this archived copy, so the
+ # nesting of the statements below cannot be read from the layout - confirm
+ # against the repository before editing the logic.
+ def validateAmsHbaseSiteConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+
+ amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
+ ams_site = self.getSiteProperties(configurations, "ams-site")
+ core_site = self.getSiteProperties(configurations, "core-site")
+
+ serviceAdvisor = AMBARI_METRICSServiceAdvisor()
+
+ # Only total_sinks_count is used from this result in this method.
+ collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts)
+ # Default requirement; raised below according to the sink count.
+ recommendedDiskSpace = 10485760
+ # TODO validate configuration for multiple AMBARI_METRICS collectors
+ if len(amsCollectorHosts) > 1:
+ pass
+ else:
+ if total_sinks_count > 2000:
+ recommendedDiskSpace = 104857600 # * 1k == 100 Gb
+ elif total_sinks_count > 500:
+ recommendedDiskSpace = 52428800 # * 1k == 50 Gb
+ elif total_sinks_count > 250:
+ recommendedDiskSpace = 20971520 # * 1k == 20 Gb
+
+ validationItems = []
+
+ rootdir_item = None
+ # NOTE(review): ams_site is dereferenced here without a None check, while a
+ # later condition tests `ams_site and ...` - confirm it can never be None.
+ op_mode = ams_site.get("timeline.metrics.service.operation.mode")
+ default_fs = core_site.get("fs.defaultFS") if core_site else "file:///"
+ # NOTE(review): the .get() results below are used with .startswith()/.lower()
+ # without a None check - presumably the properties are always present; verify.
+ hbase_rootdir = properties.get("hbase.rootdir")
+ hbase_tmpdir = properties.get("hbase.tmp.dir")
+ distributed = properties.get("hbase.cluster.distributed")
+ # Local when the root dir is a file:// URI, or a scheme-less absolute path
+ # while the default file system itself is file://.
+ is_local_root_dir = hbase_rootdir.startswith("file://") or (default_fs.startswith("file://") and hbase_rootdir.startswith("/"))
+
+ if op_mode == "distributed" and is_local_root_dir:
+ rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.")
+ elif op_mode == "embedded":
+ # `and` binds tighter than `or`: an hdfs:// root dir warns regardless of
+ # the hbase.cluster.distributed value.
+ if distributed.lower() == "false" and hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://"):
+ rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS, "
+ "Example - file:// for localFS")
+ pass
+
+ # hbase.cluster.distributed has to agree with the AMS operation mode.
+ distributed_item = None
+ if op_mode == "distributed" and not distributed.lower() == "true":
+ distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for "
+ "distributed mode")
+ if op_mode == "embedded" and distributed.lower() == "true":
+ distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode")
+
+ # The "{{zookeeper_clientPort}}" placeholder is never reported as an error.
+ hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort")
+ zkPort = self.getZKPort(services)
+ hbase_zk_client_port_item = None
+ if distributed.lower() == "true" and op_mode == "distributed" and \
+ hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
+ hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort "
+ "should be the cluster zookeeper server port : {0}".format(zkPort))
+
+ if distributed.lower() == "false" and op_mode == "embedded" and \
+ hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
+ hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort "
+ "should be a different port than cluster zookeeper port."
+ "(default:61181)")
+
+ # Items may still be None here; they are added unconditionally.
+ validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item },
+ {"config-name":'hbase.cluster.distributed', "item": distributed_item },
+ {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }])
+
+ # Host specific checks, run for every host entry that is a collector host.
+ for collectorHostName in amsCollectorHosts:
+ for host in hosts["items"]:
+ if host["Hosts"]["host_name"] == collectorHostName:
+ if op_mode == 'embedded' or is_local_root_dir:
+ validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}])
+ validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}])
+ validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}])
+
+ dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
+ if is_local_root_dir:
+ mountPoints = []
+ for mountPoint in host["Hosts"]["disk_info"]:
+ mountPoints.append(mountPoint["mountpoint"])
+ hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints)
+ hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints)
+ preferred_mountpoints = self.getPreferredMountPoints(host['Hosts'])
+ # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition
+ # if multiple preferred_mountpoints exist
+ if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \
+ len(preferred_mountpoints) > 1:
+ item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. "
+ "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint))
+ validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}])
+
+ # if METRICS_COLLECTOR is co-hosted with DATANODE
+ # cross-check dfs.datanode.data.dir and hbase.rootdir
+ # they shouldn't share same disk partition IO
+ hdfs_site = self.getSiteProperties(configurations, "hdfs-site")
+ dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else []
+ if dn_hosts and collectorHostName in dn_hosts and ams_site and \
+ dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs):
+ for dfs_datadir in dfs_datadirs:
+ dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints)
+ if dfs_datadir_mountpoint == hbase_rootdir_mountpoint:
+ item = self.getWarnItem("Consider not using {0} partition for storing metrics data. "
+ "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint))
+ validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}])
+ break
+ # If no local DN in distributed mode
+ # NOTE(review): which `if` this `elif` pairs with is not visible here;
+ # presumably `if is_local_root_dir:` - TODO confirm.
+ # NOTE(review): dn_hosts is tested for truthiness a few lines above, which
+ # suggests it can be None; `not in dn_hosts` would then raise a TypeError -
+ # confirm and guard if needed.
+ elif collectorHostName not in dn_hosts and distributed.lower() == "true":
+ item = self.getWarnItem("It's recommended to install Datanode component on {0} "
+ "to speed up IO operations between HDFS and Metrics "
+ "Collector in distributed mode ".format(collectorHostName))
+ validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}])
+ # Short circuit read should be enabled in distributed mode
+ # if local DN installed
+ else:
+ validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}])
+
+ return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site")
+
+
+ def validateAmsHbaseEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+
+ ams_env = self.getSiteProperties(configurations, "ams-env")
+ amsHbaseSite = self.getSiteProperties(configurations, "ams-hbase-site")
+ validationItems = []
+ mb = 1024 * 1024
+ gb = 1024 * mb
+
+ regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added
+ if regionServerItem:
+ validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])
+
+ hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize")
+ if hbaseMasterHeapsizeItem:
+ validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
+
+ logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir")
+ if logDirItem:
+ validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}])
+
+ hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"])
+ hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"])
+ hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"])
+ hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"])
+
+ # Validate Xmn settings.
+ masterXmnItem = None
+ regionServerXmnItem = None
+ is_hbase_distributed = amsHbaseSite.get("hbase.cluster.distributed").lower() == 'true'
+
+ if is_hbase_distributed:
+
+ if not regionServerItem and hbase_regionserver_heapsize > 32768:
+ regionServerItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
+ validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])
+
+ minMasterXmn = 0.12 * hbase_master_heapsize
+ maxMasterXmn = 0.2 * hbase_master_heapsize
+ if hbase_master_xmn_size < minMasterXmn:
+ masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
+ "(12% of hbase_master_heapsize)".format(int(ceil(minMasterXmn))))
+
+ if hbase_master_xmn_size > maxMasterXmn:
+ masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+ "(20% of hbase_master_heapsize)".format(int(floor(maxMasterXmn))))
+
+ minRegionServerXmn = 0.12 * hbase_regionserver_heapsize
+ maxRegionServerXmn = 0.2 * hbase_regionserver_heapsize
+ if hbase_regionserver_xmn_size < minRegionServerXmn:
+ regionServerXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
+ "(12% of hbase_regionserver_heapsize)"
+ .format(int(ceil(minRegionServerXmn))))
+
+ if hbase_regionserver_xmn_size > maxRegionServerXmn:
+ regionServerXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+ "(20% of hbase_regionserver_heapsize)"
+ .format(int(floor(maxRegionServerXmn))))
+ else:
+
+ if not hbaseMasterHeapsizeItem and (hbase_master_heapsize + hbase_regionserver_heapsize) > 32768:
+ hbaseMasterHeapsizeItem = self.getWarnItem("Value of Master + Regionserver heapsize is more than the recommended maximum heap size of 32G.")
+ validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
+
+ minMasterXmn = 0.12 * (hbase_master_heapsize + hbase_regionserver_heapsize)
+ maxMasterXmn = 0.2 * (hbase_master_heapsize + hbase_regionserver_heapsize)
+ if hbase_master_xmn_size < minMasterXmn:
+ masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
+ "(12% of hbase_master_heapsize + hbase_regionserver_heapsize)"
+ .format(int(ceil(minMasterXmn))))
+
+ if hbase_master_xmn_size > maxMasterXmn:
+ masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+ "(20% of hbase_master_heapsize + hbase_regionserver_heapsize)"
+ .format(int(floor(maxMasterXmn))))
+ if masterXmnItem:
+ validationItems.extend([{"config-name": "hbase_master_xmn_size", "item": masterXmnItem}])
+
+ if regionServerXmnItem:
+ validationItems.extend([{"config-name": "regionserver_xmn_size", "item": regionServerXmnItem}])
+
+ if hbaseMasterHeapsizeItem is None:
+ hostMasterComponents = {}
+
+ for service in services["services"]:
+ for component in service["components"]:
+ if component["StackServiceComponents"]["hostnames"] is not None:
+ for hostName in component["StackServiceComponents"]["hostnames"]:
+ if self.isMasterComponent(component):
+ if hostName not in hostMasterComponents.keys():
+ hostMasterComponents[hostName] = []
+ hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"])
+
+ amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
+ for collectorHostName in amsCollectorHosts:
+ for host in hosts["items"]:
+ if host["Hosts"]["host_name"] == collectorHostName:
+ # AMS Collector co-hosted with other master components in bigger clusters
+ if len(hosts['items']) > 31 and \
+ len(hostMasterComponents[collectorHostName]) > 2 and \
+ host["Hosts"]["total_mem"] < 32*mb: # < 32Gb(total_mem in k)
+ masterHostMessage = "Host {0} is used by multiple master components ({1}). " \
+ "It is recommended to use a separate host for the " \
+ "Ambari Metrics Collector component and ensure " \
+ "the host has sufficient memory available."
+
+ hbaseMasterHeapsizeItem = self.getWarnItem(masterHostMessage.format(
+ collectorHostName, str(", ".join(hostMasterComponents[collectorHostName]))))
+ if hbaseMasterHeapsizeItem:
+ validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
+ pass
+
+ return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env")
+
+
+ def validateAmsSiteConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+   """
+   Validate "timeline.metrics.service.operation.mode" from the given properties.
+
+   Produces at most one item for that property:
+     - an error when the mode is neither "embedded" nor "distributed";
+     - an error when more than one METRICS_COLLECTOR host is found and the
+       mode is not "distributed";
+     - a warning in "embedded" mode when the sink count returned by
+       getAmsMemoryRecommendation() is above 1000.
+   The item (possibly None) is passed to toConfigurationValidationProblems()
+   under the "ams-site" config type and that result is returned.
+   """
+   validationItems = []
+
+   # The memory recommendation helper is reached through a separate advisor instance.
+   metricsAdvisor = AMBARI_METRICSServiceAdvisor()
+
+   op_mode = properties.get("timeline.metrics.service.operation.mode")
+   mode_problem = None
+
+   if op_mode not in ("embedded", "distributed"):
+     mode_problem = self.getErrorItem("Correct value should be set.")
+   else:
+     collector_hosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
+     several_collectors = len(collector_hosts) > 1
+     if several_collectors and op_mode != 'distributed':
+       mode_problem = self.getErrorItem("Correct value should be 'distributed' for clusters with more then 1 Metrics collector")
+     elif op_mode == 'embedded':
+       # Only the sink count is needed here; the two heap sizes are ignored.
+       _collector_heap, _hbase_heap, sink_total = metricsAdvisor.getAmsMemoryRecommendation(services, hosts)
+       if sink_total > 1000:
+         mode_problem = self.getWarnItem("Number of sinks writing metrics to collector is expected to be more than 1000. "
+                                         "'Embedded' mode AMS might not be able to handle the load. Consider moving to distributed mode.")
+
+   validationItems.append({"config-name": 'timeline.metrics.service.operation.mode', "item": mode_problem})
+   return self.toConfigurationValidationProblems(validationItems, "ams-site")
+
+
+ def validateAmsEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+   """
+   Validate "metrics_collector_heapsize".
+
+   Two checks feed the returned problem list (config type "ams-env"):
+     - validatorLessThenDefaultValue() on the supplied properties against
+       recommendedDefaults;
+     - a warning when the heap size read from the "ams-env" site properties
+       is greater than 32768.
+   """
+   validationItems = []
+   collectorHeapsizeDefaultItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "metrics_collector_heapsize")
+   validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeDefaultItem}])
+
+   # NOTE(review): presumably getSiteProperties() yields None when "ams-env" is
+   # absent from configurations - confirm. Fall back to the properties under
+   # validation so the lookup below cannot fail on a missing site.
+   ams_env = self.getSiteProperties(configurations, "ams-env") or properties
+   collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize"))
+   # Skip the upper-bound check when no numeric value could be obtained.
+   if collector_heapsize is not None and collector_heapsize > 32768:
+     collectorHeapsizeMaxItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
+     validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeMaxItem}])
+
+   return self.toConfigurationValidationProblems(validationItems, "ams-env")
+
+
+ def validateGrafanaEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+   """
+   Validate the length of "metrics_grafana_password" from the given properties.
+
+   An error item is produced when the password is missing or shorter than
+   4 characters; otherwise the item is None. The item is passed to
+   toConfigurationValidationProblems() and that result is returned.
+   """
+   validationItems = []
+
+   grafana_pwd = properties.get("metrics_grafana_password")
+   grafana_pwd_length_item = None
+   # properties.get() returns None when the key is absent; report that as a
+   # too-short password instead of letting len(None) raise TypeError.
+   if grafana_pwd is None or len(grafana_pwd) < 4:
+     grafana_pwd_length_item = self.getErrorItem("Grafana password length should be at least 4.")
+   validationItems.extend([{"config-name":'metrics_grafana_password', "item": grafana_pwd_length_item }])
+   # NOTE(review): the problems are filed under "ams-site" although the property
+   # validated here is a Grafana one - confirm whether "ams-grafana-env" was
+   # intended. Left unchanged to preserve the existing output.
+   return self.toConfigurationValidationProblems(validationItems, "ams-site")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0f32765d/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py
index a2e31cc..058e086 100644
--- a/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py
+++ b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py
@@ -74,7 +74,10 @@ class AtlasServiceAdvisor(service_advisor.ServiceAdvisor):
Modify the dictionary of heap size properties.
Must be overriden in child class.
"""
- pass
+ self.heap_size_properties = {"ATLAS_SERVER":
+ [{"config-name": "atlas-env",
+ "property": "atlas_server_xmx",
+ "default": "2048m"}]}
def modifyNotValuableComponents(self):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/0f32765d/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index 5307176..a194332 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -172,7 +172,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
"HDFS": self.recommendHDFSConfigurations,
"HBASE": self.recommendHbaseConfigurations,
"STORM": self.recommendStormConfigurations,
- "AMBARI_METRICS": self.recommendAmsConfigurations,
"RANGER": self.recommendRangerConfigurations,
"ZOOKEEPER": self.recommendZookeeperConfigurations,
"OOZIE": self.recommendOozieConfigurations
@@ -509,66 +508,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
putRangerAuditProperty(item['target_configname'], rangerAuditProperty)
- def getAmsMemoryRecommendation(self, services, hosts):
- # MB per sink in hbase heapsize
- HEAP_PER_MASTER_COMPONENT = 50
- HEAP_PER_SLAVE_COMPONENT = 10
-
- schMemoryMap = {
- "HDFS": {
- "NAMENODE": HEAP_PER_MASTER_COMPONENT,
- "SECONDARY_NAMENODE": HEAP_PER_MASTER_COMPONENT,
- "DATANODE": HEAP_PER_SLAVE_COMPONENT
- },
- "YARN": {
- "RESOURCEMANAGER": HEAP_PER_MASTER_COMPONENT,
- "NODEMANAGER": HEAP_PER_SLAVE_COMPONENT,
- "HISTORYSERVER" : HEAP_PER_MASTER_COMPONENT,
- "APP_TIMELINE_SERVER": HEAP_PER_MASTER_COMPONENT
- },
- "HBASE": {
- "HBASE_MASTER": HEAP_PER_MASTER_COMPONENT,
- "HBASE_REGIONSERVER": HEAP_PER_SLAVE_COMPONENT
- },
- "HIVE": {
- "HIVE_METASTORE": HEAP_PER_MASTER_COMPONENT,
- "HIVE_SERVER": HEAP_PER_MASTER_COMPONENT
- },
- "KAFKA": {
- "KAFKA_BROKER": HEAP_PER_MASTER_COMPONENT
- },
- "FLUME": {
- "FLUME_HANDLER": HEAP_PER_SLAVE_COMPONENT
- },
- "STORM": {
- "NIMBUS": HEAP_PER_MASTER_COMPONENT,
- },
- "AMBARI_METRICS": {
- "METRICS_COLLECTOR": HEAP_PER_MASTER_COMPONENT,
- "METRICS_MONITOR": HEAP_PER_SLAVE_COMPONENT
- },
- "ACCUMULO": {
- "ACCUMULO_MASTER": HEAP_PER_MASTER_COMPONENT,
- "ACCUMULO_TSERVER": HEAP_PER_SLAVE_COMPONENT
- },
- "LOGSEARCH": {
- "LOGSEARCH_LOGFEEDER" : HEAP_PER_SLAVE_COMPONENT
- }
- }
- total_sinks_count = 0
- # minimum heap size
- hbase_heapsize = 500
- for serviceName, componentsDict in schMemoryMap.items():
- for componentName, multiplier in componentsDict.items():
- schCount = len(
- self.getHostsWithComponent(serviceName, componentName, services,
- hosts))
- hbase_heapsize += int((schCount * multiplier))
- total_sinks_count += schCount
- collector_heapsize = int(hbase_heapsize/3 if hbase_heapsize > 2048 else 512)
- hbase_heapsize = min(hbase_heapsize, 32768)
-
- return round_to_n(collector_heapsize), round_to_n(hbase_heapsize), total_sinks_count
def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
@@ -577,216 +516,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
if 'AMBARI_METRICS' in servicesList:
putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter')
- def recommendAmsConfigurations(self, configurations, clusterData, services, hosts):
- putAmsEnvProperty = self.putProperty(configurations, "ams-env", services)
- putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services)
- putAmsSiteProperty = self.putProperty(configurations, "ams-site", services)
- putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services)
- putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services)
- putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env")
-
- amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
-
- # TODO set "timeline.metrics.service.webapp.address" to 0.0.0.0:port in upgrade catalog
- timeline_metrics_service_webapp_address = '0.0.0.0'
-
- putAmsSiteProperty("timeline.metrics.service.webapp.address", str(timeline_metrics_service_webapp_address) + ":6188")
-
- log_dir = "/var/log/ambari-metrics-collector"
- if "ams-env" in services["configurations"]:
- if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]:
- log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"]
- putHbaseEnvProperty("hbase_log_dir", log_dir)
-
- defaultFs = 'file:///'
- if "core-site" in services["configurations"] and \
- "fs.defaultFS" in services["configurations"]["core-site"]["properties"]:
- defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"]
-
- operatingMode = "embedded"
- if "ams-site" in services["configurations"]:
- if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]:
- operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"]
-
- if len(amsCollectorHosts) > 1 :
- operatingMode = "distributed"
- putAmsSiteProperty("timeline.metrics.service.operation.mode", operatingMode)
-
- if operatingMode == "distributed":
- putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true')
- putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true')
- else:
- putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false')
- putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false')
-
- rootDir = "file:///var/lib/ambari-metrics-collector/hbase"
- tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp"
- zk_port_default = []
- if "ams-hbase-site" in services["configurations"]:
- if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]:
- rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"]
- if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]:
- tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"]
- if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]:
- zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"]
-
- # Skip recommendation item if default value is present
- if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default:
- zkPort = self.getZKPort(services)
- putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort)
- elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default:
- putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181")
-
- mountpoints = ["/"]
- for collectorHostName in amsCollectorHosts:
- for host in hosts["items"]:
- if host["Hosts"]["host_name"] == collectorHostName:
- mountpoints = self.getPreferredMountPoints(host["Hosts"])
- break
- isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/"))
- if isLocalRootDir:
- rootDir = re.sub("^file:///|/", "", rootDir, count=1)
- rootDir = "file://" + os.path.join(mountpoints[0], rootDir)
- tmpDir = re.sub("^file:///|/", "", tmpDir, count=1)
- if len(mountpoints) > 1 and isLocalRootDir:
- tmpDir = os.path.join(mountpoints[1], tmpDir)
- else:
- tmpDir = os.path.join(mountpoints[0], tmpDir)
- putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir)
-
- if operatingMode == "distributed":
- putAmsHbaseSiteProperty("hbase.rootdir", "/user/ams/hbase")
-
- if operatingMode == "embedded":
- if isLocalRootDir:
- putAmsHbaseSiteProperty("hbase.rootdir", rootDir)
- else:
- putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase")
-
- collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts)
-
- putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize)
-
- putAmsSiteProperty("timeline.metrics.cache.size", max(100, int(log(total_sinks_count)) * 100))
- putAmsSiteProperty("timeline.metrics.cache.commit.interval", min(10, max(12 - int(log(total_sinks_count)), 2)))
-
- # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25
- putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3)
- putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728)
- putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35)
- putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3)
-
- if len(amsCollectorHosts) > 1:
- pass
- else:
- # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3
- if total_sinks_count >= 2000:
- putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
- putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
- putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
- putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
- putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3)
- putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25)
- putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20)
- putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000)
- putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30)
- putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000)
- elif total_sinks_count >= 1000:
- putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
- putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
- putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
- putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
- putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000)
- putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000)
- else:
- putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000)
- pass
-
- metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100)))
- putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers)
-
- # Distributed mode heap size
- if operatingMode == "distributed":
- hbase_heapsize = max(hbase_heapsize, 1024)
- putHbaseEnvProperty("hbase_master_heapsize", "512")
- putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% of 512 heap size
- putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize)
- putHbaseEnvProperty("regionserver_xmn_size", round_to_n(0.15 * hbase_heapsize,64))
- else:
- # Embedded mode heap size : master + regionserver
- hbase_rs_heapsize = 512
- putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize)
- putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize)
- putHbaseEnvProperty("hbase_master_xmn_size", round_to_n(0.15*(hbase_heapsize + hbase_rs_heapsize),64))
-
- # If no local DN in distributed mode
- if operatingMode == "distributed":
- dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
- # call by Kerberos wizard sends only the service being affected
- # so it is possible for dn_hosts to be None but not amsCollectorHosts
- if dn_hosts and len(dn_hosts) > 0:
- if set(amsCollectorHosts).intersection(dn_hosts):
- collector_cohosted_with_dn = "true"
- else:
- collector_cohosted_with_dn = "false"
- putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn)
-
- #split points
- scriptDir = os.path.dirname(os.path.abspath(__file__))
- metricsDir = os.path.join(scriptDir, '../../../../common-services/AMBARI_METRICS/0.1.0/package')
- serviceMetricsDir = os.path.join(metricsDir, 'files', 'service-metrics')
- customServiceMetricsDir = os.path.join(scriptDir, '../../../../dashboards/service-metrics')
- sys.path.append(os.path.join(metricsDir, 'scripts'))
- servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
-
- from split_points import FindSplitPointsForAMSRegions
-
- ams_hbase_site = None
- ams_hbase_env = None
-
- # Overriden properties form the UI
- if "ams-hbase-site" in services["configurations"]:
- ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"]
- if "ams-hbase-env" in services["configurations"]:
- ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"]
-
- # Recommendations
- if not ams_hbase_site:
- ams_hbase_site = configurations["ams-hbase-site"]["properties"]
- if not ams_hbase_env:
- ams_hbase_env = configurations["ams-hbase-env"]["properties"]
-
- split_point_finder = FindSplitPointsForAMSRegions(
- ams_hbase_site, ams_hbase_env, serviceMetricsDir, customServiceMetricsDir, operatingMode, servicesList)
-
- result = split_point_finder.get_split_points()
- precision_splits = ' '
- aggregate_splits = ' '
- if result.precision:
- precision_splits = result.precision
- if result.aggregate:
- aggregate_splits = result.aggregate
- putAmsSiteProperty("timeline.metrics.host.aggregate.splitpoints", ','.join(precision_splits))
- putAmsSiteProperty("timeline.metrics.cluster.aggregate.splitpoints", ','.join(aggregate_splits))
-
- component_grafana_exists = False
- for service in services['services']:
- if 'components' in service:
- for component in service['components']:
- if 'StackServiceComponents' in component:
- # If Grafana is installed the hostnames would indicate its location
- if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\
- len(component['StackServiceComponents']['hostnames']) != 0:
- component_grafana_exists = True
- break
- pass
-
- if not component_grafana_exists:
- putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false")
-
- pass
-
-
def getServiceConfigurationValidators(self):
return {
@@ -797,12 +526,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
"YARN": {"yarn-site": self.validateYARNConfigurations,
"yarn-env": self.validateYARNEnvConfigurations},
"HBASE": {"hbase-env": self.validateHbaseEnvConfigurations},
- "STORM": {"storm-site": self.validateStormConfigurations},
- "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations,
- "ams-hbase-env": self.validateAmsHbaseEnvConfigurations,
- "ams-site": self.validateAmsSiteConfigurations,
- "ams-env": self.validateAmsEnvConfigurations,
- "ams-grafana-env": self.validateGrafanaEnvConfigurations}
+ "STORM": {"storm-site": self.validateStormConfigurations}
}
def validateMinMax(self, items, recommendedDefaults, configurations):
@@ -834,148 +558,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
items.extend(self.toConfigurationValidationProblems(validationItems, configName))
pass
- def validateAmsSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
- validationItems = []
- op_mode = properties.get("timeline.metrics.service.operation.mode")
- correct_op_mode_item = None
- if op_mode not in ("embedded", "distributed"):
- correct_op_mode_item = self.getErrorItem("Correct value should be set.")
- pass
- elif len(self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")) > 1 and op_mode != 'distributed':
- correct_op_mode_item = self.getErrorItem("Correct value should be 'distributed' for clusters with more then 1 Metrics collector")
- elif op_mode == 'embedded':
- collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts)
- if total_sinks_count > 1000:
- correct_op_mode_item = self.getWarnItem("Number of sinks writing metrics to collector is expected to be more than 1000. "
- "'Embedded' mode AMS might not be able to handle the load. Consider moving to distributed mode.")
-
- validationItems.extend([{"config-name":'timeline.metrics.service.operation.mode', "item": correct_op_mode_item }])
- return self.toConfigurationValidationProblems(validationItems, "ams-site")
-
- def validateGrafanaEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
- validationItems = []
-
- grafana_pwd = properties.get("metrics_grafana_password")
- grafana_pwd_length_item = None
- if len(grafana_pwd) < 4:
- grafana_pwd_length_item = self.getErrorItem("Grafana password length should be at least 4.")
- pass
- validationItems.extend([{"config-name":'metrics_grafana_password', "item": grafana_pwd_length_item }])
- return self.toConfigurationValidationProblems(validationItems, "ams-site")
-
- def validateAmsHbaseSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
-
- amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
- ams_site = getSiteProperties(configurations, "ams-site")
- core_site = getSiteProperties(configurations, "core-site")
-
- collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts)
- recommendedDiskSpace = 10485760
- # TODO validate configuration for multiple AMBARI_METRICS collectors
- if len(amsCollectorHosts) > 1:
- pass
- else:
- if total_sinks_count > 2000:
- recommendedDiskSpace = 104857600 # * 1k == 100 Gb
- elif total_sinks_count > 500:
- recommendedDiskSpace = 52428800 # * 1k == 50 Gb
- elif total_sinks_count > 250:
- recommendedDiskSpace = 20971520 # * 1k == 20 Gb
-
- validationItems = []
-
- rootdir_item = None
- op_mode = ams_site.get("timeline.metrics.service.operation.mode")
- default_fs = core_site.get("fs.defaultFS") if core_site else "file:///"
- hbase_rootdir = properties.get("hbase.rootdir")
- hbase_tmpdir = properties.get("hbase.tmp.dir")
- distributed = properties.get("hbase.cluster.distributed")
- is_local_root_dir = hbase_rootdir.startswith("file://") or (default_fs.startswith("file://") and hbase_rootdir.startswith("/"))
-
- if op_mode == "distributed" and is_local_root_dir:
- rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.")
- elif op_mode == "embedded":
- if distributed.lower() == "false" and hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://"):
- rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS, "
- "Example - file:// for localFS")
- pass
-
- distributed_item = None
- if op_mode == "distributed" and not distributed.lower() == "true":
- distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for "
- "distributed mode")
- if op_mode == "embedded" and distributed.lower() == "true":
- distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode")
-
- hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort")
- zkPort = self.getZKPort(services)
- hbase_zk_client_port_item = None
- if distributed.lower() == "true" and op_mode == "distributed" and \
- hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
- hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort "
- "should be the cluster zookeeper server port : {0}".format(zkPort))
-
- if distributed.lower() == "false" and op_mode == "embedded" and \
- hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}":
- hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort "
- "should be a different port than cluster zookeeper port."
- "(default:61181)")
-
- validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item },
- {"config-name":'hbase.cluster.distributed', "item": distributed_item },
- {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }])
-
- for collectorHostName in amsCollectorHosts:
- for host in hosts["items"]:
- if host["Hosts"]["host_name"] == collectorHostName:
- if op_mode == 'embedded' or is_local_root_dir:
- validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}])
- validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}])
- validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}])
-
- dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
- if is_local_root_dir:
- mountPoints = []
- for mountPoint in host["Hosts"]["disk_info"]:
- mountPoints.append(mountPoint["mountpoint"])
- hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints)
- hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints)
- preferred_mountpoints = self.getPreferredMountPoints(host['Hosts'])
- # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition
- # if multiple preferred_mountpoints exist
- if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \
- len(preferred_mountpoints) > 1:
- item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. "
- "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint))
- validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}])
-
- # if METRICS_COLLECTOR is co-hosted with DATANODE
- # cross-check dfs.datanode.data.dir and hbase.rootdir
- # they shouldn't share same disk partition IO
- hdfs_site = getSiteProperties(configurations, "hdfs-site")
- dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else []
- if dn_hosts and collectorHostName in dn_hosts and ams_site and \
- dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs):
- for dfs_datadir in dfs_datadirs:
- dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints)
- if dfs_datadir_mountpoint == hbase_rootdir_mountpoint:
- item = self.getWarnItem("Consider not using {0} partition for storing metrics data. "
- "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint))
- validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}])
- break
- # If no local DN in distributed mode
- elif collectorHostName not in dn_hosts and distributed.lower() == "true":
- item = self.getWarnItem("It's recommended to install Datanode component on {0} "
- "to speed up IO operations between HDFS and Metrics "
- "Collector in distributed mode ".format(collectorHostName))
- validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}])
- # Short circuit read should be enabled in distibuted mode
- # if local DN installed
- else:
- validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}])
-
- return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site")
def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
validationItems = []
@@ -990,132 +573,9 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
return self.toConfigurationValidationProblems(validationItems, "storm-site")
- def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
-
- ams_env = getSiteProperties(configurations, "ams-env")
- amsHbaseSite = getSiteProperties(configurations, "ams-hbase-site")
- validationItems = []
- mb = 1024 * 1024
- gb = 1024 * mb
-
- regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added
- if regionServerItem:
- validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])
-
- hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize")
- if hbaseMasterHeapsizeItem:
- validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
-
- logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir")
- if logDirItem:
- validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}])
-
- hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"])
- hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"])
- hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"])
- hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"])
-
- # Validate Xmn settings.
- masterXmnItem = None
- regionServerXmnItem = None
- is_hbase_distributed = amsHbaseSite.get("hbase.cluster.distributed").lower() == 'true'
-
- if is_hbase_distributed:
-
- if not regionServerItem and hbase_regionserver_heapsize > 32768:
- regionServerItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
- validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])
-
- minMasterXmn = 0.12 * hbase_master_heapsize
- maxMasterXmn = 0.2 * hbase_master_heapsize
- if hbase_master_xmn_size < minMasterXmn:
- masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
- "(12% of hbase_master_heapsize)".format(int(ceil(minMasterXmn))))
-
- if hbase_master_xmn_size > maxMasterXmn:
- masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
- "(20% of hbase_master_heapsize)".format(int(floor(maxMasterXmn))))
-
- minRegionServerXmn = 0.12 * hbase_regionserver_heapsize
- maxRegionServerXmn = 0.2 * hbase_regionserver_heapsize
- if hbase_regionserver_xmn_size < minRegionServerXmn:
- regionServerXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
- "(12% of hbase_regionserver_heapsize)"
- .format(int(ceil(minRegionServerXmn))))
-
- if hbase_regionserver_xmn_size > maxRegionServerXmn:
- regionServerXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
- "(20% of hbase_regionserver_heapsize)"
- .format(int(floor(maxRegionServerXmn))))
- else:
-
- if not hbaseMasterHeapsizeItem and (hbase_master_heapsize + hbase_regionserver_heapsize) > 32768:
- hbaseMasterHeapsizeItem = self.getWarnItem("Value of Master + Regionserver heapsize is more than the recommended maximum heap size of 32G.")
- validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
-
- minMasterXmn = 0.12 * (hbase_master_heapsize + hbase_regionserver_heapsize)
- maxMasterXmn = 0.2 * (hbase_master_heapsize + hbase_regionserver_heapsize)
- if hbase_master_xmn_size < minMasterXmn:
- masterXmnItem = self.getWarnItem("Value is lesser than the recommended minimum Xmn size of {0} "
- "(12% of hbase_master_heapsize + hbase_regionserver_heapsize)"
- .format(int(ceil(minMasterXmn))))
-
- if hbase_master_xmn_size > maxMasterXmn:
- masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
- "(20% of hbase_master_heapsize + hbase_regionserver_heapsize)"
- .format(int(floor(maxMasterXmn))))
- if masterXmnItem:
- validationItems.extend([{"config-name": "hbase_master_xmn_size", "item": masterXmnItem}])
-
- if regionServerXmnItem:
- validationItems.extend([{"config-name": "regionserver_xmn_size", "item": regionServerXmnItem}])
-
- if hbaseMasterHeapsizeItem is None:
- hostMasterComponents = {}
-
- for service in services["services"]:
- for component in service["components"]:
- if component["StackServiceComponents"]["hostnames"] is not None:
- for hostName in component["StackServiceComponents"]["hostnames"]:
- if self.isMasterComponent(component):
- if hostName not in hostMasterComponents.keys():
- hostMasterComponents[hostName] = []
- hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"])
-
- amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
- for collectorHostName in amsCollectorHosts:
- for host in hosts["items"]:
- if host["Hosts"]["host_name"] == collectorHostName:
- # AMS Collector co-hosted with other master components in bigger clusters
- if len(hosts['items']) > 31 and \
- len(hostMasterComponents[collectorHostName]) > 2 and \
- host["Hosts"]["total_mem"] < 32*mb: # < 32Gb(total_mem in k)
- masterHostMessage = "Host {0} is used by multiple master components ({1}). " \
- "It is recommended to use a separate host for the " \
- "Ambari Metrics Collector component and ensure " \
- "the host has sufficient memory available."
-
- hbaseMasterHeapsizeItem = self.getWarnItem(masterHostMessage.format(
- collectorHostName, str(", ".join(hostMasterComponents[collectorHostName]))))
- if hbaseMasterHeapsizeItem:
- validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
- pass
- return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env")
-
- def validateAmsEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
-
- validationItems = []
- collectorHeapsizeDefaultItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "metrics_collector_heapsize")
- validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeDefaultItem}])
- ams_env = getSiteProperties(configurations, "ams-env")
- collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize"))
- if collector_heapsize > 32768:
- collectorHeapsizeMaxItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
- validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeMaxItem}])
- return self.toConfigurationValidationProblems(validationItems, "ams-env")
def getMemorySizeRequired(self, services, components, configurations):
totalMemoryRequired = 512*1024*1024 # 512Mb for OS needs
http://git-wip-us.apache.org/repos/asf/ambari/blob/0f32765d/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
index 2dc1738..4cb0d9e 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
@@ -91,7 +91,6 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
"HBASE": self.recommendHBASEConfigurations,
"MAPREDUCE2": self.recommendMapReduce2Configurations,
"TEZ": self.recommendTezConfigurations,
- "AMBARI_METRICS": self.recommendAmsConfigurations,
"YARN": self.recommendYARNConfigurations,
"STORM": self.recommendStormConfigurations,
"KNOX": self.recommendKnoxConfigurations,