You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2015/08/07 18:43:16 UTC

ambari git commit: AMBARI-12670 Upgrade Catalog: 2.1 -> 2.3 missing config hiveserver2-site (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk c89d427c7 -> a566bb143


AMBARI-12670 Upgrade Catalog: 2.1 -> 2.3 missing config hiveserver2-site (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a566bb14
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a566bb14
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a566bb14

Branch: refs/heads/trunk
Commit: a566bb143a81954005f0bd1133d559bc208dc960
Parents: c89d427
Author: Dmytro Sen <ds...@apache.org>
Authored: Fri Aug 7 19:42:56 2015 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Fri Aug 7 19:42:56 2015 +0300

----------------------------------------------------------------------
 ambari-server/src/main/python/upgradeHelper.py  | 261 ++++++++++++++++++-
 .../catalog/UpgradeCatalog_2.1_to_2.3.json      |   3 +-
 2 files changed, 254 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a566bb14/ambari-server/src/main/python/upgradeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/upgradeHelper.py b/ambari-server/src/main/python/upgradeHelper.py
index 14cef85..7da5e5f 100644
--- a/ambari-server/src/main/python/upgradeHelper.py
+++ b/ambari-server/src/main/python/upgradeHelper.py
@@ -69,11 +69,12 @@ Example:
           },
           "test_property": {
            "value": "new value",
-           "override: "no", (optional, override already existed property yes/no)
+           "override": "no", (optional, override an already existing property yes/no. Default: yes)
            "value-required": "old value",  (optional, property would be set if the required value is present)
            "can-create": "no", (optional, process property only if that property present on the server.
-                                         i.e. ability to create new property, default yes)
-           "required-services": ["HDFS", "YARN"]  (optional, process property only if selected services existed)
+                                         i.e. ability to create new property. Default: yes)
+           "required-services": ["HDFS", "YARN"],  (optional, process property only if selected services existed)
+           "resolve-dependency": "no" (optional, use Stack Advisor to resolve changes to dependent properties. Default: no)
           }
         }
       },
@@ -92,7 +93,7 @@ Example:
                                             )
           "replace-from": "something", (optional, should be present both from and to. Replace 'from' value to 'to')
           "replace-to": "something,
-          "required-services": ["YARN"]  (optional, process entry if services in the list existed on the cluster
+          "required-services": ["YARN"],  (optional, process entry if services in the list existed on the cluster
       }
      }
     }
@@ -172,6 +173,10 @@ class PropertyNotFoundException(Exception):
   pass
 
 
+class StackNotFoundException(Exception):
+  """Raised when a requested stack/version pair does not exist on the server."""
+
+
 class MalformedPropertyDefinitionException(Exception):
   pass
 
@@ -218,6 +223,8 @@ class Options(Const):
   logger = None
   server_config_factory = None
   """:type : ServerConfigFactory"""
+  stack_advisor = None
+  """:type : StackAdvisor"""
 
   # Api constants
   ROOT_URL = None
@@ -253,9 +260,16 @@ class Options(Const):
     cls.CLUSTER_URL = cls.ROOT_URL + "/clusters/%s" % cls.CLUSTER_NAME
     cls.COMPONENTS_FORMAT = cls.CLUSTER_URL + "/components/{0}"
     cls.TEZ_VIEW_URL = cls.ROOT_URL + "/views/TEZ"
+    cls.STACKS_URL = cls.ROOT_URL + "/stacks"
+    cls.STACKS_VERSIONS_URL = cls.STACKS_URL + "/{0}/versions"
+    cls.STACK_ADVISOR_URL = cls.STACKS_VERSIONS_URL + "/{1}/recommendations"
+    cls.AMBARI_SERVER_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_SERVER"
+    cls.AMBARI_AGENTS_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_AGENT"
     if cls.CLUSTER_NAME is not None and cls.HOST is not None:
       cls.SERVICES = set(map(lambda x: x.upper(), get_cluster_services()))
 
+    cls.ambari_server = AmbariServer()
+
   @classmethod
   def initialize_logger(cls, filename=None):
     cls.logger = logging.getLogger('UpgradeHelper')
@@ -284,6 +298,7 @@ class CatConst(Const):
   STACK_PROPERTIES = "properties"
   STACK_PROPERTIES_ATTRIBUTES = "properties_attributes"
   PROPERTY_VALUE_TAG = "value"
+  VERSIONS_TAG = "versions"
   PROPERTY_REMOVE_TAG = "remove"
   PROPERTY_MAP_TO = "map-to"
   PROPERTY_MAP_FROM = "map-from"
@@ -293,6 +308,7 @@ class CatConst(Const):
   MERGED_COPY_TAG = "merged-copy"
   REQUIRED_SERVICES = "required-services"
   COERCE_TO_PROPERTY_TAG = "coerce-to"
+  RESOLVE_DEPENDENCY_TAG = "resolve-dependency"
   COERCE_YAML_OPTION_TAG = "yaml-array"
   REPLACE_FROM_TAG = "replace-from"
   REPLACE_TO_TAG = "replace-to"
@@ -314,6 +330,179 @@ class CatConst(Const):
 # ==============================
 #    Catalog classes definition
 # ==============================
+
+class AmbariServer(object):
+  """Ambari server version and agent host names, fetched with curl() at construction time."""
+  def __init__(self):
+    Options.logger.info("Resolving Ambari server configuration ...")
+    self._get_server_info()
+    self._get_agents_info()
+
+  def _get_server_info(self):
+    """Store the server version as a list of ints; [0, 0, 0] when it is absent or not numeric."""
+    info = curl(Options.AMBARI_SERVER_URL, parse=True)
+    self._server_version = [0, 0, 0]
+    if "RootServiceComponents" in info:
+      ver = info["RootServiceComponents"].get("component_version")
+      try:
+        self._server_version = [int(x) for x in ver.split(".")]
+      except (ValueError, AttributeError):  # non-numeric part, or version missing (ver is None)
+        pass
+
+  def _get_agents_info(self):
+    """Store the host_name of every entry listed under "hostComponents" (empty list if none)."""
+    info = curl(Options.AMBARI_AGENTS_URL, parse=True)
+    self._agents = []
+    if "hostComponents" in info:
+      self._agents = [x["RootServiceHostComponents"]["host_name"] for x in info["hostComponents"]]
+
+  @property
+  def server_version(self):
+    return self._server_version
+
+  @property
+  def agent_hosts(self):
+    return self._agents
+
+class StackAdvisorFactory(object):
+  """Creates the stack advisor that fits the server version and the stacks the server reports."""
+  def __init__(self):
+    self._stack_info = self._load_stack_info()
+
+  def _load_stack_versions(self, stack):
+    """Return the list of stack_version strings reported for stack (empty if none are listed)."""
+    versions = curl(Options.STACKS_VERSIONS_URL.format(stack), parse=True)
+    if CatConst.ITEMS_TAG not in versions:
+      return []  # never hand the raw response back as if it were a version list
+    return [x["Versions"]["stack_version"] for x in versions[CatConst.ITEMS_TAG]]
+
+  def _load_stack_info(self):
+    """Return a dict mapping every stack name reported by the server to its version list."""
+    stacks = curl(Options.STACKS_URL, parse=True)
+    names = []
+    if CatConst.ITEMS_TAG in stacks:
+      # index with the same key that was just tested instead of a hard-coded "items"
+      names = [x["Stacks"]["stack_name"] for x in stacks[CatConst.ITEMS_TAG]]
+    return dict((name, self._load_stack_versions(name)) for name in names)
+
+  def get_instance(self, stack, version):
+    """
+    Return BaseStackAdvisor for servers older than 2.1, StackAdvisor otherwise.
+    Raises StackNotFoundException if the stack/version pair was not reported by the server.
+    """
+    sversion = Options.ambari_server.server_version
+    # compare (major, minor) as a pair: major * 10 + minor would rank 1.12 above 2.1
+    if list(sversion[:2]) < [2, 1]:
+      Options.logger.warning("Ambari server version \"%s\" doesn't support property dependencies suggestion" %
+                             ".".join(map(str, sversion)))
+      return BaseStackAdvisor(stack, version)
+
+    if stack in self._stack_info and version in self._stack_info[stack]:
+      return StackAdvisor(stack, version)
+    raise StackNotFoundException("Stack %s-%s not exist on the server" % (stack, version))
+
+class StackAdvisorRequestProperty(object):
+  """A (config type, property name) pair that can be rendered for a stack advisor request."""
+  def __init__(self, catalog, property_name):
+    self._catalog, self._property_name = catalog, property_name
+
+  @property
+  def catalog(self):
+    """Config type passed to the constructor."""
+    return self._catalog
+
+  @property
+  def name(self):
+    """Property name passed to the constructor."""
+    return self._property_name
+
+  def get_json(self):
+    """Return the {"type": ..., "name": ...} dict describing this property."""
+    return dict(type=self.catalog, name=self.name)
+
+
+class BaseStackAdvisor(object):  # advisor whose get_suggestion() never suggests anything
+  def __init__(self, stack, version):
+    self._req_url = Options.STACK_ADVISOR_URL.format(stack, version)  # recommendations URL for stack/version
+
+  def get_suggestion(self, cfg_factory, changed_properties):
+    return {}  # both arguments are ignored; an empty dict means "no changes suggested"
+
+
+class StackAdvisor(BaseStackAdvisor):
+  """Stack advisor client that asks the server for configuration-dependency recommendations."""
+  def __init__(self, stack, version):
+    super(StackAdvisor, self).__init__(stack, version)
+
+  def _transform_properties(self, cfg_factory):
+    """
+    Wrap each config's properties as {"properties": ...}, the blueprint layout sent to the server
+    :type cfg_factory: ServerConfigFactory
+    :rtype dict
+    """
+    wrapped = cfg_factory.get_json()
+    for cfg_name in wrapped:
+      wrapped[cfg_name] = {"properties": wrapped[cfg_name]}
+
+    return wrapped
+
+  def _from_blueprint_properties_transform(self, props):
+    """
+    Flatten the SA response in place to {config: {property: value}}.
+    A property whose attributes carry "delete" == "true" is mapped to None.
+    """
+    for cfg_name in props:
+      entry = props[cfg_name]
+      flat = {}
+      if "properties" in entry and entry["properties"] is not None:
+        flat = entry["properties"]
+      if "property_attributes" in entry:
+        attributes = entry["property_attributes"]
+        for prop_name in attributes:
+          if "delete" in attributes[prop_name] and attributes[prop_name]["delete"] == "true":
+            flat[prop_name] = None
+
+      props[cfg_name] = flat
+
+    return props
+
+  def _generate_req_properties(self, properties):
+    """Return get_json() of every StackAdvisorRequestProperty in properties; other items are skipped."""
+    return [item.get_json() for item in properties if isinstance(item, StackAdvisorRequestProperty)]
+
+  def get_suggestion(self, cfg_factory, changed_properties):
+    """
+    POST a "configuration-dependencies" request and return the recommended configurations
+    :type cfg_factory: ServerConfigFactory
+    :type changed_properties: list
+    :rtype dict
+    """
+    blueprint = {
+      "host_groups": [],
+      "configurations": self._transform_properties(cfg_factory),
+      "blueprint_cluster_binding": {}
+    }
+    request = {
+      "recommend": "configuration-dependencies",
+      "hosts": Options.ambari_server.agent_hosts,
+      "services": list(Options.SERVICES),
+      "changed_configurations": self._generate_req_properties(changed_properties),
+      "recommendations": {"blueprint": blueprint}
+    }
+    response = curl(self._req_url, request_type="POST", data=request, parse=True)
+    resources = response["resources"] if "resources" in response else None
+    if not isinstance(resources, list) or len(resources) == 0:
+      return {}
+
+    # only the first returned resource is consulted
+    recommendations = resources[0]["recommendations"] if "recommendations" in resources[0] else {}
+    if "blueprint" in recommendations and "configurations" in recommendations["blueprint"]:
+      return self._from_blueprint_properties_transform(recommendations["blueprint"]["configurations"])
+
+    return {}
+
+
+
 class UpgradeCatalogFactory(object):
   # versions of catalog which is currently supported
   _supported_catalog_versions = ["1.0"]
@@ -433,6 +622,14 @@ class UpgradeCatalog(object):
   def version(self):
     return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET])
 
+  @property
+  def target_version(self):
+    return self._version[CatConst.STACK_VERSION_TARGET]  # second half of the "old-target" string built by version
+
+  @property
+  def source_version(self):
+    return self._version[CatConst.STACK_VERSION_OLD]  # first half of the "old-target" string built by version
+
   def get_parsed_version(self):
     """
      Get numeric representation of the version for comparation purposes
@@ -486,8 +683,9 @@ class UpgradeCatalog(object):
   def tag_search_pattern(self):
     return self._search_pattern
 
-  def __handle_remove_tag(self, catalog_item_name, catalog_property_item, properties):
+  def __handle_remove_tag(self, name, catalog_item_name, catalog_property_item, properties):
     """
+    :type name str
     :type catalog_item_name str
     :type catalog_property_item dict
     :type properties dict
@@ -515,8 +713,9 @@ class UpgradeCatalog(object):
       except TemplateProcessingException:
         pass
 
-  def __handle_add_new(self, catalog_item_name, catalog_property_item, properties):
+  def __handle_add_new(self, name, catalog_item_name, catalog_property_item, properties):
     """
+    :type name str
     :type catalog_item_name str
     :type catalog_property_item dict
     :type properties dict
@@ -528,8 +727,9 @@ class UpgradeCatalog(object):
       self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
       properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
 
-  def __handle_change_existing(self, catalog_item_name, catalog_property_item, properties):
+  def __handle_change_existing(self, name, catalog_item_name, catalog_property_item, properties):
     """
+    :type name str
     :type catalog_item_name str
     :type catalog_property_item dict
     :type properties dict
@@ -545,6 +745,31 @@ class UpgradeCatalog(object):
       properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
     return properties
 
+  def __handle_dependency_tag(self, name, catalog_item_name, catalog_property_item, properties):
+    """
+    Apply Stack Advisor suggestions when the item's "resolve-dependency" option equals CatConst.TRUE_TAG
+    :type name str
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    :type properties dict
+    """
+    if CatConst.RESOLVE_DEPENDENCY_TAG in catalog_property_item and \
+        catalog_property_item[CatConst.RESOLVE_DEPENDENCY_TAG] == CatConst.TRUE_TAG:
+      sa_suggestions = Options.stack_advisor.get_suggestion(Options.server_config_factory,
+                                                            [StackAdvisorRequestProperty(name, catalog_item_name)])
+      for sa_catalog in sa_suggestions:
+        # create new config group if not existed
+        if sa_catalog not in Options.server_config_factory.items():
+          Options.server_config_factory.create_config(sa_catalog)
+        catalog_properties = Options.server_config_factory.get_config(sa_catalog).properties
+        for sa_property, sa_value in sa_suggestions[sa_catalog].items():
+          if sa_value is not None:
+            catalog_properties[sa_property] = sa_value
+          elif sa_property in catalog_properties:  # a None suggestion means: drop the property
+            Options.logger.info("rem %s:%s" % (sa_catalog, sa_property))
+            del catalog_properties[sa_property]
+
+
   def __can_handler_execute(self, catalog_options, catalog_property_item, property_item, properties):
     """
     :type catalog_options dict
@@ -582,6 +807,7 @@ class UpgradeCatalog(object):
     tag_handlers = [
       self.__handle_add_new,
       self.__handle_change_existing,
+      self.__handle_dependency_tag,
       self.__handle_remove_tag
     ]
     # catalog has no update entries for this config group
@@ -593,7 +819,7 @@ class UpgradeCatalog(object):
       catalog_options = self.options[name] if name in self.options else {}
       if self.__can_handler_execute(catalog_options, catalog_property_item, catalog_item[catalog_property_item], properties):
         for handler in tag_handlers:
-          handler(catalog_property_item, catalog_item[catalog_property_item], properties)
+          handler(name, catalog_property_item, catalog_item[catalog_property_item], properties)
 
 
 class PropertyMapping(object):
@@ -661,6 +887,19 @@ class ServerConfigFactory(object):
       if config_item is not None and name == _name and name in self._server_catalogs:
         config_item.notify(action, arg)
 
+  def __str__(self):
+    """
+    Return a JSON object mapping each config name to the str() of its config item
+    """
+    rendered = dict((cfg, str(self._server_catalogs[cfg])) for cfg in self._server_catalogs)
+    return json.dumps(rendered)
+
+  def get_json(self):
+    """
+    Return a new dict mapping each config name to that config item's properties
+    :rtype dict
+    """
+    return dict((cfg, self._server_catalogs[cfg].properties) for cfg in self._server_catalogs)
   def get_config(self, name):
     """
     Get configuration item object
@@ -831,6 +1070,9 @@ class ServerConfig(object):
   def is_attributes_exists(self):
     return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs
 
+  def __str__(self):
+    return json.dumps(self.properties)  # JSON text of the properties dict only
+
   @property
   def properties(self):
     return self._configs[CatConst.STACK_PROPERTIES]
@@ -1497,6 +1739,7 @@ def modify_configs():
   catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json)  # Load upgrade catalog
   catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
                                      Options.OPTIONS.to_stack)  # get desired version of catalog
+  Options.stack_advisor = StackAdvisorFactory().get_instance(catalog.name, catalog.target_version)
 
   # load all desired configs from the server
   # ToDo: implement singleton for that class
@@ -1850,7 +2093,7 @@ def main():
                    Options.INSTALL_YARN_MR2_ACTION: install_services,
                    Options.BACKUP_CONFIG_ACTION: backup_configs,
                    Options.VERIFY_ACTION: verify_configuration
-                   }
+                 }
 
   parser = optparse.OptionParser(usage="usage: %prog [options] action\n  Valid actions: "
                                        + ", ".join(action_list.keys())

http://git-wip-us.apache.org/repos/asf/ambari/blob/a566bb14/ambari-server/src/main/resources/upgrade/catalog/UpgradeCatalog_2.1_to_2.3.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/upgrade/catalog/UpgradeCatalog_2.1_to_2.3.json b/ambari-server/src/main/resources/upgrade/catalog/UpgradeCatalog_2.1_to_2.3.json
index 6b93ecd..18ea6cd 100644
--- a/ambari-server/src/main/resources/upgrade/catalog/UpgradeCatalog_2.1_to_2.3.json
+++ b/ambari-server/src/main/resources/upgrade/catalog/UpgradeCatalog_2.1_to_2.3.json
@@ -55,7 +55,8 @@
           "hive_security_authorization": {
             "value": "{HIVE_SECURITY_AUTHORIZATION}",
             "template": "yes",
-            "override": "no"
+            "override": "no",
+            "resolve-dependency": "yes"
           }
         },
         "falcon-startup.properties": {