You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2017/06/28 00:24:08 UTC
[10/51] [partial] ambari git commit: AMBARI-21349. Create BigInsights
Stack Skeleton in Ambari 2.5 (alejandro)
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py
new file mode 100755
index 0000000..be49bf8
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py
@@ -0,0 +1,259 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from resource_management.core.logger import Logger
+
try:
  from stack_advisor_206 import *
except ImportError as exc:
  # The parent advisor module must load for HDP21StackAdvisor (below) to be
  # defined at all, so don't swallow the failure silently: record the actual
  # ImportError via the stack's logger instead of a bare print, which loses
  # the exception detail and bypasses Ambari's log collection.
  Logger.error("Failed to import parent stack advisor stack_advisor_206: {0}".format(str(exc)))
+
+class HDP21StackAdvisor(HDP206StackAdvisor):
+
+ def getServiceConfigurationRecommenderDict(self):
+ parentRecommendConfDict = super(HDP21StackAdvisor, self).getServiceConfigurationRecommenderDict()
+ childRecommendConfDict = {
+ "OOZIE": self.recommendOozieConfigurations,
+ "HIVE": self.recommendHiveConfigurations,
+ "TEZ": self.recommendTezConfigurations
+ }
+ parentRecommendConfDict.update(childRecommendConfDict)
+ return parentRecommendConfDict
+
  def recommendOozieConfigurations(self, configurations, clusterData, services, hosts):
    """Recommend oozie-site / oozie-env property values.

    When a FALCON_SERVER is present, adds Oozie proxy-user entries for the
    Falcon user (and deletes entries for a previously configured falcon_user).
    Also derives the JDBC driver class and connection URL from the selected
    Oozie database type.

    configurations -- dict the recommendations are written into
    clusterData    -- cluster sizing/topology summary (reads "components")
    services       -- current service/config state from Ambari
    hosts          -- host inventory, used to locate the OOZIE_SERVER host
    """
    oozieSiteProperties = getSiteProperties(services['configurations'], 'oozie-site')
    oozieEnvProperties = getSiteProperties(services['configurations'], 'oozie-env')
    putOozieProperty = self.putProperty(configurations, "oozie-site", services)
    # NOTE(review): putOozieEnvProperty is created but never used in this method.
    putOozieEnvProperty = self.putProperty(configurations, "oozie-env", services)

    if "FALCON_SERVER" in clusterData["components"]:
      # Falcon submits workflows to Oozie as falcon_user, so it needs proxy-user rights.
      putOozieSiteProperty = self.putProperty(configurations, "oozie-site", services)
      falconUser = None
      if "falcon-env" in services["configurations"] and "falcon_user" in services["configurations"]["falcon-env"]["properties"]:
        falconUser = services["configurations"]["falcon-env"]["properties"]["falcon_user"]
      if falconUser is not None:
        putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUser) , "*")
        putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUser) , "*")
      # If falcon_user was renamed, delete the stale proxy-user entries and force
      # the new/old property names to be re-applied.
      falconUserOldValue = getOldValue(self, services, "falcon-env", "falcon_user")
      if falconUserOldValue is not None:
        if 'forced-configurations' not in services:
          services["forced-configurations"] = []
        putOozieSitePropertyAttribute = self.putPropertyAttribute(configurations, "oozie-site")
        putOozieSitePropertyAttribute("oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUserOldValue), 'delete', 'true')
        putOozieSitePropertyAttribute("oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUserOldValue), 'delete', 'true')
        # NOTE(review): appending to "forced-configurations" appears to make the
        # framework re-apply these properties even if their values are unchanged
        # — confirm against the stack-advisor framework contract.
        services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUserOldValue)})
        services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUserOldValue)})
        if falconUser is not None:
          services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUser)})
          services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUser)})

      # Falcon coordination relies on Oozie's JMS/HCat extension services.
      # (Variable name kept from upstream; this writes to oozie-site, not mapred.)
      putMapredProperty = self.putProperty(configurations, "oozie-site")
      putMapredProperty("oozie.services.ext",
                        "org.apache.oozie.service.JMSAccessorService," +
                        "org.apache.oozie.service.PartitionDependencyManagerService," +
                        "org.apache.oozie.service.HCatAccessorService")
    # Derive the JDBC driver class from the chosen database type.
    if oozieEnvProperties and oozieSiteProperties and self.checkSiteProperties(oozieSiteProperties, 'oozie.service.JPAService.jdbc.driver') and self.checkSiteProperties(oozieEnvProperties, 'oozie_database'):
      putOozieProperty('oozie.service.JPAService.jdbc.driver', self.getDBDriver(oozieEnvProperties['oozie_database']))
    if oozieSiteProperties and oozieEnvProperties and self.checkSiteProperties(oozieSiteProperties, 'oozie.db.schema.name', 'oozie.service.JPAService.jdbc.url') and self.checkSiteProperties(oozieEnvProperties, 'oozie_database'):
      oozieServerHost = self.getHostWithComponent('OOZIE', 'OOZIE_SERVER', services, hosts)
      oozieDBConnectionURL = oozieSiteProperties['oozie.service.JPAService.jdbc.url']
      protocol = self.getProtocol(oozieEnvProperties['oozie_database'])
      oldSchemaName = getOldValue(self, services, "oozie-site", "oozie.db.schema.name")
      # under these if constructions we are checking if oozie server hostname available,
      # if schema name was changed or if protocol according to current db type differs with protocol in db connection url(db type was changed)
      if oozieServerHost is not None:
        if oldSchemaName or (protocol and oozieDBConnectionURL and not oozieDBConnectionURL.startswith(protocol)):
          dbConnection = self.getDBConnectionString(oozieEnvProperties['oozie_database']).format(oozieServerHost['Hosts']['host_name'], oozieSiteProperties['oozie.db.schema.name'])
          putOozieProperty('oozie.service.JPAService.jdbc.url', dbConnection)
+
  def recommendHiveConfigurations(self, configurations, clusterData, services, hosts):
    """Recommend hive-site / hive-env (and related webhcat-site / core-site) values.

    Sizes the Hive-on-Tez container and JVM opts from cluster memory data,
    derives the metastore JDBC driver/URL from the chosen database type, adds
    WebHCat proxy-user entries for the Ambari user when PIG is installed, and
    appends WebHCat server hosts to hadoop.proxyuser.HTTP.hosts on secured
    clusters.
    """
    hiveSiteProperties = getSiteProperties(services['configurations'], 'hive-site')
    hiveEnvProperties = getSiteProperties(services['configurations'], 'hive-env')
    # Start from the larger of map/reduce container sizes (map size only if >2GB),
    # then cap at the cluster's total container capacity.
    containerSize = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory'])
    containerSize = min(clusterData['containers'] * clusterData['ramPerContainer'], containerSize)
    container_size_bytes = int(containerSize)*1024*1024  # MB -> bytes
    putHiveEnvProperty = self.putProperty(configurations, "hive-env", services)
    putHiveProperty = self.putProperty(configurations, "hive-site", services)

    servicesList = self.get_services_list(services)
    if "TEZ" in servicesList:
      # Map-join threshold: one third of the container; heap: 80% of the container.
      putHiveProperty('hive.auto.convert.join.noconditionaltask.size', int(round(container_size_bytes / 3)))
      putHiveProperty('hive.tez.java.opts', "-server -Xmx" + str(int(round((0.8 * containerSize) + 0.5)))
                      + "m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps")
      putHiveProperty('hive.tez.container.size', containerSize)

    # javax.jdo.option.ConnectionURL recommendations
    if hiveEnvProperties and self.checkSiteProperties(hiveEnvProperties, 'hive_database', 'hive_database_type'):
      putHiveEnvProperty('hive_database_type', self.getDBTypeAlias(hiveEnvProperties['hive_database']))
    if hiveEnvProperties and hiveSiteProperties and self.checkSiteProperties(hiveSiteProperties, 'javax.jdo.option.ConnectionDriverName') and self.checkSiteProperties(hiveEnvProperties, 'hive_database'):
      putHiveProperty('javax.jdo.option.ConnectionDriverName', self.getDBDriver(hiveEnvProperties['hive_database']))
    if hiveSiteProperties and hiveEnvProperties and self.checkSiteProperties(hiveSiteProperties, 'ambari.hive.db.schema.name', 'javax.jdo.option.ConnectionURL') and self.checkSiteProperties(hiveEnvProperties, 'hive_database'):
      hiveServerHost = self.getHostWithComponent('HIVE', 'HIVE_SERVER', services, hosts)
      hiveDBConnectionURL = hiveSiteProperties['javax.jdo.option.ConnectionURL']
      protocol = self.getProtocol(hiveEnvProperties['hive_database'])
      oldSchemaName = getOldValue(self, services, "hive-site", "ambari.hive.db.schema.name")
      oldDBType = getOldValue(self, services, "hive-env", "hive_database")
      # under these if constructions we are checking if hive server hostname available,
      # if it's default db connection url with "localhost" or if schema name was changed or if db type was changed (only for db type change from default mysql to existing mysql)
      # or if protocol according to current db type differs with protocol in db connection url(other db types changes)
      if hiveServerHost is not None:
        if (hiveDBConnectionURL and "//localhost" in hiveDBConnectionURL) or oldSchemaName or oldDBType or (protocol and hiveDBConnectionURL and not hiveDBConnectionURL.startswith(protocol)):
          dbConnection = self.getDBConnectionString(hiveEnvProperties['hive_database']).format(hiveServerHost['Hosts']['host_name'], hiveSiteProperties['ambari.hive.db.schema.name'])
          putHiveProperty('javax.jdo.option.ConnectionURL', dbConnection)

    if "PIG" in servicesList:
      # WebHCat runs Pig jobs on behalf of the Ambari user, which therefore
      # needs proxy-user rights from the Ambari server host.
      ambari_user = self.getAmbariUser(services)
      ambariHostName = socket.getfqdn()
      webHcatSiteProperty = self.putProperty(configurations, "webhcat-site", services)
      webHcatSiteProperty("webhcat.proxyuser.{0}.hosts".format(ambari_user), ambariHostName)
      webHcatSiteProperty("webhcat.proxyuser.{0}.groups".format(ambari_user), "*")
      # Drop proxy-user entries left over from a renamed Ambari user.
      old_ambari_user = self.getOldAmbariUser(services)
      if old_ambari_user is not None:
        webHcatSitePropertyAttributes = self.putPropertyAttribute(configurations, "webhcat-site")
        webHcatSitePropertyAttributes("webhcat.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
        webHcatSitePropertyAttributes("webhcat.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')

    if self.is_secured_cluster(services):
      appendCoreSiteProperty = self.updateProperty(configurations, "core-site", services)

      def updateCallback(originalValue, newValue):
        """Merge newValue (list of hosts) into the existing comma-separated
        host list, de-duplicating entries.

        :type originalValue str
        :type newValue list
        """
        if originalValue and not originalValue.isspace():
          hosts = originalValue.split(',')

          if newValue:
            hosts.extend(newValue)

          result = ','.join(set(hosts))
          return result
        else:
          return ','.join(set(newValue))

      meta = self.get_service_component_meta("HIVE", "WEBHCAT_SERVER", services)
      if "hostnames" in meta:
        appendCoreSiteProperty('hadoop.proxyuser.HTTP.hosts', meta["hostnames"], updateCallback)
+
+ def recommendTezConfigurations(self, configurations, clusterData, services, hosts):
+ putTezProperty = self.putProperty(configurations, "tez-site")
+ putTezProperty("tez.am.resource.memory.mb", int(clusterData['amMemory']))
+ putTezProperty("tez.am.java.opts",
+ "-server -Xmx" + str(int(0.8 * clusterData["amMemory"]))
+ + "m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC")
+ recommended_tez_queue = self.recommendYarnQueue(services, "tez-site", "tez.queue.name")
+ if recommended_tez_queue is not None:
+ putTezProperty("tez.queue.name", recommended_tez_queue)
+
+
+ def getNotPreferableOnServerComponents(self):
+ return ['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS', 'GANGLIA_SERVER', 'METRICS_COLLECTOR']
+
+ def getNotValuableComponents(self):
+ return ['JOURNALNODE', 'ZKFC', 'GANGLIA_MONITOR', 'APP_TIMELINE_SERVER']
+
+ def getComponentLayoutSchemes(self):
+ parentSchemes = super(HDP21StackAdvisor, self).getComponentLayoutSchemes()
+ childSchemes = {
+ 'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+ 'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+ }
+ parentSchemes.update(childSchemes)
+ return parentSchemes
+
+ def getServiceConfigurationValidators(self):
+ parentValidators = super(HDP21StackAdvisor, self).getServiceConfigurationValidators()
+ childValidators = {
+ "HIVE": {"hive-site": self.validateHiveConfigurations},
+ "TEZ": {"tez-site": self.validateTezConfigurations}
+ }
+ self.mergeValidators(parentValidators, childValidators)
+ return parentValidators
+
+ def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+ validationItems = [
+ #{"config-name": 'hive.tez.container.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.tez.container.size')},
+ #{"config-name": 'hive.tez.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'hive.tez.java.opts')},
+ {"config-name": 'hive.auto.convert.join.noconditionaltask.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.auto.convert.join.noconditionaltask.size')} ]
+
+ servicesList = self.get_services_list(services)
+ if "TEZ" in servicesList:
+ yarnSiteProperties = getSiteProperties(configurations, "yarn-site")
+ if yarnSiteProperties:
+ yarnSchedulerMaximumAllocationMb = to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"])
+ hiveTezContainerSize = to_number(properties['hive.tez.container.size'])
+ if hiveTezContainerSize is not None and yarnSchedulerMaximumAllocationMb is not None and hiveTezContainerSize > yarnSchedulerMaximumAllocationMb:
+ validationItems.append({"config-name": 'hive.tez.container.size', "item": self.getWarnItem("hive.tez.container.size is greater than the maximum container size specified in yarn.scheduler.maximum-allocation-mb")})
+ return self.toConfigurationValidationProblems(validationItems, "hive-site")
+
+ def validateTezConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+ validationItems = [ {"config-name": 'tez.am.resource.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.am.resource.memory.mb')},
+ {"config-name": 'tez.am.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'tez.am.java.opts')},
+ {"config-name": 'tez.queue.name', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'tez.queue.name', services)} ]
+ return self.toConfigurationValidationProblems(validationItems, "tez-site")
+
+ def getDBDriver(self, databaseType):
+ driverDict = {
+ 'NEW MYSQL DATABASE': 'com.mysql.jdbc.Driver',
+ 'NEW DERBY DATABASE': 'org.apache.derby.jdbc.EmbeddedDriver',
+ 'EXISTING MYSQL DATABASE': 'com.mysql.jdbc.Driver',
+ 'EXISTING MYSQL / MARIADB DATABASE': 'com.mysql.jdbc.Driver',
+ 'EXISTING POSTGRESQL DATABASE': 'org.postgresql.Driver',
+ 'EXISTING ORACLE DATABASE': 'oracle.jdbc.driver.OracleDriver',
+ 'EXISTING SQL ANYWHERE DATABASE': 'sap.jdbc4.sqlanywhere.IDriver'
+ }
+ return driverDict.get(databaseType.upper())
+
+ def getDBConnectionString(self, databaseType):
+ driverDict = {
+ 'NEW MYSQL DATABASE': 'jdbc:mysql://{0}/{1}?createDatabaseIfNotExist=true',
+ 'NEW DERBY DATABASE': 'jdbc:derby:${{oozie.data.dir}}/${{oozie.db.schema.name}}-db;create=true',
+ 'EXISTING MYSQL DATABASE': 'jdbc:mysql://{0}/{1}',
+ 'EXISTING MYSQL / MARIADB DATABASE': 'jdbc:mysql://{0}/{1}',
+ 'EXISTING POSTGRESQL DATABASE': 'jdbc:postgresql://{0}:5432/{1}',
+ 'EXISTING ORACLE DATABASE': 'jdbc:oracle:thin:@//{0}:1521/{1}',
+ 'EXISTING SQL ANYWHERE DATABASE': 'jdbc:sqlanywhere:host={0};database={1}'
+ }
+ return driverDict.get(databaseType.upper())
+
+ def getProtocol(self, databaseType):
+ first_parts_of_connection_string = {
+ 'NEW MYSQL DATABASE': 'jdbc:mysql',
+ 'NEW DERBY DATABASE': 'jdbc:derby',
+ 'EXISTING MYSQL DATABASE': 'jdbc:mysql',
+ 'EXISTING MYSQL / MARIADB DATABASE': 'jdbc:mysql',
+ 'EXISTING POSTGRESQL DATABASE': 'jdbc:postgresql',
+ 'EXISTING ORACLE DATABASE': 'jdbc:oracle',
+ 'EXISTING SQL ANYWHERE DATABASE': 'jdbc:sqlanywhere'
+ }
+ return first_parts_of_connection_string.get(databaseType.upper())
+
+ def getDBTypeAlias(self, databaseType):
+ driverDict = {
+ 'NEW MYSQL DATABASE': 'mysql',
+ 'NEW DERBY DATABASE': 'derby',
+ 'EXISTING MYSQL / MARIADB DATABASE': 'mysql',
+ 'EXISTING MYSQL DATABASE': 'mysql',
+ 'EXISTING POSTGRESQL DATABASE': 'postgres',
+ 'EXISTING ORACLE DATABASE': 'oracle',
+ 'EXISTING SQL ANYWHERE DATABASE': 'sqla'
+ }
+ return driverDict.get(databaseType.upper())