You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/05/10 13:37:31 UTC

[08/22] ambari git commit: AMBARI-20960. HDP 3.0 TP - create Service Advisor for Atlas.(vbrodetskyi)

AMBARI-20960. HDP 3.0 TP - create Service Advisor for Atlas.(vbrodetskyi)


Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 6bae2b607b5ed95beba55975d2ff5b23b7a3f090
Parents: 42a542a
Author: Vitaly Brodetskyi <>
Authored: Mon May 8 23:47:24 2017 +0300
Committer: Vitaly Brodetskyi <>
Committed: Mon May 8 23:47:24 2017 +0300

 .../ATLAS/          | 441 +++++++++++++++++++
 1 file changed, 441 insertions(+)
diff --git a/ambari-server/src/main/resources/common-services/ATLAS/ b/ambari-server/src/main/resources/common-services/ATLAS/
new file mode 100644
index 0000000..a2e31cc
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/ATLAS/
@@ -0,0 +1,441 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+# Python imports
+import imp
+import os
+import traceback
+import re
+import socket
+import fnmatch
+from ambari_commons.str_utils import string_set_equals
+from resource_management.core.logger import Logger
+
+# Locate the shared stacks directory relative to this file and load the parent
+# service advisor module from it, so the classes below can subclass
+# service_advisor.ServiceAdvisor.
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
+# NOTE(review): the file name was blank in this copy of the patch, which made
+# PARENT_FILE point at the directory itself. 'service_advisor.py' matches the
+# module name and the '.py' suffix passed to imp.load_module below - confirm
+# against the upstream commit.
+PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
+
+try:
+  with open(PARENT_FILE, 'rb') as fp:
+    service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
+except Exception:
+  # Best effort: report the failure and continue. The class statements below
+  # will then fail with a NameError on service_advisor.
+  traceback.print_exc()
+  print "Failed to load parent"
+
+class AtlasServiceAdvisor(service_advisor.ServiceAdvisor):
+  """
+  Service advisor entry point for ATLAS.
+
+  The component-layout hooks are intentionally no-ops in this class.
+  Configuration recommendations are delegated to AtlasRecommender and
+  configuration validation to AtlasValidator.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(AtlasServiceAdvisor, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    # Always call these methods
+    self.modifyMastersWithMultipleInstances()
+    self.modifyCardinalitiesDict()
+    self.modifyHeapSizeProperties()
+    self.modifyNotValuableComponents()
+    self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
+
+  def modifyMastersWithMultipleInstances(self):
+    """
+    Modify the set of masters with multiple instances.
+    Nothing is changed for ATLAS.
+    """
+    # Nothing to do
+    pass
+
+  def modifyCardinalitiesDict(self):
+    """
+    Modify the dictionary of cardinalities.
+    Nothing is changed for ATLAS.
+    """
+    # Nothing to do
+    pass
+
+  def modifyHeapSizeProperties(self):
+    """
+    Modify the dictionary of heap size properties.
+    Nothing is changed for ATLAS.
+    """
+    # Nothing to do
+    pass
+
+  def modifyNotValuableComponents(self):
+    """
+    Modify the set of components whose host assignment is based on other services.
+    Nothing is changed for ATLAS.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentsNotPreferableOnServer(self):
+    """
+    Modify the set of components that are not preferable on the server.
+    Nothing is changed for ATLAS.
+    """
+    # Nothing to do
+    pass
+
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+    Nothing is changed for ATLAS.
+    """
+    # Nothing to do
+    pass
+
+  def getServiceComponentLayoutValidations(self, services, hosts):
+    """
+    Get a list of component layout errors.
+    ATLAS performs no layout validation, so the list is always empty.
+    """
+    return []
+
+  def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
+    """
+    Entry point.
+    Runs the HDP 2.5 and then the HDP 2.6 Atlas recommendations, which write
+    their suggestions into ``configurations``. Returns None.
+    """
+    # Disabled log call, kept for reference; 'inspect' is not imported in this file.
+    # Logger.info("Class: %s, Method: %s. Recommending Service Configurations." %
+    #             (self.__class__.__name__, inspect.stack()[0][3]))
+
+    recommender = AtlasRecommender()
+    recommender.recommendAtlasConfigurationsFromHDP25(configurations, clusterData, services, hosts)
+    recommender.recommendAtlasConfigurationsFromHDP26(configurations, clusterData, services, hosts)
+
+  def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
+    """
+    Entry point.
+    Validate configurations for the service. Return a list of errors.
+    The code for this function should be the same for each Service Advisor.
+    """
+    # Disabled log call, kept for reference; 'inspect' is not imported in this file.
+    # Logger.info("Class: %s, Method: %s. Validating Configurations." %
+    #             (self.__class__.__name__, inspect.stack()[0][3]))
+
+    validator = AtlasValidator()
+    # Calls the methods of the validator using arguments,
+    # method(siteProperties, siteRecommendations, configurations, services, hosts)
+    return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators)
+
+class AtlasRecommender(service_advisor.ServiceAdvisor):
+  """
+  Atlas Recommender suggests properties when adding the service for the first time or modifying configs via the UI.
+
+  NOTE(review): in this copy of the patch one Logger.info call was truncated and
+  several application-properties names were blank (''). They are restored below
+  ('atlas.rest.address', 'atlas.graph.index.search.solr.zookeeper-url',
+  'atlas.graph.storage.hostname') and should be confirmed against the upstream commit.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(AtlasRecommender, self)
+    self.as_super.__init__(*args, **kwargs)
+
+  def constructAtlasRestAddress(self, services, hosts):
+    """
+    Build the comma-separated list of Atlas server REST endpoints.
+
+    :param services: Collection of services in the cluster with configs
+    :param hosts: Collection of hosts in the cluster
+    :return: The suggested value for atlas.rest.address, or None when ATLAS is
+             not in the cluster or no ATLAS_SERVER host is found
+    """
+    services_list = [service["StackServices"]["service_name"] for service in services["services"]]
+    atlas_server_hosts_info = self.getHostsWithComponent("ATLAS", "ATLAS_SERVER", services, hosts)
+    if "ATLAS" not in services_list or not atlas_server_hosts_info:
+      return None
+
+    # Multiple Atlas Servers can exist, so sort by hostname to create deterministic csv
+    atlas_host_names = sorted(e['Hosts']['host_name'] for e in atlas_server_hosts_info)
+
+    scheme = "http"
+    metadata_port = "21000"
+    atlas_server_default_https_port = "21443"
+
+    if 'application-properties' in services['configurations']:
+      app_props = services['configurations']['application-properties']['properties']
+      if 'atlas.server.http.port' in app_props:
+        metadata_port = str(app_props['atlas.server.http.port'])
+      # With TLS enabled both the scheme and the advertised port switch to the https ones
+      if str(app_props.get('atlas.enableTLS', "false")).lower() == "true":
+        scheme = "https"
+        metadata_port = str(app_props.get('atlas.server.https.port', atlas_server_default_https_port))
+
+    atlas_rest_address = ",".join("{0}://{1}:{2}".format(scheme, hostname, metadata_port)
+                                  for hostname in atlas_host_names)
+    Logger.info("Constructing atlas.rest.address=%s" % atlas_rest_address)
+    return atlas_rest_address
+
+  def recommendAtlasConfigurationsFromHDP25(self, configurations, clusterData, services, hosts):
+    """
+    Recommend Atlas application-properties, ranger-atlas-plugin-properties and atlas-env values.
+
+    Covers the REST address, the Solr ZooKeeper url, Kafka bootstrap servers and
+    ZooKeeper connect string, HBase ZooKeeper quorum, the authorizer
+    implementation and the server heap sizes. Suggestions are written into
+    ``configurations`` through self.putProperty; nothing is returned.
+
+    :param configurations: Recommendation dictionary that is filled in
+    :param clusterData: Unused here
+    :param services: Collection of services in the cluster with configs
+    :param hosts: Collection of hosts in the cluster
+    """
+    putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services)
+    putAtlasRangerPluginProperty = self.putProperty(configurations, "ranger-atlas-plugin-properties", services)
+    putAtlasEnvProperty = self.putProperty(configurations, "atlas-env", services)
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    service_configs = services['configurations']
+
+    # Generate atlas.rest.address since the value is always computed
+    atlas_rest_address = self.constructAtlasRestAddress(services, hosts)
+    if atlas_rest_address is not None:
+      putAtlasApplicationProperty("atlas.rest.address", atlas_rest_address)
+
+    # Solr (Ambari Infra) section
+    if "AMBARI_INFRA" in servicesList and 'infra-solr-env' in service_configs:
+      infra_solr_znode = service_configs['infra-solr-env']['properties'].get('infra_solr_znode')
+      zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services)
+      zookeeper_port = self.getZKPort(services)
+
+      zookeeper_host_arr = []
+      for zookeeper_host_name in zookeeper_hosts:
+        zookeeper_host = zookeeper_host_name + ':' + zookeeper_port
+        # The znode, when configured, is appended to every host:port entry
+        if infra_solr_znode is not None:
+          zookeeper_host += infra_solr_znode
+        zookeeper_host_arr.append(zookeeper_host)
+
+      solr_zookeeper_url = ",".join(zookeeper_host_arr)
+      putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', solr_zookeeper_url)
+    else:
+      putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', "")
+
+    # Kafka section
+    if "KAFKA" in servicesList and 'kafka-broker' in service_configs:
+      kafka_broker_props = service_configs['kafka-broker']['properties']
+      kafka_hosts = self.getHostNamesWithComponent("KAFKA", "KAFKA_BROKER", services)
+
+      kafka_broker_port = kafka_broker_props.get('port', '6667')
+      kafka_server_listeners = kafka_broker_props.get('listeners', 'PLAINTEXT://localhost:6667')
+      security_enabled = self.isSecurityEnabled(services)
+
+      # A listener looks like PROTOCOL://host:port, i.e. three ':'-separated parts.
+      if ',' in kafka_server_listeners:
+        for listener in kafka_server_listeners.split(','):
+          listener = listener.strip().split(':')
+          if len(listener) == 3:
+            if 'SASL' in listener[0] and security_enabled:
+              # First SASL listener wins on a secured cluster
+              kafka_broker_port = listener[2]
+              break
+            elif 'SASL' not in listener[0] and not security_enabled:
+              # No break: the last non-SASL listener wins on an unsecured cluster
+              kafka_broker_port = listener[2]
+      else:
+        listener = kafka_server_listeners.strip().split(':')
+        if len(listener) == 3:
+          kafka_broker_port = listener[2]
+
+      kafka_bootstrap_servers = ",".join([kafka_host + ':' + kafka_broker_port for kafka_host in kafka_hosts])
+      # None when the broker has no zookeeper.connect configured
+      kafka_zookeeper_connect = kafka_broker_props.get('zookeeper.connect')
+
+      putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', kafka_bootstrap_servers)
+      putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', kafka_zookeeper_connect)
+    else:
+      putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', "")
+      putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', "")
+
+    # HBase section
+    if "HBASE" in servicesList and 'hbase-site' in service_configs:
+      hbase_zookeeper_quorum = service_configs['hbase-site']['properties'].get('hbase.zookeeper.quorum', "")
+      putAtlasApplicationProperty('atlas.graph.storage.hostname', hbase_zookeeper_quorum)
+      putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', hbase_zookeeper_quorum)
+    else:
+      putAtlasApplicationProperty('atlas.graph.storage.hostname', "")
+      putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', "")
+
+    # Ranger section: mirror the ranger-env switch into the Atlas plugin properties
+    if "ranger-env" in service_configs and "ranger-atlas-plugin-properties" in service_configs and \
+        "ranger-atlas-plugin-enabled" in service_configs["ranger-env"]["properties"]:
+      ranger_atlas_plugin_enabled = service_configs["ranger-env"]["properties"]["ranger-atlas-plugin-enabled"]
+      putAtlasRangerPluginProperty('ranger-atlas-plugin-enabled', ranger_atlas_plugin_enabled)
+
+    # A freshly recommended value takes precedence over the currently saved one
+    ranger_atlas_plugin_enabled = ''
+    if 'ranger-atlas-plugin-properties' in configurations and 'ranger-atlas-plugin-enabled' in configurations['ranger-atlas-plugin-properties']['properties']:
+      ranger_atlas_plugin_enabled = configurations['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled']
+    elif 'ranger-atlas-plugin-properties' in service_configs and 'ranger-atlas-plugin-enabled' in service_configs['ranger-atlas-plugin-properties']['properties']:
+      ranger_atlas_plugin_enabled = service_configs['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled']
+
+    if ranger_atlas_plugin_enabled and ranger_atlas_plugin_enabled.lower() == 'yes':
+      putAtlasApplicationProperty('atlas.authorizer.impl','ranger')
+    else:
+      putAtlasApplicationProperty('atlas.authorizer.impl','simple')
+
+    # atlas server memory settings
+    if 'atlas-env' in service_configs:
+      atlas_server_metadata_size = 50000
+      if 'atlas_server_metadata_size' in service_configs['atlas-env']['properties']:
+        atlas_server_metadata_size = float(service_configs['atlas-env']['properties']['atlas_server_metadata_size'])
+
+      # Pick the heap size (xmx) from the metadata size bucket
+      if atlas_server_metadata_size >= 1000000:
+        atlas_server_xmx = 1024*16
+      elif atlas_server_metadata_size >= 500000:
+        atlas_server_xmx = 1024*10
+      elif atlas_server_metadata_size >= 300000:
+        atlas_server_xmx = 1024*5
+      else:
+        atlas_server_xmx = 2048
+
+      # Integer division first, then * 30, exactly as before (e.g. 2048 -> 600)
+      atlas_server_max_new_size = (atlas_server_xmx // 100) * 30
+
+      putAtlasEnvProperty("atlas_server_xmx", atlas_server_xmx)
+      putAtlasEnvProperty("atlas_server_max_new_size", atlas_server_max_new_size)
+
+  def recommendAtlasConfigurationsFromHDP26(self, configurations, clusterData, services, hosts):
+    """
+    Recommend atlas.sso.knox.providerurl when KNOX is part of the cluster.
+
+    Uses the alphabetically first KNOX_GATEWAY host (default 'localhost') and the
+    gateway-site gateway.port (default '8443'). Nothing is recommended when KNOX
+    is absent. Returns None.
+
+    :param configurations: Recommendation dictionary that is filled in
+    :param clusterData: Unused here
+    :param services: Collection of services in the cluster with configs
+    :param hosts: Unused here
+    """
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services)
+
+    if 'KNOX' not in servicesList:
+      return
+
+    knox_host = 'localhost'
+    knox_port = '8443'
+
+    knox_hosts = self.getComponentHostNames(services, "KNOX", "KNOX_GATEWAY")
+    if len(knox_hosts) > 0:
+      knox_hosts.sort()
+      knox_host = knox_hosts[0]
+    if 'gateway-site' in services['configurations'] and 'gateway.port' in services['configurations']["gateway-site"]["properties"]:
+      knox_port = services['configurations']["gateway-site"]["properties"]['gateway.port']
+
+    putAtlasApplicationProperty('atlas.sso.knox.providerurl', 'https://{0}:{1}/gateway/knoxsso/api/v1/websso'.format(knox_host, knox_port))
+
+class AtlasValidator(service_advisor.ServiceAdvisor):
+  """
+  Atlas Validator checks the correctness of properties whenever the service is first added or the user attempts to
+  change configs via the UI.
+
+  NOTE(review): in this copy of the patch one Logger.info call was truncated and
+  several property names were blank (""): all keys of ad_props and the
+  graph index/storage lookups. They are restored below and should be confirmed
+  against the upstream commit.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(AtlasValidator, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    # (config type, validation method) pairs consumed by validateListOfConfigUsingMethod
+    self.validators = [("application-properties", self.validateAtlasConfigurationsFromHDP25)]
+
+  def validateAtlasConfigurationsFromHDP25(self, properties, recommendedDefaults, configurations, services, hosts):
+    """
+    Validate the Atlas application-properties.
+
+    Checks that the LDAP/AD properties required by the chosen authentication
+    type are set, and that the Solr, Kafka and HBase connection properties are
+    not empty.
+
+    :param properties: Unused here; the site is read from ``configurations``
+    :param recommendedDefaults: Unused here
+    :param configurations: Configurations being validated
+    :param services: Collection of services in the cluster with configs
+    :param hosts: Unused here
+    :return: Result of toConfigurationValidationProblems for the collected items
+    """
+    application_properties = self.getSiteProperties(configurations, "application-properties")
+    validationItems = []
+
+    auth_type = application_properties['atlas.authentication.method.ldap.type']
+    auth_ldap_enable = application_properties['atlas.authentication.method.ldap'].lower() == 'true'
+    Logger.info("Validating Atlas configs, authentication type: %s" % str(auth_type))
+
+    # Required props. Only the keys are used below.
+    ldap_props = {"atlas.authentication.method.ldap.url": "",
+                  "atlas.authentication.method.ldap.userDNpattern": "uid=",
+                  "atlas.authentication.method.ldap.groupSearchBase": "",
+                  "atlas.authentication.method.ldap.groupSearchFilter": "",
+                  "atlas.authentication.method.ldap.groupRoleAttribute": "cn",
+                  "atlas.authentication.method.ldap.base.dn": "",
+                  "atlas.authentication.method.ldap.bind.dn": "",
+                  "atlas.authentication.method.ldap.bind.password": "",
+                  "atlas.authentication.method.ldap.user.searchfilter": ""
+    }
+    ad_props = {"atlas.authentication.method.ldap.ad.domain": "",
+                "atlas.authentication.method.ldap.ad.url": "",
+                "atlas.authentication.method.ldap.ad.base.dn": "",
+                "atlas.authentication.method.ldap.ad.bind.dn": "",
+                "atlas.authentication.method.ldap.ad.bind.password": "",
+                "atlas.authentication.method.ldap.ad.user.searchfilter": "(sAMAccountName={0})"
+    }
+
+    # Any other type (e.g. "none") requires nothing
+    props_to_require = set()
+    if auth_type.lower() == "ldap":
+      props_to_require = set(ldap_props.keys())
+    elif auth_type.lower() == "ad":
+      props_to_require = set(ad_props.keys())
+
+    if auth_ldap_enable:
+      for prop in props_to_require:
+        if prop not in application_properties or application_properties[prop] is None or application_properties[prop].strip() == "":
+          validationItems.append({"config-name": prop,
+                                  "item": self.getErrorItem("If authentication type is %s, this property is required." % auth_type)})
+
+    if application_properties['atlas.graph.index.search.backend'] == 'solr5' and \
+            not application_properties['atlas.graph.index.search.solr.zookeeper-url']:
+      validationItems.append({"config-name": "atlas.graph.index.search.solr.zookeeper-url",
+                              "item": self.getErrorItem(
+                                "If AMBARI_INFRA is not installed then the SOLR zookeeper url configuration must be specified.")})
+
+    if not application_properties['atlas.kafka.bootstrap.servers']:
+      validationItems.append({"config-name": "atlas.kafka.bootstrap.servers",
+                              "item": self.getErrorItem(
+                                "If KAFKA is not installed then the Kafka bootstrap servers configuration must be specified.")})
+
+    if not application_properties['atlas.kafka.zookeeper.connect']:
+      validationItems.append({"config-name": "atlas.kafka.zookeeper.connect",
+                              "item": self.getErrorItem(
+                                "If KAFKA is not installed then the Kafka zookeeper quorum configuration must be specified.")})
+
+    if application_properties['atlas.graph.storage.backend'] == 'hbase':
+      storage_hostname = application_properties['atlas.graph.storage.hostname']
+
+      if 'hbase-site' in services['configurations']:
+        hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties']['hbase.zookeeper.quorum']
+        if not storage_hostname:
+          validationItems.append({"config-name": "atlas.graph.storage.hostname",
+                                  "item": self.getErrorItem(
+                                    "If HBASE is not installed then the hbase zookeeper quorum configuration must be specified.")})
+        elif string_set_equals(storage_hostname, hbase_zookeeper_quorum):
+          # Informational only: Atlas points at the cluster's own HBase
+          validationItems.append({"config-name": "atlas.graph.storage.hostname",
+                                  "item": self.getWarnItem(
+                                    "Atlas is configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")})
+      elif not storage_hostname:
+        validationItems.append({"config-name": "atlas.graph.storage.hostname",
+                                "item": self.getErrorItem(
+                                  "Atlas is not configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")})
+
+      # The audit quorum is required with or without a local hbase-site
+      if not application_properties['atlas.audit.hbase.zookeeper.quorum']:
+        validationItems.append({"config-name": "atlas.audit.hbase.zookeeper.quorum",
+                                "item": self.getErrorItem(
+                                  "If HBASE is not installed then the audit hbase zookeeper quorum configuration must be specified.")})
+
+    validationProblems = self.toConfigurationValidationProblems(validationItems, "application-properties")
+    return validationProblems