You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2015/06/30 15:28:47 UTC
[2/2] ambari git commit: AMBARI-12216. upgradeHelper.py script should
handle property mapping across the property catalogs (dlysnichenko)
AMBARI-12216. upgradeHelper.py script should handle property mapping across the property catalogs (dlysnichenko)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1e158101
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1e158101
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1e158101
Branch: refs/heads/branch-2.1
Commit: 1e1581010c9bb2019ce3a6b66c74416f565c6a02
Parents: ac6e99e
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Tue Jun 30 16:27:41 2015 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Jun 30 16:28:23 2015 +0300
----------------------------------------------------------------------
ambari-server/src/main/python/upgradeHelper.py | 1038 ++++++++++--------
.../src/test/python/TestUpgradeHelper.py | 383 +------
2 files changed, 636 insertions(+), 785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1e158101/ambari-server/src/main/python/upgradeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/upgradeHelper.py b/ambari-server/src/main/python/upgradeHelper.py
index 4ab834a..196dd3c 100644
--- a/ambari-server/src/main/python/upgradeHelper.py
+++ b/ambari-server/src/main/python/upgradeHelper.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-
"""
Upgrade catalog file format description:
@@ -70,8 +69,15 @@ Example:
}
}
},
- "property-mapping": {
- "old-property-name": "new-property-name"
+ "property-mapping": {
+ "old-property-name": "new-property-name", (short form, equal to "old-property-name": { "map-to": "new-property-name" })
+ "old-property1-name": {
+ "map-to": "new_property1_name", (required, new property name)
+ "from-catalog": "test", (optional, require "to-catalog. Source of old-property1-name)
+ "to-catalog": "test", (optional, require "from-catalog. Target of new_property1_name)
+ "default": "default value", (optional, if set and old property not exists, new one would be created with default value)
+ "required-services": ["YARN"] (optional, process entry if services in the list existed on the cluster
+ }
}
}
]
@@ -85,13 +91,17 @@ import optparse
from pprint import pprint
import re
import sys
-import datetime
import os.path
import logging
-import shutil
-import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
-import subprocess
import time
+import base64
+from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError
+
+try:
+ # try to import new simplejson version, which should be faster than outdated python 2.6 version
+ import ambari_simplejson as json
+except ImportError:
+ import json
# ==============================
@@ -116,6 +126,7 @@ class ReadOnlyPropertyException(Exception):
def _get_message(self):
return self.__str__()
+
class NotSupportedCatalogVersion(Exception):
def __init__(self, catalog_version):
self._version = catalog_version
@@ -129,6 +140,22 @@ class NotSupportedCatalogVersion(Exception):
message = property(__str__)
+class CatalogNotFoundException(Exception):
+ pass
+
+
+class CatalogExistException(Exception):
+ pass
+
+
+class PropertyNotFoundException(Exception):
+ pass
+
+
+class MalformedPropertyDefinitionException(Exception):
+ pass
+
+
# ==============================
# Constant class definition
# ==============================
@@ -137,27 +164,6 @@ class Const(object):
raise Exception("Class couldn't be created")
-class CatConst(Const):
- VERSION_TAG = "version"
- STACK_VERSION_OLD = "old-version"
- STACK_VERSION_TARGET = "target-version"
- STACK_STAGS_TAG = "stacks"
- STACK_NAME = "name"
- CONFIG_OPTIONS = "options"
- CONFIG_TYPES = "config-types"
- STACK_PROPERTIES = "properties"
- PROPERTY_VALUE_TAG = "value"
- PROPERTY_REMOVE_TAG = "remove"
- MERGED_COPY_TAG = "merged-copy"
- REQUIRED_SERVICES = "required-services"
- ITEMS_TAG = "items"
- TYPE_TAG = "type"
- TRUE_TAG = "yes"
- STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping"
- VALUE_TEMPLATE_TAG = "template"
- SEARCH_PATTERN = "(\{[^\{\}]+\})" # {XXXXX}
-
-
class Options(Const):
# action commands
API_PROTOCOL = "http"
@@ -189,8 +195,6 @@ class Options(Const):
COMPONENTS_FORMAT = None
# Curl options
- POST_REQUESTS = ['PUT', 'POST']
- GET_REQUESTS = ['GET', 'DELETE']
CURL_PRINT_ONLY = None
ARGS = None
@@ -237,12 +241,41 @@ class Options(Const):
cls.logger.addHandler(handler)
+class CatConst(Const):
+ VERSION_TAG = "version"
+ STACK_VERSION_OLD = "old-version"
+ STACK_VERSION_TARGET = "target-version"
+ STACK_STAGS_TAG = "stacks"
+ STACK_NAME = "name"
+ CONFIG_OPTIONS = "options"
+ CONFIG_TYPES = "config-types"
+ STACK_PROPERTIES = "properties"
+ STACK_PROPERTIES_ATTRIBUTES = "properties_attributes"
+ PROPERTY_VALUE_TAG = "value"
+ PROPERTY_REMOVE_TAG = "remove"
+ PROPERTY_MAP_TO = "map-to"
+ PROPERTY_FROM_CATALOG = "from-catalog"
+ PROPERTY_TO_CATALOG = "to-catalog"
+ PROPERTY_DEFAULT = "default"
+ MERGED_COPY_TAG = "merged-copy"
+ REQUIRED_SERVICES = "required-services"
+ ITEMS_TAG = "items"
+ TYPE_TAG = "type"
+ TRUE_TAG = "yes"
+ STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping"
+ VALUE_TEMPLATE_TAG = "template"
+ SEARCH_PATTERN = "(\{[^\{\}]+\})" # {XXXXX}
+ ACTION_COMMIT = "commit"
+ ACTION_RELOAD = "reload"
+ ACTION_RENAME_PROPERTY = "rename-property"
+ TEMPLATE_HANDLER = "template_handler"
+
+
# ==============================
# Catalog classes definition
# ==============================
class UpgradeCatalogFactory(object):
-
- # versions of catalog which is currently supported
+ # versions of catalog which is currently supported
_supported_catalog_versions = ["1.0"]
# private variables
@@ -297,67 +330,53 @@ class UpgradeCatalogFactory(object):
class UpgradeCatalog(object):
-
# private variables
_json_catalog = None
_properties_catalog = None
- _properties_map_catalog = {} # Initially should be assigned empty dictionary as default value
+ _properties_map_catalog = None
_version = None
_search_pattern = None
+ _catalog_options = None
- """
- Substitute handler, should return replaced value, as param would be passed value and tokens to substitute
- Please, be aware! Token should be unique in context of one catalog
-
- Example:
- def _substitute(tokens, value):
- for token in tokens:
- if token == "{REPLACE_ME}":
- value = value.replace(token, "\"hello world\"")
- return value
-
- catalog.set_substitution_handler = _substitute
-
- After that, all properties with CatConst.VALUE_TEMPLATE_TAG set to "yes" would be processed
- """
- _substitution_handler = None
-
- # public variable
- config_groups = None
-
- def __init__(self, catalog=None, version=None, substitution_handler=None):
+ def __init__(self, catalog=None, version=None):
+ self._handlers = {}
self._json_catalog = catalog
self._version = version
self._search_pattern = re.compile(CatConst.SEARCH_PATTERN)
if CatConst.STACK_PROPERTIES in catalog:
- self._properties_catalog = catalog[CatConst.STACK_PROPERTIES]
+ self._properties_catalog = self._format_catalog_properties(catalog[CatConst.STACK_PROPERTIES])
if CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG in catalog:
- self._properties_map_catalog = catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG]
+ self._properties_map_catalog = PropertyMapping(catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG])
+ else:
+ self._properties_map_catalog = PropertyMapping()
if catalog is not None and CatConst.CONFIG_OPTIONS in catalog \
- and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]:
-
- self.config_groups = ConfigConst(catalog[CatConst.CONFIG_OPTIONS][CatConst.CONFIG_TYPES],
- properties_catalog=self._properties_catalog)
+ and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]:
+ self._catalog_options = catalog[CatConst.CONFIG_OPTIONS]
- if substitution_handler is not None:
- self.set_substitution_handler(substitution_handler)
+ def add_handler(self, name, handler):
+ if name not in self._handlers:
+ self._handlers[name] = handler
- # deprecated, used for compatibility with old code
- def get_properties_as_dict(self, properties):
- target_dict = {}
- for key in properties:
- if CatConst.PROPERTY_VALUE_TAG in properties[key] and CatConst.PROPERTY_REMOVE_TAG not in properties[key]:
- target_dict[key] = properties[key][CatConst.PROPERTY_VALUE_TAG]
-
- return target_dict
-
- def set_substitution_handler(self, handler):
- self._substitution_handler = handler
+ def _format_catalog_properties(self, properties):
+ """
+ Transform properties from short form to normal one:
+ "property": "text" => "property": { "value": "text" }
+ :param properties: dict
+ :return: dict
+ """
+ for config_item in properties:
+ cfg_item = properties[config_item]
+ properties[config_item] = dict(zip(
+ cfg_item.keys(),
+ map(lambda x: x if isinstance(x, dict) or isinstance(x, list) else {CatConst.PROPERTY_VALUE_TAG: x}, cfg_item.values())
+ ))
+ return properties
- def _get_version(self):
+ @property
+ def version(self):
return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET])
def get_parsed_version(self):
@@ -385,84 +404,323 @@ class UpgradeCatalog(object):
return version
- def _get_name(self):
+ @property
+ def name(self):
if CatConst.STACK_NAME in self._json_catalog:
return self._json_catalog[CatConst.STACK_NAME]
return ""
- def _get_propoerty_mapping(self):
+ @property
+ def mapping(self):
return self._properties_map_catalog
- def get_properties(self, config_group):
- if config_group in self._properties_catalog:
- return self._filter_properties(config_group)
- return None
+ @property
+ def items(self):
+ return self._properties_catalog
- def _filter_properties(self, config_group):
- def _property_filter_strings(value):
- if not isinstance(value, dict):
- return {CatConst.PROPERTY_VALUE_TAG: value}
- else:
- if self._substitution_handler is not None and CatConst.VALUE_TEMPLATE_TAG in value \
- and CatConst.VALUE_TEMPLATE_TAG in value: # value contains template
-
- parsed_value = self._substitution_handler(
- self._search_pattern.findall(value[CatConst.PROPERTY_VALUE_TAG]), value[CatConst.PROPERTY_VALUE_TAG]
- )
- if parsed_value is not None: # Check if target function returns result
- value[CatConst.PROPERTY_VALUE_TAG] = parsed_value
-
- return value
- properties = self._properties_catalog[config_group].copy() # pass to process only copy of data
- properties = dict(zip(properties, map(_property_filter_strings, properties.values())))
+ @property
+ def options(self):
+ if CatConst.CONFIG_TYPES in self._catalog_options:
+ return self._catalog_options[CatConst.CONFIG_TYPES]
+ return {}
+
+ def __handle_remove_tag(self, catalog_item_name, catalog_property_item, properties):
+ """
+ :type catalog_item_name str
+ :type catalog_property_item dict
+ :type properties dict
+ """
+ if CatConst.PROPERTY_REMOVE_TAG in catalog_property_item and \
+ catalog_property_item[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG and \
+ catalog_item_name in properties:
+ del properties[catalog_item_name]
+
+ def __handle_template_tag_sub(self, catalog_item_name, catalog_property_item):
+ """
+ :type catalog_item_name str
+ :type catalog_property_item dict
+ """
+ if CatConst.TEMPLATE_HANDLER in self._handlers and self._handlers is not None and \
+ CatConst.VALUE_TEMPLATE_TAG in catalog_property_item and catalog_property_item[
+ CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
+ parsed_value = self._handlers[CatConst.TEMPLATE_HANDLER](
+ self,
+ self._search_pattern.findall(catalog_property_item[CatConst.PROPERTY_VALUE_TAG]),
+ catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
+ )
+ catalog_property_item[CatConst.PROPERTY_VALUE_TAG] = parsed_value
+
+ def __handle_add_new(self, catalog_item_name, catalog_property_item, properties):
+ """
+ :type catalog_item_name str
+ :type catalog_property_item dict
+ :type properties dict
+ """
+ catalog_property_item = dict(catalog_property_item)
+ if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name not in properties:
+ self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
+ properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
+
+ def __handle_change_existing(self, catalog_item_name, catalog_property_item, properties):
+ """
+ :type catalog_item_name str
+ :type catalog_property_item dict
+ :type properties dict
+ """
+ catalog_property_item = dict(catalog_property_item)
+ if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name in properties:
+ self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
+ properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
return properties
- version = property(_get_version)
- name = property(_get_name)
- property_map_catalog = property(_get_propoerty_mapping)
+ def __can_handler_execute(self, catalog_options, property_item):
+ """
+ :type catalog_options dict
+ :type property_item dict
+ """
+ can_process = True
+
+ # process required services tag
+ required_list = None
+
+ if CatConst.REQUIRED_SERVICES in catalog_options and catalog_options[CatConst.REQUIRED_SERVICES] is not None and \
+ isinstance(catalog_options[CatConst.REQUIRED_SERVICES], list):
+ required_list = catalog_options[CatConst.REQUIRED_SERVICES]
+
+ if CatConst.REQUIRED_SERVICES in property_item and property_item[CatConst.REQUIRED_SERVICES] is not None and\
+ isinstance(property_item[CatConst.REQUIRED_SERVICES], list):
+ required_list = property_item[CatConst.REQUIRED_SERVICES]
+ if required_list is not None:
+ can_process = can_process and is_services_exists(required_list)
-class ConfigConst(object):
- _config_types_const_definition = {}
- _config_types_value_definition = {}
+ return can_process
- def __init__(self, config_types_definition, properties_catalog=None):
- if properties_catalog is not None: # compensate possibly undefined config groups in options from property definition
- for item in properties_catalog:
- if item not in config_types_definition:
- config_types_definition[item] = {}
+ def process_simple_transformations(self, name, properties):
+ """
+ :type properties dict
+ :type name str
+ """
+ tag_handlers = [
+ self.__handle_add_new,
+ self.__handle_change_existing,
+ self.__handle_remove_tag
+ ]
+ # catalog has no update entries for this config group
+ if name not in self._properties_catalog:
+ return 0
+
+ catalog_item = self._properties_catalog[name]
+ for catalog_property_item in catalog_item.keys():
+ catalog_options = self.options[name] if name in self.options else {}
+ if self.__can_handler_execute(catalog_options, catalog_item[catalog_property_item]):
+ for handler in tag_handlers:
+ handler(catalog_property_item, catalog_item[catalog_property_item], properties)
+
+
+class PropertyMapping(object):
+ _mapping_list = {}
+
+ def __init__(self, map_list=None):
+ if map_list is not None:
+ self._mapping_list = self._convert_list(map_list)
+
+ def _convert_list(self, map_list):
+ return dict(zip(
+ map_list.keys(),
+ map(lambda x: x if isinstance(x, dict) else {CatConst.PROPERTY_MAP_TO: x}, map_list.values())
+ ))
+
+ def get(self, old_property_name):
+ """
+ Get property mapping dict
+ :old_property_name str
+ :return dict
+ """
+ if old_property_name in self._mapping_list:
+ return self._mapping_list[old_property_name]
- self._config_types_value_definition = config_types_definition
- for key in config_types_definition:
- self._config_types_const_definition[key.replace("-", "_").lower()] = key
+ raise PropertyNotFoundException("Property %s from property mapping section not found" % old_property_name)
def list(self):
- return self._config_types_value_definition.keys()
+ return self._mapping_list.keys()
+
+ def get_mapped_name(self, old_property_name):
+ if CatConst.PROPERTY_MAP_TO not in self.get(old_property_name):
+ raise MalformedPropertyDefinitionException("%s option is not set for %s property" %
+ (CatConst.PROPERTY_MAP_TO, old_property_name))
+ return self.get(old_property_name)[CatConst.PROPERTY_MAP_TO]
+
+ def exists(self, old_property_name):
+ return old_property_name in self._mapping_list
- def get(self, name):
+
+class ServerConfigFactory(object):
+ _server_catalogs = {}
+
+ def __init__(self):
+ self.__observers = []
+ self._load_configs()
+
+ def subscribe(self, name, config_item):
+ self.__observers.append((name, config_item))
+
+ def _load_configs(self):
+ Options.logger.info('Getting latest cluster configuration from the server...')
+ new_configs = get_config_resp_all()
+ for config_item in new_configs:
+ if config_item in self._server_catalogs:
+ self.notify_observer(config_item, CatConst.ACTION_RELOAD, new_configs[config_item])
+ else:
+ self._server_catalogs[config_item] = ServerConfig(self, config_item, new_configs[config_item])
+
+ def notify_observers(self, action, arg=None):
+ for name, config_item in self.__observers:
+ if config_item is not None and name in self._server_catalogs:
+ config_item.notify(action, arg)
+
+ def notify_observer(self, _name, action, arg=None):
+ for name, config_item in self.__observers:
+ if config_item is not None and name == _name and name in self._server_catalogs:
+ config_item.notify(action, arg)
+
+ def get_config(self, name):
"""
- Return desired property catalog
- :param name: str
- :return: dict
+ Get configuration item object
+ :type name str
+ :rtype: ServerConfig
"""
- if name in self._config_types_value_definition:
- return self._config_types_value_definition[name]
- raise Exception("No config group with name %s found" % name)
+ if name in self._server_catalogs:
+ return self._server_catalogs[name]
+
+ raise CatalogNotFoundException("Server catalog item \"%s\" not found" % name)
+
+ def create_config(self, name):
+ if name not in self._server_catalogs:
+ self._server_catalogs[name] = ServerConfig(self, name, {CatConst.STACK_PROPERTIES: {}})
+ else:
+ raise CatalogExistException("Config group \"%s\" already existed" % name)
+
+ def items(self):
+ return self._server_catalogs.keys()
- def __getattr__(self, item):
+ def reload(self):
+ self._load_configs()
+
+ def process_mapping_transformations(self, catalog):
+ """
+ :type catalog UpgradeCatalog
+ """
+ for map_item in catalog.mapping.list():
+ self._process_single_map_transformation(map_item, catalog.mapping.get(map_item))
+
+ def _process_single_map_transformation(self, map_item_name, map_property_item):
+ """
+ :type map_item_name str
+ :type map_property_item dict
+ """
+ new_property_name = map_property_item[CatConst.PROPERTY_MAP_TO]
+ source_cfg_group = map_property_item[CatConst.PROPERTY_FROM_CATALOG] if CatConst.PROPERTY_FROM_CATALOG in map_property_item and\
+ map_property_item[CatConst.PROPERTY_FROM_CATALOG] != "" else None
+ target_cfg_group = map_property_item[CatConst.PROPERTY_TO_CATALOG] if CatConst.PROPERTY_TO_CATALOG in map_property_item and \
+ map_property_item[CatConst.PROPERTY_TO_CATALOG] != ""else None
+ default_value = map_property_item[CatConst.PROPERTY_DEFAULT] if CatConst.PROPERTY_DEFAULT in map_property_item and \
+ map_property_item[CatConst.PROPERTY_DEFAULT] != "" else None
+ required_services = map_property_item[CatConst.REQUIRED_SERVICES] if CatConst.REQUIRED_SERVICES in map_property_item else None
+
+ # process required-services tag
+ if required_services is not None and not is_services_exists(required_services):
+ return 0
+
+ if source_cfg_group is None and target_cfg_group is None: # global scope mapping renaming
+ self.notify_observers(CatConst.ACTION_RENAME_PROPERTY, [map_item_name, new_property_name])
+ elif source_cfg_group is not None and target_cfg_group is not None: # group-to-group moving
+ if source_cfg_group in self._server_catalogs and target_cfg_group in self._server_catalogs:
+ old_cfg_group = self.get_config(source_cfg_group).properties
+ new_cfg_group = self.get_config(target_cfg_group).properties
+
+ if map_item_name in old_cfg_group:
+ new_cfg_group[new_property_name] = old_cfg_group[map_item_name]
+ del old_cfg_group[map_item_name]
+ elif map_item_name not in old_cfg_group and default_value is not None:
+ new_cfg_group[new_property_name] = default_value
+
+ def commit(self):
+ self.notify_observers(CatConst.ACTION_COMMIT)
+
+
+class ServerConfig(object):
+ def __init__(self, factory, name, initial_configs):
"""
- Support for constant handling like "<name>_tag" which would return real config name.
- Base list loaded from section options\config-types of json.
+ Initialize configuration item
+ :factory ServerConfigFactory
+ """
+ factory.subscribe(name, self)
+ self._configs = initial_configs
+ self._hash = self._calculate_hash()
+ self._name = name
+
+ def _calculate_hash(self):
+ return hash(str(self._configs))
+
+ def notify(self, action, arg=None):
+ if action == CatConst.ACTION_RELOAD:
+ self._configs = arg
+ self._hash = self._calculate_hash()
+ elif action == CatConst.ACTION_COMMIT:
+ self._commit()
+ elif action == CatConst.ACTION_RENAME_PROPERTY and isinstance(arg, list) and len(arg) == 2:
+ self._rename_property(arg[0], arg[1])
+
+ def _rename_property(self, old_name, new_name):
+ if old_name in self.properties:
+ old_property_value = self.properties[old_name]
+ self.properties[new_name] = old_property_value
+ del self.properties[old_name]
+
+ def is_attributes_exists(self):
+ return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs
+
+ @property
+ def properties(self):
+ return self._configs[CatConst.STACK_PROPERTIES]
+
+ @properties.setter
+ def properties(self, value):
+ self._configs[CatConst.STACK_PROPERTIES] = value
+
+ @property
+ def attributes(self):
+ return self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES]
+
+ @attributes.setter
+ def attributes(self, value):
+ self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] = value
+
+ def _commit(self):
+ if self._hash != self._calculate_hash():
+ Options.logger.info("Committing changes for \"%s\" configuration group ..." % self._name)
+ if self.is_attributes_exists():
+ update_config(self.properties, self._name, self.attributes)
+ else:
+ update_config(self.properties, self._name)
- Example:
- self.hbase_env_tag will return hbase-env
+ def clear(self):
+ self.properties = {}
+ self.attributes = {}
- :param item: accessed attribute
- :return: attribute value if exists or None
+ def merge(self, catalog_item):
+ """
+ :type catalog_item UpgradeCatalog
"""
- item = item.lower()
- if "_tag" in item and item[:-4] in self._config_types_const_definition:
- return self._config_types_const_definition[item[:-4]]
+ # handle "merged-copy" tag
+ config_options = catalog_item.options[self._name] if self._name in catalog_item.options else {}
+ clear_properties = not (CatConst.MERGED_COPY_TAG in config_options and
+ config_options[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG)
+ if clear_properties:
+ self.clear()
+ Options.logger.info("Processing configuration group: %s", self._name)
+ catalog_item.process_simple_transformations(self._name, self.properties)
def write_mapping(hostmapping):
@@ -471,13 +729,6 @@ def write_mapping(hostmapping):
json.dump(hostmapping, open(Options.MR_MAPPING_FILE, 'w'))
-def write_config(config, cfg_type, tag):
- file_name = cfg_type + "_" + tag
- if os.path.isfile(file_name):
- os.remove(file_name)
- json.dump(config, open(file_name, 'w'))
-
-
def read_mapping():
if os.path.isfile(Options.MR_MAPPING_FILE):
if Options.MR_MAPPING is not None:
@@ -497,7 +748,7 @@ def get_mr1_mapping():
hostmapping = {}
for component in components:
hostlist = []
- structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True, validate_expect_body=True)
+ structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True)
if 'host_components' in structured_resp:
for hostcomponent in structured_resp['host_components']:
@@ -552,15 +803,15 @@ def delete_mr():
if (key in NON_CLIENTS) and (len(value) > 0):
for host in value:
curl(COMPONENT_URL_FORMAT % (host, key), request_type="PUT", data=PUT_IN_DISABLED,
- validate=True, validate_expect_body=False)
+ validate=True)
- curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True, validate_expect_body=False)
+ curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True)
def get_cluster_stackname():
VERSION_URL_FORMAT = Options.CLUSTER_URL + '?fields=Clusters/version'
- structured_resp = curl(VERSION_URL_FORMAT, simulate=False, validate=True, validate_expect_body=True, parse=True)
+ structured_resp = curl(VERSION_URL_FORMAT, simulate=False, validate=True, parse=True)
if 'Clusters' in structured_resp:
if 'version' in structured_resp['Clusters']:
@@ -574,8 +825,8 @@ def has_component_in_stack_def(stack_name, service_name, component_name):
stack, stack_version = stack_name.split('-')
try:
- curl(STACK_COMPONENT_URL_FORMAT.format(stack,stack_version, service_name, component_name),
- validate=True, validate_expect_body=True, simulate=False)
+ curl(STACK_COMPONENT_URL_FORMAT.format(stack, stack_version, service_name, component_name),
+ validate=True, simulate=False)
return True
except FatalException:
return False
@@ -606,15 +857,15 @@ def add_services():
hostmapping = read_mapping()
for service in service_comp.keys():
- curl(SERVICE_URL_FORMAT.format(service), validate=True, validate_expect_body=False, request_type="POST")
+ curl(SERVICE_URL_FORMAT.format(service), validate=True, request_type="POST")
for component in service_comp[service]:
curl(COMPONENT_URL_FORMAT.format(service, component),
- validate=True, validate_expect_body=False, request_type="POST")
+ validate=True, request_type="POST")
for host in hostmapping[new_old_host_map[component]]:
curl(HOST_COMPONENT_URL_FORMAT.format(host, component),
- validate=True, validate_expect_body=False, request_type="POST")
+ validate=True, request_type="POST")
def update_config(properties, config_type, attributes=None):
@@ -625,83 +876,14 @@ def update_config(properties, config_type, attributes=None):
expect_body = config_type != "cluster-env" # ToDo: make exceptions more flexible
- curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True,
- validate_expect_body=expect_body, soft_validation=True)
-
-
-def get_zookeeper_quorum():
- zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, simulate=False, parse=True)
- zoo_quorum = []
- zoo_def_port = "2181"
- if "host_components" in zoo_cfg:
- for item in zoo_cfg["host_components"]:
- zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port))
-
- return ",".join(zoo_quorum)
-
-
-def get_config(cfg_type):
- tag, structured_resp = get_config_resp(cfg_type)
- properties = None
- properties_attributes = None
-
- if 'items' in structured_resp:
- for item in structured_resp['items']:
- if (tag == item['tag']) or (cfg_type == item['type']):
- if 'properties' in item:
- properties = item['properties']
- if 'properties_attributes' in item:
- properties_attributes = item['properties_attributes']
- break
- if properties is None:
- raise FatalException(-1, "Unable to read configuration for type " + cfg_type + " and tag " + tag)
-
- return properties, properties_attributes
-
-
-def parse_config_resp(resp):
- parsed_configs = []
- if CatConst.ITEMS_TAG in resp:
- for config_item in resp[CatConst.ITEMS_TAG]:
- parsed_configs.append({
- "type": config_item[CatConst.TYPE_TAG],
- "properties": config_item[CatConst.STACK_PROPERTIES]
- })
- return parsed_configs
-
-
-def get_config_resp(cfg_type, error_if_na=True, parsed=False, tag=None):
- CONFIG_URL_FORMAT = Options.CLUSTER_URL + '/configurations?type={0}&tag={1}'
-
- # Read the config version
- if tag is None:
- structured_resp = curl(Options.CLUSTER_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
-
- if 'Clusters' in structured_resp:
- if 'desired_configs' in structured_resp['Clusters']:
- if cfg_type in structured_resp['Clusters']['desired_configs']:
- tag = structured_resp['Clusters']['desired_configs'][cfg_type]['tag']
-
- if tag is not None:
- # Get the config with the tag and return properties
- structured_resp = curl(CONFIG_URL_FORMAT.format(cfg_type, tag), parse=True, simulate=False,
- validate=True, validate_expect_body=True)
- if parsed:
- return tag, parse_config_resp(structured_resp)
- else:
- return tag, structured_resp
- else:
- if error_if_na:
- raise FatalException(-1, "Unable to get the current version for config type " + cfg_type)
- else:
- return tag, None
+ curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True, soft_validation=True)
def get_config_resp_all():
desired_configs = {}
- CONFIG_ALL_PROPERTIES_URL = Options.CLUSTER_URL + "/configurations?fields=properties"
- desired_configs_resp = curl(Options.CLUSTER_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
- all_options = curl(CONFIG_ALL_PROPERTIES_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
+ config_all_properties_url = Options.CLUSTER_URL + "/configurations?fields=properties,properties_attributes"
+ desired_configs_resp = curl(Options.CLUSTER_URL + "?fields=Clusters/desired_configs", validate=True, parse=True, simulate=False)
+ all_options = curl(config_all_properties_url, validate=True, parse=True, simulate=False)
if 'Clusters' in desired_configs_resp:
if 'desired_configs' in desired_configs_resp['Clusters']:
@@ -712,20 +894,35 @@ def get_config_resp_all():
return None
if CatConst.ITEMS_TAG in all_options:
- all_options = all_options["items"]
+ all_options = all_options[CatConst.ITEMS_TAG]
else:
return None
all_options = filter(
- lambda x: x["type"] in desired_configs_resp and x["tag"] == desired_configs_resp[x["type"]]["tag"],
+ lambda x: x[CatConst.TYPE_TAG] in desired_configs_resp and x["tag"] == desired_configs_resp[x[CatConst.TYPE_TAG]][
+ "tag"],
all_options)
for item in all_options:
- if CatConst.STACK_PROPERTIES in item: # config item could not contain anu property
- desired_configs[item["type"]] = item["properties"]
+ dc_item = {}
+
+ if CatConst.STACK_PROPERTIES in item: # config item could not contain any property
+ dc_item[CatConst.STACK_PROPERTIES] = item[CatConst.STACK_PROPERTIES]
+ else:
+ dc_item[CatConst.STACK_PROPERTIES] = {}
+
+ if CatConst.STACK_PROPERTIES_ATTRIBUTES in item:
+ dc_item[CatConst.STACK_PROPERTIES_ATTRIBUTES] = item[CatConst.STACK_PROPERTIES_ATTRIBUTES]
+
+ if "tag" in item:
+ dc_item["tag"] = item["tag"]
+
+ if dc_item != {}:
+ desired_configs[item[CatConst.TYPE_TAG]] = dc_item
return desired_configs
+
def is_services_exists(required_services):
"""
return true, if required_services is a part of Options.SERVICES
@@ -738,6 +935,7 @@ def is_services_exists(required_services):
return set(map(lambda x: x.upper(), required_services)) < Options.SERVICES
+
def get_cluster_services():
services_url = Options.CLUSTER_URL + '/services'
raw_services = curl(services_url, parse=True, simulate=False)
@@ -750,92 +948,73 @@ def get_cluster_services():
Options.logger.warning("Failed to load services list, functionality that depends on them couldn't work")
return []
-def filter_properties_by_service_presence(config_type, catalog, catalog_properties):
+
+def get_zookeeper_quorum():
+ zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, simulate=False, parse=True)
+ zoo_quorum = []
+ zoo_def_port = "2181"
+ if "host_components" in zoo_cfg:
+ for item in zoo_cfg["host_components"]:
+ zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port))
+
+ return ",".join(zoo_quorum)
+
+
+def get_jt_host(catalog):
"""
- Filter properties by required-services tag.
- required-services tag could be catalog to per-property defined. per-property definition
- will always override per-catalog definition.
- :param config_type: str
- :param catalog: UpgradeCatalog
- :param catalog_properties: dict
- :return: dict
+ :type catalog: UpgradeCatalog
+ :rtype str
"""
- cproperties = dict(catalog_properties)
- catalog_required_services = []
- del_props = []
-
- # do nothing
- if CatConst.REQUIRED_SERVICES in catalog.config_groups.get(config_type) and \
- isinstance(catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES], list):
- catalog_required_services = catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES]
-
- for prop_name in cproperties:
- # set per catalog limitation
- required_services = catalog_required_services
- if CatConst.REQUIRED_SERVICES in cproperties[prop_name] and\
- isinstance(cproperties[prop_name][CatConst.REQUIRED_SERVICES], list):
- # set per property limitation
- required_services = catalog_properties[prop_name][CatConst.REQUIRED_SERVICES]
-
- # add property to list for remove
- if not is_services_exists(required_services):
- del_props.append(prop_name)
-
- # remove properties
- for prop in del_props:
- del cproperties[prop]
-
- return cproperties
-
-def modify_config_item(config_type, catalog):
- # here should be declared tokens for pattern replace
-
- if catalog.get_parsed_version()["from"] == 13: # ToDo: introduce class for pre-defined tokens
- hostmapping = read_mapping()
- jt_host = hostmapping["JOBTRACKER"][0]
- jh_host = hostmapping["HISTORYSERVER"][0]
- else:
- jt_host = ""
- jh_host = ""
-
- def _substitute(tokens, value):
- for token in tokens:
- if token == "{JOBHISTORY_HOST}":
- value = value.replace(token, jh_host)
- elif token == "{RESOURCEMANAGER_HOST}":
- value = value.replace(token, jt_host)
- elif token == "{ZOOKEEPER_QUORUM}":
- value = value.replace(token, get_zookeeper_quorum())
- return value
- # Exit from function if was passed not suitable parameters
- catalog.set_substitution_handler(_substitute)
+ if catalog.get_parsed_version()["from"] == 13:
+ return read_mapping()["JOBTRACKER"][0]
- try:
- properties_latest, properties_attributes_latest = get_config(config_type)
- properties_latest = rename_all_properties(properties_latest, catalog.property_map_catalog)
- except Exception as e:
- properties_latest = {}
- properties_attributes_latest = None
-
- properties_copy = catalog.get_properties(config_type)
- is_merged_copy = CatConst.MERGED_COPY_TAG in catalog.config_groups.get(config_type) \
- and catalog.config_groups.get(config_type)[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG
-
- # filter properties by service-required tag
- properties_copy = filter_properties_by_service_presence(config_type, catalog, properties_copy)
-
- # ToDo: implement property transfer from one catalog to other
- # properties_to_move = [
- # "dfs.namenode.checkpoint.edits.dir",
- # "dfs.namenode.checkpoint.dir",
- # "dfs.namenode.checkpoint.period"]
- Options.logger.info("Updating '%s' catalog item..." % config_type)
- if is_merged_copy: # Append configs to existed ones
- tag, structured_resp = get_config_resp(config_type, False)
- if structured_resp is not None:
- update_config_using_existing_properties(config_type, properties_copy, properties_latest, properties_attributes_latest, catalog)
- else: # Rewrite/create config items
- update_config(catalog.get_properties_as_dict(properties_copy), config_type)
+ return ""
+
+
+def get_jh_host(catalog):
+ """
+ :type catalog: UpgradeCatalog
+ :rtype str
+ """
+ if catalog.get_parsed_version()["from"] == 13:
+ return read_mapping()["HISTORYSERVER"][0]
+
+ return ""
+
+
+def _substitute_handler(upgrade_catalog, tokens, value):
+ """
+ Substitute handler
+ :param upgrade_catalog: UpgradeCatalog
+ :param tokens: list
+ :param value: str
+ :rtype str
+ """
+ for token in tokens:
+ if token == "{JOBHISTORY_HOST}":
+ value = value.replace(token, get_jh_host(upgrade_catalog))
+ elif token == "{RESOURCEMANAGER_HOST}":
+ value = value.replace(token, get_jt_host(upgrade_catalog))
+ elif token == "{ZOOKEEPER_QUORUM}":
+ value = value.replace(token, get_zookeeper_quorum())
+ return value
+
+
+def modify_config_item(config_type, catalog, server_config_factory):
+ """
+ Modify configuration item
+ :type config_type str
+ :type catalog UpgradeCatalog
+ :type server_config_factory ServerConfigFactory
+ """
+
+ # if config group is absent on the server, we will create it
+ if config_type not in server_config_factory.items():
+ server_config_factory.create_config(config_type)
+
+ server_config_catalog = server_config_factory.get_config(config_type)
+
+ server_config_catalog.merge(catalog)
def modify_configs():
@@ -845,77 +1024,59 @@ def modify_configs():
config_type = None
catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog
- catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack) # get desired version of catalog
+ catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
+ Options.OPTIONS.to_stack) # get desired version of catalog
+
+ # load all desired configs from the server
+ server_config_factory = ServerConfigFactory()
if catalog is None:
raise FatalException(1, "Upgrade catalog for version %s-%s not found, no configs was modified"
% (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack))
- if config_type is not None and config_type not in catalog.config_groups.list():
+ # add user-defined template processing function
+ catalog.add_handler(CatConst.TEMPLATE_HANDLER, _substitute_handler)
+
+ if config_type is not None and config_type not in catalog.items:
raise FatalException("Config type %s not exists, no configs was modified" % config_type)
if config_type is not None:
- modify_config_item(config_type, catalog)
+ modify_config_item(config_type, catalog, server_config_factory)
else:
- for collection_name in catalog.config_groups.list():
- modify_config_item(collection_name, catalog)
-
-
-def rename_all_properties(properties, name_mapping):
- for key, val in name_mapping.items():
- if (key in properties.keys()) and (val not in properties.keys()):
- properties[val] = properties[key]
- del properties[key]
- return properties
-
-
-# properties template - passed as dict from UpgradeCatalog
-def update_config_using_existing_properties(conf_type, properties_template,
- site_properties, properties_attributes_latest, catalog):
- keys_processed = []
- keys_to_delete = []
- properties_parsed = catalog.get_properties_as_dict(properties_template)
-
- for key in properties_template.keys():
- keys_processed.append(key)
- if CatConst.PROPERTY_REMOVE_TAG in properties_template and properties_template[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
- keys_to_delete.append(key)
-
- for key in site_properties.keys():
- if key not in keys_processed:
- properties_parsed[key] = site_properties[key]
-
- for key in keys_to_delete:
- del properties_parsed[key]
-
- # check property attributes list
- if properties_attributes_latest is not None:
- for key in properties_attributes_latest:
- properties_attributes_latest[key] = dict(filter(
- lambda (item_key, item_value): item_key not in keys_to_delete,
- zip(properties_attributes_latest[key].keys(), properties_attributes_latest[key].values())
- ))
+ for collection_name in catalog.items:
+ modify_config_item(collection_name, catalog, server_config_factory)
+
+ server_config_factory.process_mapping_transformations(catalog)
- update_config(properties_parsed, conf_type, attributes=properties_attributes_latest)
+ # commit changes to server, if any will be found
+ server_config_factory.commit()
def backup_configs(conf_type=None):
- DESIRED_CONFIGS_URL = Options.CLUSTER_URL + "?fields=Clusters/desired_configs"
+ dir = "backups_%d" % time.time()
+ file_pattern = "%s%s%s_%s.json"
+ configs = get_config_resp_all()
+ if configs is None:
+ Options.logger.error("Unexpected response from the server")
+ return -1
- desired_configs = curl(DESIRED_CONFIGS_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
+ if conf_type is not None and conf_type in configs:
+ configs = {conf_type: configs[conf_type]}
- if "Clusters" in desired_configs and "desired_configs" in desired_configs["Clusters"]:
- for conf_type in desired_configs["Clusters"]["desired_configs"].keys():
- backup_single_config_type(conf_type, True)
+ if not os.path.exists(dir):
+ os.mkdir(dir)
+ for item in configs:
+ filename = file_pattern % (dir, os.path.sep, item, configs[item]["tag"])
+ if os.path.exists(filename):
+ os.remove(filename)
-def backup_single_config_type(conf_type, error_if_na=True):
- tag, response = get_config_resp(conf_type, error_if_na)
- if response is not None:
- Options.logger.info("Saving config for type: " + conf_type + " and tag: " + tag)
- write_config(response, conf_type, tag)
- else:
- Options.logger.info("Unable to obtain config for type: " + conf_type)
+ try:
+ with open(filename, "w") as f:
+ f.write(json.dumps(configs[item][CatConst.STACK_PROPERTIES], indent=4))
+ Options.logger.info("Catalog \"%s\" stored to %s", item, filename)
+ except IOError as e:
+ Options.logger.error("Unable to store \"%s\": %s", item, e)
def install_services():
@@ -948,94 +1109,91 @@ def install_services():
err_message = ""
for index in [0, 1]:
try:
- curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True,
- validate_expect_body=not Options.OPTIONS.printonly, request_type="PUT", data=PUT_IN_INSTALLED[index])
+ curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True, request_type="PUT", data=PUT_IN_INSTALLED[index])
except FatalException as e:
if not e.code == 0:
err_retcode = e.code
err_message = err_message + " Error while installing " + SERVICES[index] + ". Details: " + e.message + "."
if err_retcode != 0:
- raise FatalException(err_retcode, err_message + "(Services may already be installed or agents are not yet started.)")
+ raise FatalException(err_retcode,
+ err_message + "(Services may already be installed or agents are not yet started.)")
Options.OPTIONS.exit_message = "Requests has been submitted to install YARN and MAPREDUCE2. Use Ambari Web to monitor " \
- "the status of the install requests."
+ "the status of the install requests."
-def validate_response(response, expect_body):
- if expect_body:
- if "\"href\" : \"" not in response:
- return 1, response
- else:
- return 0, ""
- elif len(response) > 0:
- return 1, response
- else:
- return 0, ""
+def generate_auth_header(user, password):
+ token = "%s:%s" % (user, password)
+ token = base64.encodestring(token)
+ return {"Authorization": "Basic %s" % token.replace('\n', '')}
def curl(url, tokens=None, headers=None, request_type="GET", data=None, parse=False,
- simulate=None, validate=False, validate_expect_body=False, soft_validation=False):
+ simulate=None, validate=False, soft_validation=False):
+ _headers = {}
+ handler_chain = []
+ post_req = ["POST", "PUT"]
+ get_req = ["GET", "DELETE"]
simulate_only = Options.CURL_PRINT_ONLY is not None or (simulate is not None and simulate is True)
print_url = Options.CURL_PRINT_ONLY is not None and simulate is not None
+ if request_type not in post_req + get_req:
+ raise IOError("Wrong request type \"%s\" passed" % request_type)
- curl_path = '/usr/bin/curl'
- curl_list = [curl_path]
-
- curl_list.append('-X')
- curl_list.append(request_type)
+ if data is not None and isinstance(data, dict):
+ data = json.dumps(data)
if tokens is not None:
- curl_list.append('-u')
- curl_list.append("%s:%s" % (tokens["user"], tokens["pass"]))
+ _headers.update(generate_auth_header(tokens["user"], tokens["pass"]))
elif Options.API_TOKENS is not None:
- curl_list.append('-u')
- curl_list.append("%s:%s" % (Options.API_TOKENS["user"], Options.API_TOKENS["pass"]))
-
- if request_type in Options.POST_REQUESTS:
- curl_list.append(url)
+ _headers.update(generate_auth_header(Options.API_TOKENS["user"], Options.API_TOKENS["pass"]))
- if headers is None and Options.HEADERS is not None:
- headers = Options.HEADERS
+ if request_type in post_req and data is not None:
+ _headers["Content-Length"] = len(data)
if headers is not None:
- for header in headers:
- curl_list.append('-H')
- curl_list.append("%s: %s" % (header, headers[header]))
+ _headers.update(headers)
- if data is not None and request_type in Options.POST_REQUESTS:
- curl_list.append('--data')
- curl_list.append(json.dumps(data))
+ if Options.HEADERS is not None:
+ _headers.update(Options.HEADERS)
- if request_type in Options.GET_REQUESTS:
- curl_list.append(url)
+ director = build_opener(*handler_chain)
+ if request_type in post_req:
+ _data = bytes(data)
+ req = Request(url, headers=_headers, data=_data)
+ else:
+ req = Request(url, headers=_headers)
+
+ req.get_method = lambda: request_type
if print_url:
- Options.logger.info(" ".join(curl_list))
+ Options.logger.info(url)
+ code = 200
if not simulate_only:
- osStat = subprocess.Popen(
- curl_list,
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE)
- out, err = osStat.communicate()
- if 0 != osStat.returncode:
- error = "curl call failed. out: " + out + " err: " + err
- Options.logger.error(error)
- raise FatalException(osStat.returncode, error)
+ try:
+ resp = director.open(req)
+ out = resp.read()
+ if isinstance(out, bytes):
+ out = out.decode("utf-8")
+ code = resp.code
+ except URLError as e:
+ Options.logger.error(str(e))
+ if isinstance(e, HTTPError):
+ raise e
+ else:
+ raise FatalException(-1, str(e))
else:
if not print_url:
- Options.logger.info(" ".join(curl_list))
+ Options.logger.info(url)
out = "{}"
- if validate and not simulate_only:
- retcode, errdata = validate_response(out, validate_expect_body)
- if not retcode == 0:
- if soft_validation:
- Options.logger.warning("Response validation failed, please check previous action result manually.")
- else:
- raise FatalException(retcode, errdata)
+ if validate and not simulate_only and (code > 299 or code < 200):
+ if soft_validation:
+ Options.logger.warning("Response validation failed, please check previous action result manually.")
+ else:
+ raise FatalException(code, "Response validation failed, please check previous action result manually.")
if parse:
return json.loads(out)
@@ -1055,15 +1213,13 @@ def configuration_item_diff(collection_name, catalog, actual_properties_list):
}
:param collection_name:
:param catalog:
+ :param actual_properties_list
:return:
"""
verified_catalog = []
- catalog_properties = catalog.get_properties(collection_name)
- actual_properties = None
-
- if collection_name in actual_properties_list:
- actual_properties = actual_properties_list[collection_name]
+ catalog_properties = dict(catalog)
+ actual_properties = dict(actual_properties_list)
if actual_properties is None:
verified_catalog = map(lambda x: {
@@ -1085,7 +1241,8 @@ def configuration_item_diff(collection_name, catalog, actual_properties_list):
verified_catalog_catalog = map(lambda x: {
"property": x,
"catalog_item": catalog_properties[x],
- "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in catalog_properties[x] else None,
+ "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in
+ catalog_properties[x] else None,
"actual_value": actual_properties[x] if x in actual_properties else None,
}, catalog_properties.keys())
@@ -1114,18 +1271,18 @@ def configuration_diff_analyze(diff_list):
# process properties which can be absent
# item was removed, from actual configs according to catalog instructions
- if property_item["actual_value"] is None and property_item["catalog_value"] is None \
- and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \
- and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
+ if property_item["actual_value"] is None \
+ and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \
+ and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
push_status("ok", property_item)
- # currently skip values with template tag, as there no filter implemented
- # ToDo: implement possibility to filter values without filter handler,
- # ToDo: currently filtering is possible only on update-configs stage
+ # currently skip values with template tag, as there no filter implemented
+ # ToDo: implement possibility to filter values without filter handler,
+ # ToDo: currently filtering is possible only on update-configs stage
elif property_item["actual_value"] is not None and property_item["catalog_value"] is not None \
- and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \
- and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
+ and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \
+ and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
push_status("skipped", property_item)
@@ -1156,23 +1313,24 @@ def verify_configuration():
config_type = None
catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog
- catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack) # get desired version of catalog
+ catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
+ Options.OPTIONS.to_stack) # get desired version of catalog
+ server_configs = ServerConfigFactory()
if catalog is None:
raise FatalException(1, "Upgrade catalog for version %s-%s not found"
% (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack))
- if config_type is not None and config_type not in catalog.config_groups.list():
+ if config_type is not None and config_type not in catalog.items.keys() and config_type not in server_configs.items():
raise FatalException("Config type %s not exists" % config_type)
# fetch from server all option at one time and filter only desired versions
- actual_options = get_config_resp_all()
if config_type is not None:
- diff_list[config_type] = configuration_item_diff(config_type, catalog, actual_options)
+ diff_list[config_type] = configuration_item_diff(config_type, catalog.items[config_type], server_configs.get_config(config_type).properties)
else:
- for collection_name in catalog.config_groups.list():
- diff_list[collection_name] = configuration_item_diff(collection_name, catalog, actual_options)
+ for collection_name in catalog.items.keys():
+ diff_list[collection_name] = configuration_item_diff(collection_name, catalog.items[collection_name], server_configs.get_config(collection_name).properties)
analyzed_list = configuration_diff_analyze(diff_list)
@@ -1187,7 +1345,7 @@ def verify_configuration():
if analyzed_list[config_item]["fail"]["count"] != 0:
Options.logger.info(
"%s: %s missing configuration(s) - please look in the output file for the missing params" % (
- config_item, analyzed_list[config_item]["fail"]["count"]
+ config_item, analyzed_list[config_item]["fail"]["count"]
)
)
if report_file is not None:
@@ -1206,16 +1364,12 @@ def report_formatter(report_file, config_item, analyzed_list_item):
prefix = "Configuration item %s" % config_item
if analyzed_list_item["fail"]["count"] > 0:
for item in analyzed_list_item["fail"]["items"]:
- report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"" % (
+ report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"\n" % (
prefix, item["property"], item["actual_value"], item["catalog_value"]
))
-#
-# Main.
-#
def main():
-
action_list = { # list of supported actions
Options.GET_MR_MAPPING_ACTION: get_mr1_mapping,
Options.DELETE_MR_ACTION: delete_mr,
@@ -1224,7 +1378,7 @@ def main():
Options.INSTALL_YARN_MR2_ACTION: install_services,
Options.BACKUP_CONFIG_ACTION: backup_configs,
Options.VERIFY_ACTION: verify_configuration
- }
+ }
parser = optparse.OptionParser(usage="usage: %prog [options] action\n Valid actions: "
+ ", ".join(action_list.keys())
http://git-wip-us.apache.org/repos/asf/ambari/blob/1e158101/ambari-server/src/test/python/TestUpgradeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestUpgradeHelper.py b/ambari-server/src/test/python/TestUpgradeHelper.py
index 1624dd2..73b1e4e 100644
--- a/ambari-server/src/test/python/TestUpgradeHelper.py
+++ b/ambari-server/src/test/python/TestUpgradeHelper.py
@@ -48,6 +48,7 @@ class TestUpgradeHelper(TestCase):
catalog_to = "2.2"
catalog_cfg_type = "my type"
required_service = "TEST"
+ curl_response = "{}"
test_catalog = """{
"version": "1.0",
"stacks": [
@@ -95,19 +96,12 @@ class TestUpgradeHelper(TestCase):
sys.stdout = self.out
def magic_curl(self, *args, **kwargs):
+ resp = self.curl_response
+ self.curl_response = "{}"
+ if "parse" in kwargs and isinstance(resp, str) and kwargs["parse"] == True:
+ resp = json.loads(resp)
+ return resp
- def ret_object():
- return ""
-
- def communicate():
- return "{}", ""
-
- ret_object.returncode = 0
- ret_object.communicate = communicate
-
- with patch("upgradeHelper.subprocess") as subprocess:
- subprocess.Popen.return_value = ret_object
- self.original_curl(*args, **kwargs)
def tearDown(self):
sys.stdout = sys.__stdout__
@@ -155,19 +149,6 @@ class TestUpgradeHelper(TestCase):
self.assertEqual(True, actual_result)
self.assertEqual(True, actual_result_1)
- @patch.object(upgradeHelper, "is_services_exists")
- def test_filter_properties_by_service_presence(self, is_service_exists_mock):
- catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
- catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
- cfg_type = self.catalog_cfg_type
- is_service_exists_mock.return_value = True
-
- old_services = upgradeHelper.Options.SERVICES
- upgradeHelper.Options.SERVICES = set([self.required_service])
- actual_result = upgradeHelper.filter_properties_by_service_presence(cfg_type, catalog, catalog.get_properties(self.catalog_cfg_type))
-
- upgradeHelper.Options.SERVICES = old_services
- self.assertEqual(catalog.get_properties(self.catalog_cfg_type), actual_result)
@patch("__builtin__.open")
@patch.object(os.path, "isfile")
@@ -193,34 +174,6 @@ class TestUpgradeHelper(TestCase):
@patch("__builtin__.open")
@patch.object(os.path, "isfile")
- @patch("os.remove")
- def test_write_config(self, remove_mock, isfile_mock, open_mock):
- test_data = {
- "test_field": "test_value"
- }
- test_result = json.dumps(test_data)
- test_cfgtype = 'cfg.me'
- test_tag = 'tag.test'
- test_filename = "%s_%s" % (test_cfgtype, test_tag)
- test_result = json.dumps(test_data)
- output = StringIO()
- isfile_mock.return_value = True
- open_mock.return_value = output
-
- # execute testing function
- upgradeHelper.write_config(test_data, test_cfgtype, test_tag)
-
- self.assertEquals(1, isfile_mock.call_count)
- self.assertEquals(1, remove_mock.call_count)
- self.assertEquals(1, open_mock.call_count)
-
- # check file name
- self.assertEquals(test_filename, isfile_mock.call_args[0][0])
- # check content
- self.assertEquals(test_result, output.getvalue())
-
- @patch("__builtin__.open")
- @patch.object(os.path, "isfile")
def test_read_mapping(self, isfile_mock, open_mock):
test_data = {
"test_field": "test_value"
@@ -353,8 +306,7 @@ class TestUpgradeHelper(TestCase):
{
"request_type": "PUT",
"data": PUT_IN_DISABLED,
- "validate": True,
- "validate_expect_body": False
+ "validate": True
}
]
)
@@ -364,8 +316,7 @@ class TestUpgradeHelper(TestCase):
(SERVICE_URL_FORMAT,),
{
"request_type": "DELETE",
- "validate": True,
- "validate_expect_body": False
+ "validate": True
}
]
)
@@ -439,7 +390,6 @@ class TestUpgradeHelper(TestCase):
(SERVICE_URL_FORMAT.format(service),),
{
"validate": True,
- "validate_expect_body": False,
"request_type": "POST"
}
])
@@ -448,7 +398,6 @@ class TestUpgradeHelper(TestCase):
(COMPONENT_URL_FORMAT.format(service, component),),
{
"validate": True,
- "validate_expect_body": False,
"request_type": "POST"
}
])
@@ -457,7 +406,6 @@ class TestUpgradeHelper(TestCase):
(HOST_COMPONENT_URL_FORMAT.format(host, component),),
{
"validate": True,
- "validate_expect_body": False,
"request_type": "POST"
}
])
@@ -488,8 +436,7 @@ class TestUpgradeHelper(TestCase):
"request_type": "PUT",
"data": copy.deepcopy(properties_payload),
"validate": True,
- "soft_validation": True,
- "validate_expect_body": True
+ "soft_validation": True
}
)
@@ -500,8 +447,7 @@ class TestUpgradeHelper(TestCase):
"request_type": "PUT",
"data": copy.deepcopy(properties_payload),
"validate": True,
- "soft_validation": True,
- "validate_expect_body": True
+ "soft_validation": True
}
)
@@ -547,86 +493,6 @@ class TestUpgradeHelper(TestCase):
self.assertEqual(expected_result, actual_result)
- @patch.object(upgradeHelper, "get_config_resp")
- def test_get_config(self, get_config_resp_mock):
- config_type = "test type"
- tag_name = "my tag"
- properties = {
- "my property": "property value"
- }
- property_atributes = {
- "myproperty": "property value"
- }
-
- in_data = tag_name, {
- "items": [
- {
- "tag": tag_name,
- "type": config_type,
- "properties": properties,
- "properties_attributes": property_atributes
- }
- ]
- }
-
- get_config_resp_mock.return_value = in_data
- expected_data = (properties, property_atributes)
-
- # execute testing function
- actual_data = upgradeHelper.get_config(config_type)
-
- self.assertEqual(expected_data, actual_data)
-
- def test_parse_config_resp(self):
- cfg_type = "type 1"
- cfg_properties = {
- "my property": "property value"
- }
-
- in_data = {
- upgradeHelper.CatConst.ITEMS_TAG: [
- {
- upgradeHelper.CatConst.TYPE_TAG: cfg_type,
- upgradeHelper.CatConst.STACK_PROPERTIES: cfg_properties
- }
- ]
- }
-
- expected_result = [{
- "type": cfg_type,
- "properties": cfg_properties
- }]
-
- # execute testing function
- actual_result = upgradeHelper.parse_config_resp(in_data)
-
- self.assertEqual(expected_result, actual_result)
-
- @patch.object(upgradeHelper, "curl")
- @patch.object(upgradeHelper, "parse_config_resp")
- def test_get_config_resp(self, parse_config_resp_mock, curl_mock):
- cfg_type = "my type"
- cfg_tag = "my tag"
- cfg_data = "test"
- curl_responses = [
- {
- 'Clusters': {
- 'desired_configs': {
- cfg_type: {
- "tag": cfg_tag
- }
- }
- }
- },
- cfg_data
- ]
- curl_mock.side_effect = MagicMock(side_effect=curl_responses)
-
- # execute testing function
- actual_tag, actual_data = upgradeHelper.get_config_resp(cfg_type, False, False)
-
- self.assertEqual((cfg_tag, cfg_data), (actual_tag, actual_data))
-
@patch.object(upgradeHelper, "curl")
def test_get_config_resp_all(self, curl_mock):
cfg_type = "my type"
@@ -656,8 +522,11 @@ class TestUpgradeHelper(TestCase):
]
expected_result = {
- cfg_type: cfg_properties
- }
+ cfg_type: {
+ "properties": cfg_properties,
+ "tag": cfg_tag
+ }
+ }
curl_mock.side_effect = MagicMock(side_effect=curl_resp)
# execute testing function
@@ -666,160 +535,31 @@ class TestUpgradeHelper(TestCase):
self.assertEquals(expected_result, actual_result)
pass
- @patch.object(upgradeHelper, "read_mapping")
- @patch.object(upgradeHelper, "get_config")
- @patch.object(upgradeHelper, "update_config_using_existing_properties")
- @patch.object(upgradeHelper, "update_config")
- @patch.object(upgradeHelper, "get_config_resp")
- def test_modify_config_item(self, get_config_resp_mock, upgrade_config_mock, update_config_using_existing_properties_mock,
- get_config_mock, read_mapping_mock):
- catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
- get_config_resp_mock.return_value = "", {}
- old_services = upgradeHelper.Options.SERVICES
- upgradeHelper.Options.SERVICES = set([self.required_service])
- catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
- cfg_type = self.catalog_cfg_type
- read_mapping_mock.return_value = {
- "MAPREDUCE_CLIENT": ["test.host.vm"],
- "JOBTRACKER": ["test1.host.vm"],
- "TASKTRACKER": ["test2.host.vm"],
- "HISTORYSERVER": ["test3.host.vm"]
- }
- get_config_mock.return_value = {"my replace property": "property value 2"}, {}
- expected_params = [
- cfg_type,
- {
- "my property": {
- "value": "my value",
- "required-services": ["TEST"]
- }
- },
- {
- "my property 2": "property value 2"
- },
- ]
-
- # execute testing function
- upgradeHelper.modify_config_item(cfg_type, catalog)
-
- upgradeHelper.Options.SERVICES = old_services
-
- actual_params = [
- update_config_using_existing_properties_mock.call_args[0][0],
- update_config_using_existing_properties_mock.call_args[0][1],
- update_config_using_existing_properties_mock.call_args[0][2]
- ]
- self.assertEquals(update_config_using_existing_properties_mock.call_count, 1)
- self.assertEqual(upgrade_config_mock.call_count, 0)
-
- self.assertEqual(expected_params, actual_params)
-
- @patch.object(upgradeHelper, "UpgradeCatalogFactory", autospec=True)
- @patch.object(upgradeHelper, "modify_config_item")
- def test_modify_configs(self, modify_config_item_mock, factory_mock):
- factory_mock.return_value = UpgradeCatalogFactoryMock(self.test_catalog)
- options = lambda: ""
- options.from_stack = self.catalog_from
- options.to_stack = self.catalog_to
- options.upgrade_json = ""
-
- upgradeHelper.Options.OPTIONS = options
-
- # execute testing function
- upgradeHelper.modify_configs()
-
- self.assertEqual(1, modify_config_item_mock.call_count)
- self.assertEqual(self.catalog_cfg_type, modify_config_item_mock.call_args[0][0])
-
- def test_rename_all_properties(self):
- in_data_properties = {
- "test property": "test value",
- "rename property": "test value 2"
- }
- in_data_mapping = {
- "rename property": "test property 2"
- }
- expect_properties = {
- "test property": "test value",
- "test property 2": "test value 2"
- }
-
- # execute testing function
- actual_properties = upgradeHelper.rename_all_properties(in_data_properties, in_data_mapping)
-
- self.assertEqual(expect_properties, actual_properties)
-
- @patch.object(upgradeHelper, "update_config")
- def test_update_config_using_existing_properties(self, update_config_mock):
- actual_property = {
- "actual property": "actual value"
- }
- actual_attrib = {
+ @patch.object(upgradeHelper, "get_config_resp_all")
+ @patch("os.mkdir")
+ @patch("os.path.exists")
+ @patch("__builtin__.open")
+ def test_backup_configs(self, open_mock, os_path_exists_mock, mkdir_mock, get_config_resp_all_mock):
+ data = {
self.catalog_cfg_type: {
- "attribute 1": "attribute value 1"
- }
- }
- catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
- catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
-
- # execute testing function
- upgradeHelper.update_config_using_existing_properties(self.catalog_cfg_type,
- catalog.get_properties(self.catalog_cfg_type),
- actual_property,
- actual_attrib,
- catalog
- )
- expected_dict = {}
- expected_dict.update(actual_property)
- expected_dict.update(catalog.get_properties_as_dict(catalog.get_properties(self.catalog_cfg_type)))
-
- expected_args = (
- (expected_dict, self.catalog_cfg_type),
- {
- "attributes": actual_attrib
- }
- )
-
- self.assertEqual(1, update_config_mock.call_count)
- self.assertEqual(expected_args, tuple(update_config_mock.call_args))
-
- @patch.object(upgradeHelper, "backup_single_config_type")
- @patch.object(upgradeHelper, "curl")
- def test_backup_configs(self, curl_mock, backup_single_config_type_mock):
- curl_mock.return_value = {
- 'Clusters': {
- 'desired_configs': {
- self.catalog_cfg_type: {
- "tag": "my tag"
- }
- }
+ "properties": {
+ "test-property": "value"
+ },
+ "tag": "version1"
}
}
-
- expected_args = (self.catalog_cfg_type, True)
+ os_path_exists_mock.return_value = False
+ get_config_resp_all_mock.return_value = data
+ expected = json.dumps(data[self.catalog_cfg_type]["properties"], indent=4)
+ stream = StringIO()
+ m = MagicMock()
+ m.__enter__.return_value = stream
+ open_mock.return_value = m
# execute testing function
upgradeHelper.backup_configs(self.catalog_cfg_type)
- self.assertEqual(1, backup_single_config_type_mock.call_count)
- self.assertEqual(expected_args, backup_single_config_type_mock.call_args[0])
-
- @patch.object(upgradeHelper, "get_config_resp")
- @patch.object(upgradeHelper, "write_config")
- def test_backup_single_config_type(self, write_config_mock, get_config_resp_mock):
- resp = {
- "property": "my data"
- }
- tag = "my tag"
- get_config_resp_mock.return_value = tag, resp
- expected_args = (resp, self.catalog_cfg_type, tag)
-
- # execute testing function
- upgradeHelper.backup_single_config_type(self.catalog_cfg_type, False)
-
- self.assertEqual(1, get_config_resp_mock.call_count)
- self.assertEqual(1, write_config_mock.call_count)
- self.assertEqual(expected_args, write_config_mock.call_args[0])
+ self.assertEqual(expected, stream.getvalue())
@patch.object(upgradeHelper, "curl")
def test_install_services(self, curl_mock):
@@ -827,7 +567,6 @@ class TestUpgradeHelper(TestCase):
(
('http://127.0.0.1:8080/api/v1/clusters/test1/services/MAPREDUCE2',),
{
- 'validate_expect_body': True,
'request_type': 'PUT',
'data': {
'RequestInfo': {
@@ -845,7 +584,6 @@ class TestUpgradeHelper(TestCase):
(
('http://127.0.0.1:8080/api/v1/clusters/test1/services/YARN',),
{
- 'validate_expect_body': True,
'request_type': 'PUT',
'data': {
'RequestInfo': {
@@ -869,53 +607,6 @@ class TestUpgradeHelper(TestCase):
for i in range(0, 1):
self.assertEqual(expected_args[i], tuple(curl_mock.call_args_list[i]))
- def test_validate_response(self):
- resp_in_data = [
- ["", False],
- ["", True],
- ["\"href\" : \"", True]
- ]
- resp_expected_results = [
- (0, ""),
- (1, ""),
- (0, "")
- ]
-
- # execute testing function
- for i in range(0, len(resp_in_data)):
- actual_code, actual_data = upgradeHelper.validate_response(resp_in_data[i][0], resp_in_data[i][1])
- self.assertEqual(resp_expected_results[i], (actual_code, actual_data))
-
- def test_configuration_item_diff(self):
- factory_mock = UpgradeCatalogFactoryMock(self.test_catalog)
- catalog = factory_mock.get_catalog(self.catalog_from, self.catalog_to)
- actual_properties = {
- self.catalog_cfg_type: {
- "my property": {
- "value": "my value"
- }
- }
- }
-
- expected_result = [
- {
- 'catalog_item': {
- 'value': u'my value',
- 'required-services': [u'TEST']
- },
- 'property': 'my property',
- 'actual_value': {
- 'value': 'my value'
- },
- 'catalog_value': u'my value'
- }
- ]
-
- # execute testing function
- actual_result = upgradeHelper.configuration_item_diff(self.catalog_cfg_type, catalog, actual_properties)
-
- self.assertEqual(expected_result, actual_result)
-
def test_configuration_diff_analyze(self):
in_data = {
self.catalog_cfg_type: [
@@ -978,7 +669,13 @@ class TestUpgradeHelper(TestCase):
options.upgrade_json = ""
upgradeHelper.Options.OPTIONS = options
+ upgradeHelper.Options.SERVICES = [self.required_service]
upgradecatalogfactory_mock.return_value = UpgradeCatalogFactoryMock(self.test_catalog)
+ get_config_resp_all_mock.return_value = {
+ self.catalog_cfg_type: {
+ "properties": {}
+ }
+ }
# execute testing function
upgradeHelper.verify_configuration()
@@ -1022,7 +719,7 @@ class TestUpgradeHelper(TestCase):
}
}
- expected_output = "Configuration item my type: property \"my property\" is set to \"my value 1\", but should be set to \"my value\""
+ expected_output = "Configuration item my type: property \"my property\" is set to \"my value 1\", but should be set to \"my value\"\n"
# execute testing function
upgradeHelper.report_formatter(file, cfg_item, analyzed_list)