You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/05/10 13:37:31 UTC
[08/22] ambari git commit: AMBARI-20960. HDP 3.0 TP - create Service
Advisor for Atlas.(vbrodetskyi)
AMBARI-20960. HDP 3.0 TP - create Service Advisor for Atlas.(vbrodetskyi)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bae2b60
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bae2b60
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bae2b60
Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 6bae2b607b5ed95beba55975d2ff5b23b7a3f090
Parents: 42a542a
Author: Vitaly Brodetskyi <vb...@hortonworks.com>
Authored: Mon May 8 23:47:24 2017 +0300
Committer: Vitaly Brodetskyi <vb...@hortonworks.com>
Committed: Mon May 8 23:47:24 2017 +0300
----------------------------------------------------------------------
.../ATLAS/0.7.0.3.0/service_advisor.py | 441 +++++++++++++++++++
1 file changed, 441 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bae2b60/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py
new file mode 100644
index 0000000..a2e31cc
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/ATLAS/0.7.0.3.0/service_advisor.py
@@ -0,0 +1,441 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import os
+import traceback
+import re
+import socket
+import fnmatch
+
+from ambari_commons.str_utils import string_set_equals
+from resource_management.core.logger import Logger
+
# Locate the shared parent service_advisor.py. This file lives in
# <resources>/common-services/ATLAS/<version>/, so three directories up is
# <resources>/ and the parent module is expected in <resources>/stacks/.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')

# Load the parent module from source under the name 'service_advisor'.
# A failure is reported (traceback plus a message) but not re-raised; in that
# case the name 'service_advisor' stays unbound, so the class definitions that
# subclass service_advisor.ServiceAdvisor will fail when they are executed.
try:
  with open(PARENT_FILE, 'rb') as parent_source:
    service_advisor = imp.load_module('service_advisor', parent_source, PARENT_FILE,
                                      ('.py', 'rb', imp.PY_SOURCE))
except Exception:
  traceback.print_exc()
  print("Failed to load parent")
+
class AtlasServiceAdvisor(service_advisor.ServiceAdvisor):
  """
  Service Advisor entry point for ATLAS.

  The layout-related modify* hooks are no-ops for Atlas. Configuration
  recommendations are delegated to AtlasRecommender and configuration
  validation to AtlasValidator.
  """

  def __init__(self, *args, **kwargs):
    self.as_super = super(AtlasServiceAdvisor, self)
    self.as_super.__init__(*args, **kwargs)

    # The modify* hooks are always run once, in this order, at construction.
    for modify_hook in (self.modifyMastersWithMultipleInstances,
                        self.modifyCardinalitiesDict,
                        self.modifyHeapSizeProperties,
                        self.modifyNotValuableComponents,
                        self.modifyComponentsNotPreferableOnServer,
                        self.modifyComponentLayoutSchemes):
      modify_hook()

  def modifyMastersWithMultipleInstances(self):
    """
    Hook for adjusting the set of masters that may have multiple instances.
    No-op for Atlas: the set is left unchanged.
    """
    pass

  def modifyCardinalitiesDict(self):
    """
    Hook for adjusting the dictionary of component cardinalities.
    No-op for Atlas: the dictionary is left unchanged.
    """
    pass

  def modifyHeapSizeProperties(self):
    """
    Hook for adjusting the dictionary of heap size properties.
    No-op for Atlas: the dictionary is left unchanged.
    """
    pass

  def modifyNotValuableComponents(self):
    """
    Hook for adjusting the set of components whose host assignment is based on
    other services. No-op for Atlas: the set is left unchanged.
    """
    pass

  def modifyComponentsNotPreferableOnServer(self):
    """
    Hook for adjusting the set of components that are not preferable on the
    server host. No-op for Atlas: the set is left unchanged.
    """
    pass

  def modifyComponentLayoutSchemes(self):
    """
    Hook for adjusting the per-component layout schemes, which map the number
    of hosts to the host index where a component should be placed.
    No-op for Atlas: the schemes are left unchanged.
    """
    pass

  def getServiceComponentLayoutValidations(self, services, hosts):
    """
    Return component-layout validation errors for Atlas.

    :return: always an empty list; Atlas adds no layout validations.
    """
    return []

  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
    """
    Entry point for configuration recommendations.

    Runs the HDP 2.5 and then the HDP 2.6 Atlas recommendation steps, each of
    which writes its suggestions into `configurations`.
    """
    recommender = AtlasRecommender()
    for recommend in (recommender.recommendAtlasConfigurationsFromHDP25,
                      recommender.recommendAtlasConfigurationsFromHDP26):
      recommend(configurations, clusterData, services, hosts)

  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
    """
    Entry point for configuration validation.

    Runs every (config type, method) pair registered in
    AtlasValidator.validators and returns the resulting list of problems.
    """
    validator = AtlasValidator()
    # Each validator method is invoked as
    # method(siteProperties, siteRecommendations, configurations, services, hosts).
    return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts,
                                                     validator.validators)
+
+
+
class AtlasRecommender(service_advisor.ServiceAdvisor):
  """
  Atlas Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
  """

  def __init__(self, *args, **kwargs):
    self.as_super = super(AtlasRecommender, self)
    self.as_super.__init__(*args, **kwargs)

  def constructAtlasRestAddress(self, services, hosts):
    """
    Build the suggested value of atlas.rest.address.

    :param services: Collection of services in the cluster with configs
    :param hosts: Collection of hosts in the cluster
    :return: Comma-separated "<scheme>://<host>:<port>" URLs, one per ATLAS_SERVER
             host in hostname order, or None when ATLAS is not in the cluster or
             has no ATLAS_SERVER hosts.
    """
    services_list = [service["StackServices"]["service_name"] for service in services["services"]]
    if "ATLAS" not in services_list:
      return None

    atlas_server_hosts_info = self.getHostsWithComponent("ATLAS", "ATLAS_SERVER", services, hosts)
    if not atlas_server_hosts_info:
      return None

    # Multiple Atlas Servers can exist, so sort by hostname to create a deterministic CSV.
    atlas_host_names = sorted(host_info['Hosts']['host_name'] for host_info in atlas_server_hosts_info)

    app_properties = services['configurations'].get('application-properties', {}).get('properties', {})

    scheme = "http"
    metadata_port = str(app_properties.get('atlas.server.http.port', "21000"))
    tls_enabled = app_properties.get('atlas.enableTLS', "false")
    if str(tls_enabled).lower() == "true":
      scheme = "https"
      # Without an explicit HTTPS port, fall back to the default 21443.
      metadata_port = str(app_properties.get('atlas.server.https.port', "21443"))

    atlas_rest_address = ",".join("{0}://{1}:{2}".format(scheme, host_name, metadata_port)
                                  for host_name in atlas_host_names)
    self.logger.info("Constructing atlas.rest.address=%s" % atlas_rest_address)
    return atlas_rest_address

  def _getKafkaBrokerPort(self, kafka_broker_properties, security_enabled):
    """
    Pick the Kafka broker port used to build atlas.kafka.bootstrap.servers.

    Starts from the broker 'port' property (default '6667'). If a listener in
    'listeners' has the "PROTOCOL://host:port" form, its port is preferred:
    with several comma-separated listeners, the first SASL listener when
    security is enabled, otherwise the last non-SASL listener; with a single
    listener, that listener's port.

    :param kafka_broker_properties: the 'properties' dict of kafka-broker
    :param security_enabled: whether cluster security is enabled
    :return: the port as read from the configuration
    """
    kafka_broker_port = kafka_broker_properties.get('port', '6667')
    kafka_server_listeners = kafka_broker_properties.get('listeners', 'PLAINTEXT://localhost:6667')

    if ',' in kafka_server_listeners:
      for listener in kafka_server_listeners.split(','):
        # e.g. "SASL_PLAINTEXT://host:6668" -> ['SASL_PLAINTEXT', '//host', '6668']
        listener_parts = listener.strip().split(':')
        if len(listener_parts) != 3:
          continue
        is_sasl = 'SASL' in listener_parts[0]
        if is_sasl and security_enabled:
          kafka_broker_port = listener_parts[2]
          break
        if not is_sasl and not security_enabled:
          kafka_broker_port = listener_parts[2]
    else:
      listener_parts = kafka_server_listeners.strip().split(':')
      if len(listener_parts) == 3:
        kafka_broker_port = listener_parts[2]
    return kafka_broker_port

  def recommendAtlasConfigurationsFromHDP25(self, configurations, clusterData, services, hosts):
    """
    Recommend Atlas settings in application-properties, ranger-atlas-plugin-properties and atlas-env.

    Covers atlas.rest.address, the Solr (AMBARI_INFRA) ZooKeeper URL, the Kafka
    bootstrap servers and ZooKeeper connect string, the HBase ZooKeeper quorum,
    the authorizer implementation and the Atlas Server heap settings. Endpoints
    of services that are not installed are recommended as "", which the
    validator reports as needing manual configuration.
    """
    putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services)
    putAtlasRangerPluginProperty = self.putProperty(configurations, "ranger-atlas-plugin-properties", services)
    putAtlasEnvProperty = self.putProperty(configurations, "atlas-env", services)

    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    service_configs = services['configurations']

    # Generate atlas.rest.address since the value is always computed
    atlas_rest_address = self.constructAtlasRestAddress(services, hosts)
    if atlas_rest_address is not None:
      putAtlasApplicationProperty("atlas.rest.address", atlas_rest_address)

    # Solr section: every ZooKeeper server as host:port, followed by the Infra Solr znode if configured.
    if "AMBARI_INFRA" in servicesList and 'infra-solr-env' in service_configs:
      infra_solr_znode = service_configs['infra-solr-env']['properties'].get('infra_solr_znode')
      znode_suffix = infra_solr_znode if infra_solr_znode is not None else ""
      zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services)
      zookeeper_port = self.getZKPort(services)
      solr_zookeeper_url = ",".join(zookeeper_host + ':' + zookeeper_port + znode_suffix
                                    for zookeeper_host in zookeeper_hosts)
      putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', solr_zookeeper_url)
    else:
      putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', "")

    # Kafka section
    if "KAFKA" in servicesList and 'kafka-broker' in service_configs:
      kafka_broker_properties = service_configs['kafka-broker']['properties']
      kafka_hosts = self.getHostNamesWithComponent("KAFKA", "KAFKA_BROKER", services)
      kafka_broker_port = self._getKafkaBrokerPort(kafka_broker_properties, self.isSecurityEnabled(services))
      kafka_bootstrap_servers = ",".join(kafka_host + ':' + kafka_broker_port for kafka_host in kafka_hosts)

      # Use "" (not None) when zookeeper.connect is absent, matching the no-Kafka
      # branch below; a None value could be stored as a non-empty string and
      # would then escape the validator's emptiness check.
      kafka_zookeeper_connect = kafka_broker_properties.get('zookeeper.connect', "")

      putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', kafka_bootstrap_servers)
      putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', kafka_zookeeper_connect)
    else:
      putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', "")
      putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', "")

    # HBase section: graph storage and audit both point at the HBase ZooKeeper quorum.
    if "HBASE" in servicesList and 'hbase-site' in service_configs:
      hbase_zookeeper_quorum = service_configs['hbase-site']['properties'].get('hbase.zookeeper.quorum', "")
    else:
      hbase_zookeeper_quorum = ""
    putAtlasApplicationProperty('atlas.graph.storage.hostname', hbase_zookeeper_quorum)
    putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', hbase_zookeeper_quorum)

    # Mirror ranger-env's plugin switch into ranger-atlas-plugin-properties.
    if "ranger-env" in service_configs and "ranger-atlas-plugin-properties" in service_configs and \
        "ranger-atlas-plugin-enabled" in service_configs["ranger-env"]["properties"]:
      putAtlasRangerPluginProperty('ranger-atlas-plugin-enabled',
                                   service_configs["ranger-env"]["properties"]["ranger-atlas-plugin-enabled"])

    # Authorizer: prefer the value in `configurations` (possibly just recommended
    # above), falling back to the current value in `services`.
    ranger_atlas_plugin_enabled = ''
    if 'ranger-atlas-plugin-properties' in configurations and \
        'ranger-atlas-plugin-enabled' in configurations['ranger-atlas-plugin-properties']['properties']:
      ranger_atlas_plugin_enabled = configurations['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled']
    elif 'ranger-atlas-plugin-properties' in service_configs and \
        'ranger-atlas-plugin-enabled' in service_configs['ranger-atlas-plugin-properties']['properties']:
      ranger_atlas_plugin_enabled = service_configs['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled']

    if ranger_atlas_plugin_enabled and ranger_atlas_plugin_enabled.lower() == 'yes':
      putAtlasApplicationProperty('atlas.authorizer.impl', 'ranger')
    else:
      putAtlasApplicationProperty('atlas.authorizer.impl', 'simple')

    # Atlas server memory settings, tiered by atlas_server_metadata_size (default 50000).
    if 'atlas-env' in service_configs:
      atlas_env_properties = service_configs['atlas-env']['properties']
      atlas_server_metadata_size = 50000
      if 'atlas_server_metadata_size' in atlas_env_properties:
        atlas_server_metadata_size = float(atlas_env_properties['atlas_server_metadata_size'])

      if atlas_server_metadata_size >= 1000000:
        atlas_server_xmx = 1024 * 16
      elif atlas_server_metadata_size >= 500000:
        atlas_server_xmx = 1024 * 10
      elif atlas_server_metadata_size >= 300000:
        atlas_server_xmx = 1024 * 5
      else:
        atlas_server_xmx = 2048

      # Floor division keeps the integer result that Python 2's `/` produced
      # (e.g. 2048 -> 600) regardless of interpreter version.
      atlas_server_max_new_size = (atlas_server_xmx // 100) * 30

      putAtlasEnvProperty("atlas_server_xmx", atlas_server_xmx)
      putAtlasEnvProperty("atlas_server_max_new_size", atlas_server_max_new_size)

  def recommendAtlasConfigurationsFromHDP26(self, configurations, clusterData, services, hosts):
    """
    Recommend atlas.sso.knox.providerurl from the Knox gateway host and port.

    Uses the alphabetically first KNOX_GATEWAY host (default 'localhost') and
    gateway-site/gateway.port (default '8443').
    """
    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
    putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services)

    knox_host = 'localhost'
    knox_port = '8443'
    # NOTE(review): the original indentation was lost; this follows the upstream
    # layout, where the provider URL is only recommended when KNOX is present.
    if 'KNOX' in servicesList:
      knox_hosts = self.getComponentHostNames(services, "KNOX", "KNOX_GATEWAY")
      if knox_hosts:
        # min() selects what sort()[0] did without reordering the list held in `services`.
        knox_host = min(knox_hosts)
      if 'gateway-site' in services['configurations'] and \
          'gateway.port' in services['configurations']["gateway-site"]["properties"]:
        knox_port = services['configurations']["gateway-site"]["properties"]['gateway.port']
      putAtlasApplicationProperty('atlas.sso.knox.providerurl',
                                  'https://{0}:{1}/gateway/knoxsso/api/v1/websso'.format(knox_host, knox_port))
+
+
+
+
class AtlasValidator(service_advisor.ServiceAdvisor):
  """
  Atlas Validator checks the correctness of properties whenever the service is first added or the user attempts to
  change configs via the UI.
  """

  # application-properties that must be non-blank when LDAP authentication is
  # enabled and atlas.authentication.method.ldap.type is "ldap" (checked in this order).
  _LDAP_REQUIRED_PROPERTIES = (
    "atlas.authentication.method.ldap.url",
    "atlas.authentication.method.ldap.userDNpattern",
    "atlas.authentication.method.ldap.groupSearchBase",
    "atlas.authentication.method.ldap.groupSearchFilter",
    "atlas.authentication.method.ldap.groupRoleAttribute",
    "atlas.authentication.method.ldap.base.dn",
    "atlas.authentication.method.ldap.bind.dn",
    "atlas.authentication.method.ldap.bind.password",
    "atlas.authentication.method.ldap.user.searchfilter",
  )

  # Same, for atlas.authentication.method.ldap.type "ad".
  _AD_REQUIRED_PROPERTIES = (
    "atlas.authentication.method.ldap.ad.domain",
    "atlas.authentication.method.ldap.ad.url",
    "atlas.authentication.method.ldap.ad.base.dn",
    "atlas.authentication.method.ldap.ad.bind.dn",
    "atlas.authentication.method.ldap.ad.bind.password",
    "atlas.authentication.method.ldap.ad.user.searchfilter",
  )

  def __init__(self, *args, **kwargs):
    self.as_super = super(AtlasValidator, self)
    self.as_super.__init__(*args, **kwargs)

    # (config type, validation method) pairs run by validateListOfConfigUsingMethod.
    self.validators = [("application-properties", self.validateAtlasConfigurationsFromHDP25)]

  def validateAtlasConfigurationsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts):
    """
    Validate Atlas application-properties.

    Checks LDAP/AD required settings (when LDAP authentication is enabled), the
    Solr ZooKeeper URL (for the solr5 index backend), the Kafka endpoints and
    the HBase endpoints (for the hbase storage backend). Absent properties are
    treated as unset and reported instead of raising KeyError.

    :return: validation problems for the "application-properties" config type
    """
    application_properties = self.getSiteProperties(configurations, "application-properties")
    validationItems = []

    auth_type = application_properties.get('atlas.authentication.method.ldap.type')
    auth_ldap_enable = str(application_properties.get('atlas.authentication.method.ldap', 'false')).lower() == 'true'
    self.logger.info("Validating Atlas configs, authentication type: %s" % str(auth_type))

    auth_type_name = str(auth_type).lower() if auth_type else ""
    if auth_type_name == "ldap":
      props_to_require = self._LDAP_REQUIRED_PROPERTIES
    elif auth_type_name == "ad":
      props_to_require = self._AD_REQUIRED_PROPERTIES
    else:
      props_to_require = ()

    if auth_ldap_enable:
      for prop in props_to_require:
        value = application_properties.get(prop)
        if value is None or value.strip() == "":
          validationItems.append({"config-name": prop,
                                  "item": self.getErrorItem("If authentication type is %s, this property is required." % auth_type)})

    if application_properties.get('atlas.graph.index.search.backend') == 'solr5' and \
        not application_properties.get('atlas.graph.index.search.solr.zookeeper-url'):
      validationItems.append({"config-name": "atlas.graph.index.search.solr.zookeeper-url",
                              "item": self.getErrorItem(
                                "If AMBARI_INFRA is not installed then the SOLR zookeeper url configuration must be specified.")})

    if not application_properties.get('atlas.kafka.bootstrap.servers'):
      validationItems.append({"config-name": "atlas.kafka.bootstrap.servers",
                              "item": self.getErrorItem(
                                "If KAFKA is not installed then the Kafka bootstrap servers configuration must be specified.")})

    if not application_properties.get('atlas.kafka.zookeeper.connect'):
      validationItems.append({"config-name": "atlas.kafka.zookeeper.connect",
                              "item": self.getErrorItem(
                                "If KAFKA is not installed then the Kafka zookeeper quorum configuration must be specified.")})

    if application_properties.get('atlas.graph.storage.backend') == 'hbase':
      storage_hostname = application_properties.get('atlas.graph.storage.hostname')
      if 'hbase-site' in services['configurations']:
        hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties'].get('hbase.zookeeper.quorum', '')
        if not storage_hostname:
          validationItems.append({"config-name": "atlas.graph.storage.hostname",
                                  "item": self.getErrorItem(
                                    "If HBASE is not installed then the hbase zookeeper quorum configuration must be specified.")})
        elif string_set_equals(storage_hostname, hbase_zookeeper_quorum):
          # Informational: Atlas points at this cluster's own HBase.
          validationItems.append({"config-name": "atlas.graph.storage.hostname",
                                  "item": self.getWarnItem(
                                    "Atlas is configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")})
      elif not storage_hostname:
        validationItems.append({"config-name": "atlas.graph.storage.hostname",
                                "item": self.getErrorItem(
                                  "Atlas is not configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")})

      # The audit quorum is required for the hbase backend whether or not HBase is in this cluster.
      if not application_properties.get('atlas.audit.hbase.zookeeper.quorum'):
        validationItems.append({"config-name": "atlas.audit.hbase.zookeeper.quorum",
                                "item": self.getErrorItem(
                                  "If HBASE is not installed then the audit hbase zookeeper quorum configuration must be specified.")})

    validationProblems = self.toConfigurationValidationProblems(validationItems, "application-properties")
    return validationProblems
+