You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2017/06/28 00:24:08 UTC

[10/51] [partial] ambari git commit: AMBARI-21349. Create BigInsights Stack Skeleton in Ambari 2.5 (alejandro)

http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py
new file mode 100755
index 0000000..be49bf8
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_21.py
@@ -0,0 +1,259 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import socket
+
+from resource_management.core.logger import Logger
+
+try:
+  from stack_advisor_206 import *
+except ImportError:
+  #Ignore ImportError
+  print("stack_advisor_206 not found")
+
+class HDP21StackAdvisor(HDP206StackAdvisor):
+
+  def getServiceConfigurationRecommenderDict(self):
+    parentRecommendConfDict = super(HDP21StackAdvisor, self).getServiceConfigurationRecommenderDict()
+    childRecommendConfDict = {
+      "OOZIE": self.recommendOozieConfigurations,
+      "HIVE": self.recommendHiveConfigurations,
+      "TEZ": self.recommendTezConfigurations
+    }
+    parentRecommendConfDict.update(childRecommendConfDict)
+    return parentRecommendConfDict
+
+  def recommendOozieConfigurations(self, configurations, clusterData, services, hosts):
+    oozieSiteProperties = getSiteProperties(services['configurations'], 'oozie-site')
+    oozieEnvProperties = getSiteProperties(services['configurations'], 'oozie-env')
+    putOozieProperty = self.putProperty(configurations, "oozie-site", services)
+    putOozieEnvProperty = self.putProperty(configurations, "oozie-env", services)
+
+    if "FALCON_SERVER" in clusterData["components"]:
+      putOozieSiteProperty = self.putProperty(configurations, "oozie-site", services)
+      falconUser = None
+      if "falcon-env" in services["configurations"] and "falcon_user" in services["configurations"]["falcon-env"]["properties"]:
+        falconUser = services["configurations"]["falcon-env"]["properties"]["falcon_user"]
+        if falconUser is not None:
+          putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUser) , "*")
+          putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUser) , "*")
+        falconUserOldValue = getOldValue(self, services, "falcon-env", "falcon_user")
+        if falconUserOldValue is not None:
+          if 'forced-configurations' not in services:
+            services["forced-configurations"] = []
+          putOozieSitePropertyAttribute = self.putPropertyAttribute(configurations, "oozie-site")
+          putOozieSitePropertyAttribute("oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUserOldValue), 'delete', 'true')
+          putOozieSitePropertyAttribute("oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUserOldValue), 'delete', 'true')
+          services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUserOldValue)})
+          services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUserOldValue)})
+          if falconUser is not None:
+            services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUser)})
+            services["forced-configurations"].append({"type" : "oozie-site", "name" : "oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUser)})
+
+      putMapredProperty = self.putProperty(configurations, "oozie-site")
+      putMapredProperty("oozie.services.ext",
+                        "org.apache.oozie.service.JMSAccessorService," +
+                        "org.apache.oozie.service.PartitionDependencyManagerService," +
+                        "org.apache.oozie.service.HCatAccessorService")
+    if oozieEnvProperties and oozieSiteProperties and self.checkSiteProperties(oozieSiteProperties, 'oozie.service.JPAService.jdbc.driver') and self.checkSiteProperties(oozieEnvProperties, 'oozie_database'):
+      putOozieProperty('oozie.service.JPAService.jdbc.driver', self.getDBDriver(oozieEnvProperties['oozie_database']))
+    if oozieSiteProperties and oozieEnvProperties and self.checkSiteProperties(oozieSiteProperties, 'oozie.db.schema.name', 'oozie.service.JPAService.jdbc.url') and self.checkSiteProperties(oozieEnvProperties, 'oozie_database'):
+      oozieServerHost = self.getHostWithComponent('OOZIE', 'OOZIE_SERVER', services, hosts)
+      oozieDBConnectionURL = oozieSiteProperties['oozie.service.JPAService.jdbc.url']
+      protocol = self.getProtocol(oozieEnvProperties['oozie_database'])
+      oldSchemaName = getOldValue(self, services, "oozie-site", "oozie.db.schema.name")
+      # under these if constructions we are checking if oozie server hostname available,
+      # if schema name was changed or if protocol according to current db type differs with protocol in db connection url(db type was changed)
+      if oozieServerHost is not None:
+        if oldSchemaName or (protocol and oozieDBConnectionURL and not oozieDBConnectionURL.startswith(protocol)):
+          dbConnection = self.getDBConnectionString(oozieEnvProperties['oozie_database']).format(oozieServerHost['Hosts']['host_name'], oozieSiteProperties['oozie.db.schema.name'])
+          putOozieProperty('oozie.service.JPAService.jdbc.url', dbConnection)
+
+  def recommendHiveConfigurations(self, configurations, clusterData, services, hosts):
+    hiveSiteProperties = getSiteProperties(services['configurations'], 'hive-site')
+    hiveEnvProperties = getSiteProperties(services['configurations'], 'hive-env')
+    containerSize = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory'])
+    containerSize = min(clusterData['containers'] * clusterData['ramPerContainer'], containerSize)
+    container_size_bytes = int(containerSize)*1024*1024
+    putHiveEnvProperty = self.putProperty(configurations, "hive-env", services)
+    putHiveProperty = self.putProperty(configurations, "hive-site", services)
+    
+    servicesList = self.get_services_list(services)
+    if "TEZ" in servicesList:
+        putHiveProperty('hive.auto.convert.join.noconditionaltask.size', int(round(container_size_bytes / 3)))
+        putHiveProperty('hive.tez.java.opts', "-server -Xmx" + str(int(round((0.8 * containerSize) + 0.5)))
+                    + "m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps")
+        putHiveProperty('hive.tez.container.size', containerSize)
+
+    # javax.jdo.option.ConnectionURL recommendations
+    if hiveEnvProperties and self.checkSiteProperties(hiveEnvProperties, 'hive_database', 'hive_database_type'):
+      putHiveEnvProperty('hive_database_type', self.getDBTypeAlias(hiveEnvProperties['hive_database']))
+    if hiveEnvProperties and hiveSiteProperties and self.checkSiteProperties(hiveSiteProperties, 'javax.jdo.option.ConnectionDriverName') and self.checkSiteProperties(hiveEnvProperties, 'hive_database'):
+      putHiveProperty('javax.jdo.option.ConnectionDriverName', self.getDBDriver(hiveEnvProperties['hive_database']))
+    if hiveSiteProperties and hiveEnvProperties and self.checkSiteProperties(hiveSiteProperties, 'ambari.hive.db.schema.name', 'javax.jdo.option.ConnectionURL') and self.checkSiteProperties(hiveEnvProperties, 'hive_database'):
+      hiveServerHost = self.getHostWithComponent('HIVE', 'HIVE_SERVER', services, hosts)
+      hiveDBConnectionURL = hiveSiteProperties['javax.jdo.option.ConnectionURL']
+      protocol = self.getProtocol(hiveEnvProperties['hive_database'])
+      oldSchemaName = getOldValue(self, services, "hive-site", "ambari.hive.db.schema.name")
+      oldDBType = getOldValue(self, services, "hive-env", "hive_database")
+      # under these if constructions we are checking if hive server hostname available,
+      # if it's default db connection url with "localhost" or if schema name was changed or if db type was changed (only for db type change from default mysql to existing mysql)
+      # or if protocol according to current db type differs with protocol in db connection url(other db types changes)
+      if hiveServerHost is not None:
+        if (hiveDBConnectionURL and "//localhost" in hiveDBConnectionURL) or oldSchemaName or oldDBType or (protocol and hiveDBConnectionURL and not hiveDBConnectionURL.startswith(protocol)):
+          dbConnection = self.getDBConnectionString(hiveEnvProperties['hive_database']).format(hiveServerHost['Hosts']['host_name'], hiveSiteProperties['ambari.hive.db.schema.name'])
+          putHiveProperty('javax.jdo.option.ConnectionURL', dbConnection)
+
+    if "PIG" in servicesList:
+        ambari_user = self.getAmbariUser(services)
+        ambariHostName = socket.getfqdn()
+        webHcatSiteProperty = self.putProperty(configurations, "webhcat-site", services)
+        webHcatSiteProperty("webhcat.proxyuser.{0}.hosts".format(ambari_user), ambariHostName)
+        webHcatSiteProperty("webhcat.proxyuser.{0}.groups".format(ambari_user), "*")
+        old_ambari_user = self.getOldAmbariUser(services)
+        if old_ambari_user is not None:
+            webHcatSitePropertyAttributes = self.putPropertyAttribute(configurations, "webhcat-site")
+            webHcatSitePropertyAttributes("webhcat.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
+            webHcatSitePropertyAttributes("webhcat.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
+
+    if self.is_secured_cluster(services):
+      appendCoreSiteProperty = self.updateProperty(configurations, "core-site", services)
+
+      def updateCallback(originalValue, newValue):
+        """
+        :type originalValue str
+        :type newValue list
+        """
+        if originalValue and not originalValue.isspace():
+          hosts = originalValue.split(',')
+
+          if newValue:
+            hosts.extend(newValue)
+
+          result = ','.join(set(hosts))
+          return result
+        else:
+          return ','.join(set(newValue))
+
+      meta = self.get_service_component_meta("HIVE", "WEBHCAT_SERVER", services)
+      if "hostnames" in meta:
+        appendCoreSiteProperty('hadoop.proxyuser.HTTP.hosts', meta["hostnames"], updateCallback)
+
+  def recommendTezConfigurations(self, configurations, clusterData, services, hosts):
+    putTezProperty = self.putProperty(configurations, "tez-site")
+    putTezProperty("tez.am.resource.memory.mb", int(clusterData['amMemory']))
+    putTezProperty("tez.am.java.opts",
+                   "-server -Xmx" + str(int(0.8 * clusterData["amMemory"]))
+                   + "m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC")
+    recommended_tez_queue = self.recommendYarnQueue(services, "tez-site", "tez.queue.name")
+    if recommended_tez_queue is not None:
+      putTezProperty("tez.queue.name", recommended_tez_queue)
+
+
+  def getNotPreferableOnServerComponents(self):
+    return ['STORM_UI_SERVER', 'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS', 'GANGLIA_SERVER', 'METRICS_COLLECTOR']
+
+  def getNotValuableComponents(self):
+    return ['JOURNALNODE', 'ZKFC', 'GANGLIA_MONITOR', 'APP_TIMELINE_SERVER']
+
+  def getComponentLayoutSchemes(self):
+    parentSchemes = super(HDP21StackAdvisor, self).getComponentLayoutSchemes()
+    childSchemes = {
+        'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+        'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+    }
+    parentSchemes.update(childSchemes)
+    return parentSchemes
+
+  def getServiceConfigurationValidators(self):
+    parentValidators = super(HDP21StackAdvisor, self).getServiceConfigurationValidators()
+    childValidators = {
+      "HIVE": {"hive-site": self.validateHiveConfigurations},
+      "TEZ": {"tez-site": self.validateTezConfigurations}
+    }
+    self.mergeValidators(parentValidators, childValidators)
+    return parentValidators
+
+  def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = [ 
+                        #{"config-name": 'hive.tez.container.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.tez.container.size')},
+                        #{"config-name": 'hive.tez.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'hive.tez.java.opts')},
+                        {"config-name": 'hive.auto.convert.join.noconditionaltask.size', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'hive.auto.convert.join.noconditionaltask.size')} ]
+    
+    servicesList = self.get_services_list(services)
+    if "TEZ" in servicesList:
+        yarnSiteProperties = getSiteProperties(configurations, "yarn-site")
+        if yarnSiteProperties:
+          yarnSchedulerMaximumAllocationMb = to_number(yarnSiteProperties["yarn.scheduler.maximum-allocation-mb"])
+          hiveTezContainerSize = to_number(properties['hive.tez.container.size'])
+          if hiveTezContainerSize is not None and yarnSchedulerMaximumAllocationMb is not None and hiveTezContainerSize > yarnSchedulerMaximumAllocationMb:
+            validationItems.append({"config-name": 'hive.tez.container.size', "item": self.getWarnItem("hive.tez.container.size is greater than the maximum container size specified in yarn.scheduler.maximum-allocation-mb")})
+    return self.toConfigurationValidationProblems(validationItems, "hive-site")
+
+  def validateTezConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = [ {"config-name": 'tez.am.resource.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.am.resource.memory.mb')},
+                        {"config-name": 'tez.am.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'tez.am.java.opts')},
+                        {"config-name": 'tez.queue.name', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'tez.queue.name', services)} ]
+    return self.toConfigurationValidationProblems(validationItems, "tez-site")
+
+  def getDBDriver(self, databaseType):
+    driverDict = {
+      'NEW MYSQL DATABASE': 'com.mysql.jdbc.Driver',
+      'NEW DERBY DATABASE': 'org.apache.derby.jdbc.EmbeddedDriver',
+      'EXISTING MYSQL DATABASE': 'com.mysql.jdbc.Driver',
+      'EXISTING MYSQL / MARIADB DATABASE': 'com.mysql.jdbc.Driver',
+      'EXISTING POSTGRESQL DATABASE': 'org.postgresql.Driver',
+      'EXISTING ORACLE DATABASE': 'oracle.jdbc.driver.OracleDriver',
+      'EXISTING SQL ANYWHERE DATABASE': 'sap.jdbc4.sqlanywhere.IDriver'
+    }
+    return driverDict.get(databaseType.upper())
+
+  def getDBConnectionString(self, databaseType):
+    driverDict = {
+      'NEW MYSQL DATABASE': 'jdbc:mysql://{0}/{1}?createDatabaseIfNotExist=true',
+      'NEW DERBY DATABASE': 'jdbc:derby:${{oozie.data.dir}}/${{oozie.db.schema.name}}-db;create=true',
+      'EXISTING MYSQL DATABASE': 'jdbc:mysql://{0}/{1}',
+      'EXISTING MYSQL / MARIADB DATABASE': 'jdbc:mysql://{0}/{1}',
+      'EXISTING POSTGRESQL DATABASE': 'jdbc:postgresql://{0}:5432/{1}',
+      'EXISTING ORACLE DATABASE': 'jdbc:oracle:thin:@//{0}:1521/{1}',
+      'EXISTING SQL ANYWHERE DATABASE': 'jdbc:sqlanywhere:host={0};database={1}'
+    }
+    return driverDict.get(databaseType.upper())
+
+  def getProtocol(self, databaseType):
+    first_parts_of_connection_string = {
+      'NEW MYSQL DATABASE': 'jdbc:mysql',
+      'NEW DERBY DATABASE': 'jdbc:derby',
+      'EXISTING MYSQL DATABASE': 'jdbc:mysql',
+      'EXISTING MYSQL / MARIADB DATABASE': 'jdbc:mysql',
+      'EXISTING POSTGRESQL DATABASE': 'jdbc:postgresql',
+      'EXISTING ORACLE DATABASE': 'jdbc:oracle',
+      'EXISTING SQL ANYWHERE DATABASE': 'jdbc:sqlanywhere'
+    }
+    return first_parts_of_connection_string.get(databaseType.upper())
+
+  def getDBTypeAlias(self, databaseType):
+    driverDict = {
+      'NEW MYSQL DATABASE': 'mysql',
+      'NEW DERBY DATABASE': 'derby',
+      'EXISTING MYSQL / MARIADB DATABASE': 'mysql',
+      'EXISTING MYSQL DATABASE': 'mysql',
+      'EXISTING POSTGRESQL DATABASE': 'postgres',
+      'EXISTING ORACLE DATABASE': 'oracle',
+      'EXISTING SQL ANYWHERE DATABASE': 'sqla'
+    }
+    return driverDict.get(databaseType.upper())