You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2015/06/30 15:28:46 UTC

[1/2] ambari git commit: AMBARI-12216. upgradeHelper.py script should handle property mapping across the property catalogs (dlysnichenko)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 ac6e99ee5 -> 1e1581010
  refs/heads/trunk df9c51073 -> 52520d0fb


AMBARI-12216. upgradeHelper.py script should handle property mapping across the property catalogs (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/52520d0f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/52520d0f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/52520d0f

Branch: refs/heads/trunk
Commit: 52520d0fb4a7bd2246928682b8468fddcb8f0737
Parents: df9c510
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Tue Jun 30 16:27:41 2015 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Jun 30 16:27:41 2015 +0300

----------------------------------------------------------------------
 ambari-server/src/main/python/upgradeHelper.py  | 1038 ++++++++++--------
 .../src/test/python/TestUpgradeHelper.py        |  383 +------
 2 files changed, 636 insertions(+), 785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/52520d0f/ambari-server/src/main/python/upgradeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/upgradeHelper.py b/ambari-server/src/main/python/upgradeHelper.py
index 4ab834a..196dd3c 100644
--- a/ambari-server/src/main/python/upgradeHelper.py
+++ b/ambari-server/src/main/python/upgradeHelper.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-
 """
 Upgrade catalog file format description:
 
@@ -70,8 +69,15 @@ Example:
           }
         }
       },
-     "property-mapping": {
-       "old-property-name": "new-property-name"
+      "property-mapping": {
+        "old-property-name": "new-property-name", (short form, equal to "old-property-name": { "map-to": "new-property-name" })
+        "old-property1-name": {
+          "map-to": "new_property1_name", (required, new property name)
+          "from-catalog": "test",        (optional, require "to-catalog. Source of old-property1-name)
+          "to-catalog": "test",          (optional, require "from-catalog. Target of new_property1_name)
+          "default": "default value",    (optional, if set and old property not exists, new one would be created with default value)
+          "required-services": ["YARN"]  (optional, process entry if services in the list existed on the cluster
+      }
      }
     }
   ]
@@ -85,13 +91,17 @@ import optparse
 from pprint import pprint
 import re
 import sys
-import datetime
 import os.path
 import logging
-import shutil
-import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
-import subprocess
 import time
+import base64
+from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError
+
+try:
+  # try to import new simplejson version, which should be faster than outdated python 2.6 version
+  import ambari_simplejson as json
+except ImportError:
+  import json
 
 
 # ==============================
@@ -116,6 +126,7 @@ class ReadOnlyPropertyException(Exception):
   def _get_message(self):
     return self.__str__()
 
+
 class NotSupportedCatalogVersion(Exception):
   def __init__(self, catalog_version):
     self._version = catalog_version
@@ -129,6 +140,22 @@ class NotSupportedCatalogVersion(Exception):
   message = property(__str__)
 
 
+class CatalogNotFoundException(Exception):
+  pass
+
+
+class CatalogExistException(Exception):
+  pass
+
+
+class PropertyNotFoundException(Exception):
+  pass
+
+
+class MalformedPropertyDefinitionException(Exception):
+  pass
+
+
 # ==============================
 #    Constant class definition
 # ==============================
@@ -137,27 +164,6 @@ class Const(object):
     raise Exception("Class couldn't be created")
 
 
-class CatConst(Const):
-  VERSION_TAG = "version"
-  STACK_VERSION_OLD = "old-version"
-  STACK_VERSION_TARGET = "target-version"
-  STACK_STAGS_TAG = "stacks"
-  STACK_NAME = "name"
-  CONFIG_OPTIONS = "options"
-  CONFIG_TYPES = "config-types"
-  STACK_PROPERTIES = "properties"
-  PROPERTY_VALUE_TAG = "value"
-  PROPERTY_REMOVE_TAG = "remove"
-  MERGED_COPY_TAG = "merged-copy"
-  REQUIRED_SERVICES = "required-services"
-  ITEMS_TAG = "items"
-  TYPE_TAG = "type"
-  TRUE_TAG = "yes"
-  STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping"
-  VALUE_TEMPLATE_TAG = "template"
-  SEARCH_PATTERN = "(\{[^\{\}]+\})"  # {XXXXX}
-
-
 class Options(Const):
   # action commands
   API_PROTOCOL = "http"
@@ -189,8 +195,6 @@ class Options(Const):
   COMPONENTS_FORMAT = None
 
   # Curl options
-  POST_REQUESTS = ['PUT', 'POST']
-  GET_REQUESTS = ['GET', 'DELETE']
   CURL_PRINT_ONLY = None
 
   ARGS = None
@@ -237,12 +241,41 @@ class Options(Const):
     cls.logger.addHandler(handler)
 
 
+class CatConst(Const):
+  VERSION_TAG = "version"
+  STACK_VERSION_OLD = "old-version"
+  STACK_VERSION_TARGET = "target-version"
+  STACK_STAGS_TAG = "stacks"
+  STACK_NAME = "name"
+  CONFIG_OPTIONS = "options"
+  CONFIG_TYPES = "config-types"
+  STACK_PROPERTIES = "properties"
+  STACK_PROPERTIES_ATTRIBUTES = "properties_attributes"
+  PROPERTY_VALUE_TAG = "value"
+  PROPERTY_REMOVE_TAG = "remove"
+  PROPERTY_MAP_TO = "map-to"
+  PROPERTY_FROM_CATALOG = "from-catalog"
+  PROPERTY_TO_CATALOG = "to-catalog"
+  PROPERTY_DEFAULT = "default"
+  MERGED_COPY_TAG = "merged-copy"
+  REQUIRED_SERVICES = "required-services"
+  ITEMS_TAG = "items"
+  TYPE_TAG = "type"
+  TRUE_TAG = "yes"
+  STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping"
+  VALUE_TEMPLATE_TAG = "template"
+  SEARCH_PATTERN = "(\{[^\{\}]+\})"  # {XXXXX}
+  ACTION_COMMIT = "commit"
+  ACTION_RELOAD = "reload"
+  ACTION_RENAME_PROPERTY = "rename-property"
+  TEMPLATE_HANDLER = "template_handler"
+
+
 # ==============================
 #    Catalog classes definition
 # ==============================
 class UpgradeCatalogFactory(object):
-
-   # versions of catalog which is currently supported
+  # versions of catalog which is currently supported
   _supported_catalog_versions = ["1.0"]
 
   # private variables
@@ -297,67 +330,53 @@ class UpgradeCatalogFactory(object):
 
 
 class UpgradeCatalog(object):
-
   # private variables
   _json_catalog = None
   _properties_catalog = None
-  _properties_map_catalog = {}  # Initially should be assigned empty dictionary as default value
+  _properties_map_catalog = None
   _version = None
   _search_pattern = None
+  _catalog_options = None
 
-  """
-   Substitute handler, should return replaced value, as param would be passed value and tokens to substitute
-   Please, be aware! Token should be unique in context of one catalog
-
-   Example:
-    def _substitute(tokens, value):
-      for token in tokens:
-        if token == "{REPLACE_ME}":
-          value = value.replace(token, "\"hello world\"")
-      return value
-
-    catalog.set_substitution_handler = _substitute
-
-    After that, all properties with CatConst.VALUE_TEMPLATE_TAG  set to "yes" would be processed
-  """
-  _substitution_handler = None
-
-  # public variable
-  config_groups = None
-
-  def __init__(self, catalog=None, version=None, substitution_handler=None):
+  def __init__(self, catalog=None, version=None):
+    self._handlers = {}
     self._json_catalog = catalog
     self._version = version
     self._search_pattern = re.compile(CatConst.SEARCH_PATTERN)
 
     if CatConst.STACK_PROPERTIES in catalog:
-      self._properties_catalog = catalog[CatConst.STACK_PROPERTIES]
+      self._properties_catalog = self._format_catalog_properties(catalog[CatConst.STACK_PROPERTIES])
 
     if CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG in catalog:
-      self._properties_map_catalog = catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG]
+      self._properties_map_catalog = PropertyMapping(catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG])
+    else:
+      self._properties_map_catalog = PropertyMapping()
 
     if catalog is not None and CatConst.CONFIG_OPTIONS in catalog \
-                           and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]:
-
-      self.config_groups = ConfigConst(catalog[CatConst.CONFIG_OPTIONS][CatConst.CONFIG_TYPES],
-                                       properties_catalog=self._properties_catalog)
+            and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]:
+      self._catalog_options = catalog[CatConst.CONFIG_OPTIONS]
 
-    if substitution_handler is not None:
-      self.set_substitution_handler(substitution_handler)
+  def add_handler(self, name, handler):
+    if name not in self._handlers:
+      self._handlers[name] = handler
 
-  # deprecated, used for compatibility with old code
-  def get_properties_as_dict(self, properties):
-    target_dict = {}
-    for key in properties:
-      if CatConst.PROPERTY_VALUE_TAG in properties[key] and CatConst.PROPERTY_REMOVE_TAG not in properties[key]:
-        target_dict[key] = properties[key][CatConst.PROPERTY_VALUE_TAG]
-
-    return target_dict
-
-  def set_substitution_handler(self, handler):
-    self._substitution_handler = handler
+  def _format_catalog_properties(self, properties):
+    """
+    Transform properties from short form to normal one:
+    "property": "text" => "property": { "value": "text" }
+    :param properties: dict
+    :return: dict
+    """
+    for config_item in properties:
+      cfg_item = properties[config_item]
+      properties[config_item] = dict(zip(
+        cfg_item.keys(),
+        map(lambda x: x if isinstance(x, dict) or isinstance(x, list) else {CatConst.PROPERTY_VALUE_TAG: x}, cfg_item.values())
+      ))
+    return properties
 
-  def _get_version(self):
+  @property
+  def version(self):
     return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET])
 
   def get_parsed_version(self):
@@ -385,84 +404,323 @@ class UpgradeCatalog(object):
 
     return version
 
-  def _get_name(self):
+  @property
+  def name(self):
     if CatConst.STACK_NAME in self._json_catalog:
       return self._json_catalog[CatConst.STACK_NAME]
     return ""
 
-  def _get_propoerty_mapping(self):
+  @property
+  def mapping(self):
     return self._properties_map_catalog
 
-  def get_properties(self, config_group):
-    if config_group in self._properties_catalog:
-      return self._filter_properties(config_group)
-    return None
+  @property
+  def items(self):
+    return self._properties_catalog
 
-  def _filter_properties(self, config_group):
-    def _property_filter_strings(value):
-      if not isinstance(value, dict):
-        return {CatConst.PROPERTY_VALUE_TAG: value}
-      else:
-        if self._substitution_handler is not None and CatConst.VALUE_TEMPLATE_TAG in value  \
-          and CatConst.VALUE_TEMPLATE_TAG in value:  # value contains template
-
-          parsed_value = self._substitution_handler(
-            self._search_pattern.findall(value[CatConst.PROPERTY_VALUE_TAG]), value[CatConst.PROPERTY_VALUE_TAG]
-          )
-          if parsed_value is not None:  # Check if target function returns result
-            value[CatConst.PROPERTY_VALUE_TAG] = parsed_value
-
-      return value
-    properties = self._properties_catalog[config_group].copy()  # pass to process only copy of data
-    properties = dict(zip(properties, map(_property_filter_strings, properties.values())))
+  @property
+  def options(self):
+    if CatConst.CONFIG_TYPES in self._catalog_options:
+      return self._catalog_options[CatConst.CONFIG_TYPES]
+    return {}
+
+  def __handle_remove_tag(self, catalog_item_name, catalog_property_item, properties):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    :type properties dict
+    """
+    if CatConst.PROPERTY_REMOVE_TAG in catalog_property_item and \
+                    catalog_property_item[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG and \
+                    catalog_item_name in properties:
+      del properties[catalog_item_name]
+
+  def __handle_template_tag_sub(self, catalog_item_name, catalog_property_item):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    """
+    if CatConst.TEMPLATE_HANDLER in self._handlers and self._handlers is not None and \
+                    CatConst.VALUE_TEMPLATE_TAG in catalog_property_item and catalog_property_item[
+      CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
+      parsed_value = self._handlers[CatConst.TEMPLATE_HANDLER](
+        self,
+        self._search_pattern.findall(catalog_property_item[CatConst.PROPERTY_VALUE_TAG]),
+        catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
+      )
+      catalog_property_item[CatConst.PROPERTY_VALUE_TAG] = parsed_value
+
+  def __handle_add_new(self, catalog_item_name, catalog_property_item, properties):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    :type properties dict
+    """
+    catalog_property_item = dict(catalog_property_item)
+    if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name not in properties:
+      self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
+      properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
+
+  def __handle_change_existing(self, catalog_item_name, catalog_property_item, properties):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    :type properties dict
+    """
+    catalog_property_item = dict(catalog_property_item)
+    if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name in properties:
+      self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
+      properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
     return properties
 
-  version = property(_get_version)
-  name = property(_get_name)
-  property_map_catalog = property(_get_propoerty_mapping)
+  def __can_handler_execute(self, catalog_options, property_item):
+    """
+    :type catalog_options dict
+    :type property_item dict
+    """
+    can_process = True
+
+    # process required services tag
+    required_list = None
+
+    if CatConst.REQUIRED_SERVICES in catalog_options and catalog_options[CatConst.REQUIRED_SERVICES] is not None and \
+            isinstance(catalog_options[CatConst.REQUIRED_SERVICES], list):
+      required_list = catalog_options[CatConst.REQUIRED_SERVICES]
+
+    if CatConst.REQUIRED_SERVICES in property_item and property_item[CatConst.REQUIRED_SERVICES] is not None and\
+            isinstance(property_item[CatConst.REQUIRED_SERVICES], list):
+      required_list = property_item[CatConst.REQUIRED_SERVICES]
 
+    if required_list is not None:
+      can_process = can_process and is_services_exists(required_list)
 
-class ConfigConst(object):
-  _config_types_const_definition = {}
-  _config_types_value_definition = {}
+    return can_process
 
-  def __init__(self, config_types_definition, properties_catalog=None):
-    if properties_catalog is not None:  # compensate possibly undefined config groups in options from property definition
-      for item in properties_catalog:
-        if item not in config_types_definition:
-          config_types_definition[item] = {}
+  def process_simple_transformations(self, name, properties):
+    """
+    :type properties dict
+    :type name str
+    """
+    tag_handlers = [
+      self.__handle_add_new,
+      self.__handle_change_existing,
+      self.__handle_remove_tag
+    ]
+    # catalog has no update entries for this config group
+    if name not in self._properties_catalog:
+      return 0
+
+    catalog_item = self._properties_catalog[name]
+    for catalog_property_item in catalog_item.keys():
+      catalog_options = self.options[name] if name in self.options else {}
+      if self.__can_handler_execute(catalog_options, catalog_item[catalog_property_item]):
+        for handler in tag_handlers:
+          handler(catalog_property_item, catalog_item[catalog_property_item], properties)
+
+
+class PropertyMapping(object):
+  _mapping_list = {}
+
+  def __init__(self, map_list=None):
+    if map_list is not None:
+      self._mapping_list = self._convert_list(map_list)
+
+  def _convert_list(self, map_list):
+    return dict(zip(
+      map_list.keys(),
+      map(lambda x: x if isinstance(x, dict) else {CatConst.PROPERTY_MAP_TO: x}, map_list.values())
+    ))
+
+  def get(self, old_property_name):
+    """
+    Get property mapping dict
+    :old_property_name str
+    :return dict
+    """
+    if old_property_name in self._mapping_list:
+      return self._mapping_list[old_property_name]
 
-    self._config_types_value_definition = config_types_definition
-    for key in config_types_definition:
-      self._config_types_const_definition[key.replace("-", "_").lower()] = key
+    raise PropertyNotFoundException("Property %s from property mapping section not found" % old_property_name)
 
   def list(self):
-    return self._config_types_value_definition.keys()
+    return self._mapping_list.keys()
+
+  def get_mapped_name(self, old_property_name):
+    if CatConst.PROPERTY_MAP_TO not in self.get(old_property_name):
+      raise MalformedPropertyDefinitionException("%s option is not set for %s property" %
+                                                 (CatConst.PROPERTY_MAP_TO, old_property_name))
+    return self.get(old_property_name)[CatConst.PROPERTY_MAP_TO]
+
+  def exists(self, old_property_name):
+    return old_property_name in self._mapping_list
 
-  def get(self, name):
+
+class ServerConfigFactory(object):
+  _server_catalogs = {}
+
+  def __init__(self):
+    self.__observers = []
+    self._load_configs()
+
+  def subscribe(self, name, config_item):
+    self.__observers.append((name, config_item))
+
+  def _load_configs(self):
+    Options.logger.info('Getting latest cluster configuration from the server...')
+    new_configs = get_config_resp_all()
+    for config_item in new_configs:
+      if config_item in self._server_catalogs:
+        self.notify_observer(config_item, CatConst.ACTION_RELOAD, new_configs[config_item])
+      else:
+        self._server_catalogs[config_item] = ServerConfig(self, config_item, new_configs[config_item])
+
+  def notify_observers(self, action, arg=None):
+    for name, config_item in self.__observers:
+      if config_item is not None and name in self._server_catalogs:
+        config_item.notify(action, arg)
+
+  def notify_observer(self, _name, action, arg=None):
+    for name, config_item in self.__observers:
+      if config_item is not None and name == _name and name in self._server_catalogs:
+        config_item.notify(action, arg)
+
+  def get_config(self, name):
     """
-    Return desired property catalog
-    :param name: str
-    :return: dict
+    Get configuration item object
+    :type name str
+    :rtype: ServerConfig
     """
-    if name in self._config_types_value_definition:
-      return self._config_types_value_definition[name]
-    raise Exception("No config group with name %s found" % name)
+    if name in self._server_catalogs:
+      return self._server_catalogs[name]
+
+    raise CatalogNotFoundException("Server catalog item \"%s\" not found" % name)
+
+  def create_config(self, name):
+    if name not in self._server_catalogs:
+      self._server_catalogs[name] = ServerConfig(self, name, {CatConst.STACK_PROPERTIES: {}})
+    else:
+      raise CatalogExistException("Config group \"%s\" already existed" % name)
+
+  def items(self):
+    return self._server_catalogs.keys()
 
-  def __getattr__(self, item):
+  def reload(self):
+    self._load_configs()
+
+  def process_mapping_transformations(self, catalog):
+    """
+    :type catalog UpgradeCatalog
+    """
+    for map_item in catalog.mapping.list():
+      self._process_single_map_transformation(map_item, catalog.mapping.get(map_item))
+
+  def _process_single_map_transformation(self, map_item_name, map_property_item):
+    """
+    :type map_item_name str
+    :type map_property_item dict
+    """
+    new_property_name = map_property_item[CatConst.PROPERTY_MAP_TO]
+    source_cfg_group = map_property_item[CatConst.PROPERTY_FROM_CATALOG] if CatConst.PROPERTY_FROM_CATALOG in map_property_item and\
+                                                                            map_property_item[CatConst.PROPERTY_FROM_CATALOG] != "" else None
+    target_cfg_group = map_property_item[CatConst.PROPERTY_TO_CATALOG] if CatConst.PROPERTY_TO_CATALOG in map_property_item and \
+                                                                          map_property_item[CatConst.PROPERTY_TO_CATALOG] != ""else None
+    default_value = map_property_item[CatConst.PROPERTY_DEFAULT] if CatConst.PROPERTY_DEFAULT in map_property_item and \
+                                                                    map_property_item[CatConst.PROPERTY_DEFAULT] != "" else None
+    required_services = map_property_item[CatConst.REQUIRED_SERVICES] if CatConst.REQUIRED_SERVICES in map_property_item else None
+
+    # process required-services tag
+    if required_services is not None and not is_services_exists(required_services):
+      return 0
+
+    if source_cfg_group is None and target_cfg_group is None:  # global scope mapping renaming
+      self.notify_observers(CatConst.ACTION_RENAME_PROPERTY, [map_item_name, new_property_name])
+    elif source_cfg_group is not None and target_cfg_group is not None:  # group-to-group moving
+      if source_cfg_group in self._server_catalogs and target_cfg_group in self._server_catalogs:
+        old_cfg_group = self.get_config(source_cfg_group).properties
+        new_cfg_group = self.get_config(target_cfg_group).properties
+
+        if map_item_name in old_cfg_group:
+          new_cfg_group[new_property_name] = old_cfg_group[map_item_name]
+          del old_cfg_group[map_item_name]
+        elif map_item_name not in old_cfg_group and default_value is not None:
+          new_cfg_group[new_property_name] = default_value
+
+  def commit(self):
+    self.notify_observers(CatConst.ACTION_COMMIT)
+
+
+class ServerConfig(object):
+  def __init__(self, factory, name, initial_configs):
     """
-    Support for constant handling like "<name>_tag" which would return real config name.
-    Base list loaded from section options\config-types of json.
+     Initialize configuration item
+     :factory ServerConfigFactory
+    """
+    factory.subscribe(name, self)
+    self._configs = initial_configs
+    self._hash = self._calculate_hash()
+    self._name = name
+
+  def _calculate_hash(self):
+    return hash(str(self._configs))
+
+  def notify(self, action, arg=None):
+    if action == CatConst.ACTION_RELOAD:
+      self._configs = arg
+      self._hash = self._calculate_hash()
+    elif action == CatConst.ACTION_COMMIT:
+      self._commit()
+    elif action == CatConst.ACTION_RENAME_PROPERTY and isinstance(arg, list) and len(arg) == 2:
+      self._rename_property(arg[0], arg[1])
+
+  def _rename_property(self, old_name, new_name):
+    if old_name in self.properties:
+      old_property_value = self.properties[old_name]
+      self.properties[new_name] = old_property_value
+      del self.properties[old_name]
+
+  def is_attributes_exists(self):
+    return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs
+
+  @property
+  def properties(self):
+    return self._configs[CatConst.STACK_PROPERTIES]
+
+  @properties.setter
+  def properties(self, value):
+    self._configs[CatConst.STACK_PROPERTIES] = value
+
+  @property
+  def attributes(self):
+    return self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES]
+
+  @attributes.setter
+  def attributes(self, value):
+    self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] = value
+
+  def _commit(self):
+    if self._hash != self._calculate_hash():
+      Options.logger.info("Committing changes for \"%s\" configuration group ..." % self._name)
+      if self.is_attributes_exists():
+        update_config(self.properties, self._name, self.attributes)
+      else:
+        update_config(self.properties, self._name)
 
-    Example:
-      self.hbase_env_tag will return hbase-env
+  def clear(self):
+    self.properties = {}
+    self.attributes = {}
 
-    :param item: accessed attribute
-    :return: attribute value if exists or None
+  def merge(self, catalog_item):
+    """
+    :type catalog_item UpgradeCatalog
     """
-    item = item.lower()
-    if "_tag" in item and item[:-4] in self._config_types_const_definition:
-      return self._config_types_const_definition[item[:-4]]
+    # handle "merged-copy" tag
+    config_options = catalog_item.options[self._name] if self._name in catalog_item.options else {}
+    clear_properties = not (CatConst.MERGED_COPY_TAG in config_options and
+                            config_options[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG)
+    if clear_properties:
+      self.clear()
+    Options.logger.info("Processing configuration group: %s", self._name)
+    catalog_item.process_simple_transformations(self._name, self.properties)
 
 
 def write_mapping(hostmapping):
@@ -471,13 +729,6 @@ def write_mapping(hostmapping):
   json.dump(hostmapping, open(Options.MR_MAPPING_FILE, 'w'))
 
 
-def write_config(config, cfg_type, tag):
-  file_name = cfg_type + "_" + tag
-  if os.path.isfile(file_name):
-    os.remove(file_name)
-  json.dump(config, open(file_name, 'w'))
-
-
 def read_mapping():
   if os.path.isfile(Options.MR_MAPPING_FILE):
     if Options.MR_MAPPING is not None:
@@ -497,7 +748,7 @@ def get_mr1_mapping():
   hostmapping = {}
   for component in components:
     hostlist = []
-    structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True, validate_expect_body=True)
+    structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True)
 
     if 'host_components' in structured_resp:
       for hostcomponent in structured_resp['host_components']:
@@ -552,15 +803,15 @@ def delete_mr():
     if (key in NON_CLIENTS) and (len(value) > 0):
       for host in value:
         curl(COMPONENT_URL_FORMAT % (host, key), request_type="PUT", data=PUT_IN_DISABLED,
-             validate=True, validate_expect_body=False)
+             validate=True)
 
-  curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True, validate_expect_body=False)
+  curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True)
 
 
 def get_cluster_stackname():
   VERSION_URL_FORMAT = Options.CLUSTER_URL + '?fields=Clusters/version'
 
-  structured_resp = curl(VERSION_URL_FORMAT, simulate=False, validate=True, validate_expect_body=True, parse=True)
+  structured_resp = curl(VERSION_URL_FORMAT, simulate=False, validate=True, parse=True)
 
   if 'Clusters' in structured_resp:
     if 'version' in structured_resp['Clusters']:
@@ -574,8 +825,8 @@ def has_component_in_stack_def(stack_name, service_name, component_name):
   stack, stack_version = stack_name.split('-')
 
   try:
-    curl(STACK_COMPONENT_URL_FORMAT.format(stack,stack_version, service_name, component_name),
-          validate=True, validate_expect_body=True, simulate=False)
+    curl(STACK_COMPONENT_URL_FORMAT.format(stack, stack_version, service_name, component_name),
+         validate=True, simulate=False)
     return True
   except FatalException:
     return False
@@ -606,15 +857,15 @@ def add_services():
   hostmapping = read_mapping()
 
   for service in service_comp.keys():
-    curl(SERVICE_URL_FORMAT.format(service), validate=True, validate_expect_body=False, request_type="POST")
+    curl(SERVICE_URL_FORMAT.format(service), validate=True, request_type="POST")
 
     for component in service_comp[service]:
       curl(COMPONENT_URL_FORMAT.format(service, component),
-           validate=True, validate_expect_body=False, request_type="POST")
+           validate=True, request_type="POST")
 
       for host in hostmapping[new_old_host_map[component]]:
         curl(HOST_COMPONENT_URL_FORMAT.format(host, component),
-             validate=True, validate_expect_body=False, request_type="POST")
+             validate=True, request_type="POST")
 
 
 def update_config(properties, config_type, attributes=None):
@@ -625,83 +876,14 @@ def update_config(properties, config_type, attributes=None):
 
   expect_body = config_type != "cluster-env"  # ToDo: make exceptions more flexible
 
-  curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True,
-       validate_expect_body=expect_body, soft_validation=True)
-
-
-def get_zookeeper_quorum():
-  zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, simulate=False, parse=True)
-  zoo_quorum = []
-  zoo_def_port = "2181"
-  if "host_components" in zoo_cfg:
-    for item in zoo_cfg["host_components"]:
-      zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port))
-
-  return ",".join(zoo_quorum)
-
-
-def get_config(cfg_type):
-  tag, structured_resp = get_config_resp(cfg_type)
-  properties = None
-  properties_attributes = None
-
-  if 'items' in structured_resp:
-    for item in structured_resp['items']:
-      if (tag == item['tag']) or (cfg_type == item['type']):
-        if 'properties' in item:
-          properties = item['properties']
-        if 'properties_attributes' in item:
-          properties_attributes = item['properties_attributes']
-        break
-  if properties is None:
-    raise FatalException(-1, "Unable to read configuration for type " + cfg_type + " and tag " + tag)
-
-  return properties, properties_attributes
-
-
-def parse_config_resp(resp):
-  parsed_configs = []
-  if CatConst.ITEMS_TAG in resp:
-    for config_item in resp[CatConst.ITEMS_TAG]:
-      parsed_configs.append({
-        "type": config_item[CatConst.TYPE_TAG],
-        "properties": config_item[CatConst.STACK_PROPERTIES]
-      })
-  return parsed_configs
-
-
-def get_config_resp(cfg_type, error_if_na=True, parsed=False, tag=None):
-  CONFIG_URL_FORMAT = Options.CLUSTER_URL + '/configurations?type={0}&tag={1}'
-
-  # Read the config version
-  if tag is None:
-    structured_resp = curl(Options.CLUSTER_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
-
-    if 'Clusters' in structured_resp:
-      if 'desired_configs' in structured_resp['Clusters']:
-        if cfg_type in structured_resp['Clusters']['desired_configs']:
-          tag = structured_resp['Clusters']['desired_configs'][cfg_type]['tag']
-
-  if tag is not None:
-    # Get the config with the tag and return properties
-    structured_resp = curl(CONFIG_URL_FORMAT.format(cfg_type, tag), parse=True, simulate=False,
-                           validate=True, validate_expect_body=True)
-    if parsed:
-      return tag, parse_config_resp(structured_resp)
-    else:
-      return tag, structured_resp
-  else:
-    if error_if_na:
-      raise FatalException(-1, "Unable to get the current version for config type " + cfg_type)
-    else:
-      return tag, None
+  curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True, soft_validation=True)
 
 
 def get_config_resp_all():
   desired_configs = {}
-  CONFIG_ALL_PROPERTIES_URL = Options.CLUSTER_URL  + "/configurations?fields=properties"
-  desired_configs_resp = curl(Options.CLUSTER_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
-  all_options = curl(CONFIG_ALL_PROPERTIES_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
+  config_all_properties_url = Options.CLUSTER_URL + "/configurations?fields=properties,properties_attributes"
+  desired_configs_resp = curl(Options.CLUSTER_URL + "?fields=Clusters/desired_configs", validate=True, parse=True, simulate=False)
+  all_options = curl(config_all_properties_url, validate=True, parse=True, simulate=False)
 
   if 'Clusters' in desired_configs_resp:
     if 'desired_configs' in desired_configs_resp['Clusters']:
@@ -712,20 +894,35 @@ def get_config_resp_all():
     return None
 
   if CatConst.ITEMS_TAG in all_options:
-    all_options = all_options["items"]
+    all_options = all_options[CatConst.ITEMS_TAG]
   else:
     return None
 
   all_options = filter(
-    lambda x: x["type"] in desired_configs_resp and x["tag"] == desired_configs_resp[x["type"]]["tag"],
+    lambda x: x[CatConst.TYPE_TAG] in desired_configs_resp and x["tag"] == desired_configs_resp[x[CatConst.TYPE_TAG]][
+      "tag"],
     all_options)
 
   for item in all_options:
-    if CatConst.STACK_PROPERTIES in item:  # config item could not contain anu property
-      desired_configs[item["type"]] = item["properties"]
+    dc_item = {}
+
+    if CatConst.STACK_PROPERTIES in item:  # config item could not contain any property
+      dc_item[CatConst.STACK_PROPERTIES] = item[CatConst.STACK_PROPERTIES]
+    else:
+      dc_item[CatConst.STACK_PROPERTIES] = {}
+
+    if CatConst.STACK_PROPERTIES_ATTRIBUTES in item:
+      dc_item[CatConst.STACK_PROPERTIES_ATTRIBUTES] = item[CatConst.STACK_PROPERTIES_ATTRIBUTES]
+
+    if "tag" in item:
+      dc_item["tag"] = item["tag"]
+
+    if dc_item != {}:
+      desired_configs[item[CatConst.TYPE_TAG]] = dc_item
 
   return desired_configs
 
+
 def is_services_exists(required_services):
   """
   return true, if required_services is a part of Options.SERVICES
@@ -738,6 +935,7 @@ def is_services_exists(required_services):
 
   return set(map(lambda x: x.upper(), required_services)) < Options.SERVICES
 
+
 def get_cluster_services():
   services_url = Options.CLUSTER_URL + '/services'
   raw_services = curl(services_url, parse=True, simulate=False)
@@ -750,92 +948,73 @@ def get_cluster_services():
   Options.logger.warning("Failed to load services list, functionality that depends on them couldn't work")
   return []
 
-def filter_properties_by_service_presence(config_type, catalog, catalog_properties):
+
+def get_zookeeper_quorum():
+  zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, simulate=False, parse=True)
+  zoo_quorum = []
+  zoo_def_port = "2181"
+  if "host_components" in zoo_cfg:
+    for item in zoo_cfg["host_components"]:
+      zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port))
+
+  return ",".join(zoo_quorum)
+
+
+def get_jt_host(catalog):
   """
-  Filter properties by required-services tag.
-  required-services tag could be catalog to per-property defined. per-property definition
-  will always override per-catalog definition.
-  :param config_type: str
-  :param catalog: UpgradeCatalog
-  :param catalog_properties: dict
-  :return: dict
+  :type catalog: UpgradeCatalog
+  :rtype str
   """
-  cproperties = dict(catalog_properties)
-  catalog_required_services = []
-  del_props = []
-
-  #  do nothing
-  if CatConst.REQUIRED_SERVICES in catalog.config_groups.get(config_type) and \
-    isinstance(catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES], list):
-    catalog_required_services = catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES]
-
-  for prop_name in cproperties:
-    # set per catalog limitation
-    required_services = catalog_required_services
-    if CatConst.REQUIRED_SERVICES in cproperties[prop_name] and\
-      isinstance(cproperties[prop_name][CatConst.REQUIRED_SERVICES], list):
-      # set per property limitation
-      required_services = catalog_properties[prop_name][CatConst.REQUIRED_SERVICES]
-
-    # add property to list for remove
-    if not is_services_exists(required_services):
-      del_props.append(prop_name)
-
-  # remove properties
-  for prop in del_props:
-    del cproperties[prop]
-
-  return cproperties
-
-def modify_config_item(config_type, catalog):
-  #  here should be declared tokens for pattern replace
-
-  if catalog.get_parsed_version()["from"] == 13:  # ToDo: introduce class for pre-defined tokens
-    hostmapping = read_mapping()
-    jt_host = hostmapping["JOBTRACKER"][0]
-    jh_host = hostmapping["HISTORYSERVER"][0]
-  else:
-    jt_host = ""
-    jh_host = ""
-
-  def _substitute(tokens, value):
-    for token in tokens:
-      if token == "{JOBHISTORY_HOST}":
-        value = value.replace(token, jh_host)
-      elif token == "{RESOURCEMANAGER_HOST}":
-        value = value.replace(token, jt_host)
-      elif token == "{ZOOKEEPER_QUORUM}":
-        value = value.replace(token, get_zookeeper_quorum())
-    return value
-  # Exit from function if was passed not suitable parameters
-  catalog.set_substitution_handler(_substitute)
+  if catalog.get_parsed_version()["from"] == 13:
+    return read_mapping()["JOBTRACKER"][0]
 
-  try:
-    properties_latest, properties_attributes_latest = get_config(config_type)
-    properties_latest = rename_all_properties(properties_latest, catalog.property_map_catalog)
-  except Exception as e:
-    properties_latest = {}
-    properties_attributes_latest = None
-
-  properties_copy = catalog.get_properties(config_type)
-  is_merged_copy = CatConst.MERGED_COPY_TAG in catalog.config_groups.get(config_type) \
-   and catalog.config_groups.get(config_type)[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG
-
-  # filter properties by service-required tag
-  properties_copy = filter_properties_by_service_presence(config_type, catalog, properties_copy)
-
-  # ToDo: implement property transfer from one catalog to other
-  #   properties_to_move = [
-  #     "dfs.namenode.checkpoint.edits.dir",
-  #     "dfs.namenode.checkpoint.dir",
-  #     "dfs.namenode.checkpoint.period"]
-  Options.logger.info("Updating '%s' catalog item..." % config_type)
-  if is_merged_copy:  # Append configs to existed ones
-    tag, structured_resp = get_config_resp(config_type, False)
-    if structured_resp is not None:
-      update_config_using_existing_properties(config_type, properties_copy, properties_latest, properties_attributes_latest, catalog)
-  else:  # Rewrite/create config items
-    update_config(catalog.get_properties_as_dict(properties_copy), config_type)
+  return ""
+
+
+def get_jh_host(catalog):
+  """
+  :type catalog: UpgradeCatalog
+  :rtype str
+  """
+  if catalog.get_parsed_version()["from"] == 13:
+    return read_mapping()["HISTORYSERVER"][0]
+
+  return ""
+
+
+def _substitute_handler(upgrade_catalog, tokens, value):
+  """
+  Substitute handler
+  :param upgrade_catalog: UpgradeCatalog
+  :param tokens: list
+  :param value: str
+  :rtype str
+  """
+  for token in tokens:
+    if token == "{JOBHISTORY_HOST}":
+      value = value.replace(token, get_jh_host(upgrade_catalog))
+    elif token == "{RESOURCEMANAGER_HOST}":
+      value = value.replace(token, get_jt_host(upgrade_catalog))
+    elif token == "{ZOOKEEPER_QUORUM}":
+      value = value.replace(token, get_zookeeper_quorum())
+  return value
+
+
+def modify_config_item(config_type, catalog, server_config_factory):
+  """
+  Modify configuration item
+  :type config_type str
+  :type catalog UpgradeCatalog
+  :type server_config_factory ServerConfigFactory
+  """
+
+  # if config group is absent on the server, we will create it
+  if config_type not in server_config_factory.items():
+    server_config_factory.create_config(config_type)
+
+  server_config_catalog = server_config_factory.get_config(config_type)
+
+  server_config_catalog.merge(catalog)
 
 
 def modify_configs():
@@ -845,77 +1024,59 @@ def modify_configs():
     config_type = None
 
   catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json)  # Load upgrade catalog
-  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)  # get desired version of catalog
+  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
+                                     Options.OPTIONS.to_stack)  # get desired version of catalog
+
+  # load all desired configs from the server
+  server_config_factory = ServerConfigFactory()
 
   if catalog is None:
     raise FatalException(1, "Upgrade catalog for version %s-%s not found, no configs was modified"
                          % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack))
 
-  if config_type is not None and config_type not in catalog.config_groups.list():
+  # add user-defined template processing function
+  catalog.add_handler(CatConst.TEMPLATE_HANDLER, _substitute_handler)
+
+  if config_type is not None and config_type not in catalog.items:
     raise FatalException("Config type %s not exists, no configs was modified" % config_type)
 
   if config_type is not None:
-    modify_config_item(config_type, catalog)
+    modify_config_item(config_type, catalog, server_config_factory)
   else:
-    for collection_name in catalog.config_groups.list():
-      modify_config_item(collection_name, catalog)
-
-
-def rename_all_properties(properties, name_mapping):
-  for key, val in name_mapping.items():
-    if (key in properties.keys()) and (val not in properties.keys()):
-      properties[val] = properties[key]
-      del properties[key]
-  return properties
-
-
-# properties template - passed as dict from UpgradeCatalog
-def update_config_using_existing_properties(conf_type, properties_template,
-                                            site_properties, properties_attributes_latest, catalog):
-  keys_processed = []
-  keys_to_delete = []
-  properties_parsed = catalog.get_properties_as_dict(properties_template)
-
-  for key in properties_template.keys():
-    keys_processed.append(key)
-    if CatConst.PROPERTY_REMOVE_TAG in properties_template and properties_template[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
-      keys_to_delete.append(key)
-
-  for key in site_properties.keys():
-    if key not in keys_processed:
-      properties_parsed[key] = site_properties[key]
-
-  for key in keys_to_delete:
-    del properties_parsed[key]
-
-  # check property attributes list
-  if properties_attributes_latest is not None:
-    for key in properties_attributes_latest:
-      properties_attributes_latest[key] = dict(filter(
-        lambda (item_key, item_value): item_key not in keys_to_delete,
-        zip(properties_attributes_latest[key].keys(), properties_attributes_latest[key].values())
-      ))
+    for collection_name in catalog.items:
+      modify_config_item(collection_name, catalog, server_config_factory)
+
+  server_config_factory.process_mapping_transformations(catalog)
 
-  update_config(properties_parsed, conf_type, attributes=properties_attributes_latest)
+  # commit changes to server, if any will be found
+  server_config_factory.commit()
 
 
 def backup_configs(conf_type=None):
-  DESIRED_CONFIGS_URL = Options.CLUSTER_URL + "?fields=Clusters/desired_configs"
+  dir = "backups_%d" % time.time()
+  file_pattern = "%s%s%s_%s.json"
+  configs = get_config_resp_all()
+  if configs is None:
+    Options.logger.error("Unexpected response from the server")
+    return -1
 
-  desired_configs = curl(DESIRED_CONFIGS_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
+  if conf_type is not None and conf_type in configs:
+    configs = {conf_type: configs[conf_type]}
 
-  if "Clusters" in desired_configs and "desired_configs" in desired_configs["Clusters"]:
-    for conf_type in desired_configs["Clusters"]["desired_configs"].keys():
-      backup_single_config_type(conf_type, True)
+  if not os.path.exists(dir):
+    os.mkdir(dir)
 
+  for item in configs:
+    filename = file_pattern % (dir, os.path.sep, item, configs[item]["tag"])
+    if os.path.exists(filename):
+      os.remove(filename)
 
-def backup_single_config_type(conf_type, error_if_na=True):
-  tag, response = get_config_resp(conf_type, error_if_na)
-  if response is not None:
-    Options.logger.info("Saving config for type: " + conf_type + " and tag: " + tag)
-    write_config(response, conf_type, tag)
-  else:
-    Options.logger.info("Unable to obtain config for type: " + conf_type)
+    try:
+      with open(filename, "w") as f:
+        f.write(json.dumps(configs[item][CatConst.STACK_PROPERTIES], indent=4))
+      Options.logger.info("Catalog \"%s\" stored to %s", item, filename)
+    except IOError as e:
+      Options.logger.error("Unable to store \"%s\": %s", item, e)
 
 
 def install_services():
@@ -948,94 +1109,91 @@ def install_services():
   err_message = ""
   for index in [0, 1]:
     try:
-      curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True,
-           validate_expect_body=not Options.OPTIONS.printonly, request_type="PUT", data=PUT_IN_INSTALLED[index])
+      curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True, request_type="PUT", data=PUT_IN_INSTALLED[index])
     except FatalException as e:
       if not e.code == 0:
         err_retcode = e.code
         err_message = err_message + " Error while installing " + SERVICES[index] + ". Details: " + e.message + "."
 
   if err_retcode != 0:
-    raise FatalException(err_retcode, err_message + "(Services may already be installed or agents are not yet started.)")
+    raise FatalException(err_retcode,
+                         err_message + "(Services may already be installed or agents are not yet started.)")
 
   Options.OPTIONS.exit_message = "Requests has been submitted to install YARN and MAPREDUCE2. Use Ambari Web to monitor " \
-                         "the status of the install requests."
+                                 "the status of the install requests."
 
 
-def validate_response(response, expect_body):
-  if expect_body:
-    if "\"href\" : \"" not in response:
-      return 1, response
-    else:
-      return 0, ""
-  elif len(response) > 0:
-    return 1, response
-  else:
-    return 0, ""
+def generate_auth_header(user, password):
+  token = "%s:%s" % (user, password)
+  token = base64.encodestring(token)
+  return {"Authorization": "Basic %s" % token.replace('\n', '')}
 
 
 def curl(url, tokens=None, headers=None, request_type="GET", data=None, parse=False,
-         simulate=None, validate=False, validate_expect_body=False, soft_validation=False):
+         simulate=None, validate=False, soft_validation=False):
+  _headers = {}
+  handler_chain = []
+  post_req = ["POST", "PUT"]
+  get_req = ["GET", "DELETE"]
 
   simulate_only = Options.CURL_PRINT_ONLY is not None or (simulate is not None and simulate is True)
   print_url = Options.CURL_PRINT_ONLY is not None and simulate is not None
+  if request_type not in post_req + get_req:
+    raise IOError("Wrong request type \"%s\" passed" % request_type)
 
-  curl_path = '/usr/bin/curl'
-  curl_list = [curl_path]
-
-  curl_list.append('-X')
-  curl_list.append(request_type)
+  if data is not None and isinstance(data, dict):
+    data = json.dumps(data)
 
   if tokens is not None:
-    curl_list.append('-u')
-    curl_list.append("%s:%s" % (tokens["user"], tokens["pass"]))
+    _headers.update(generate_auth_header(tokens["user"], tokens["pass"]))
   elif Options.API_TOKENS is not None:
-    curl_list.append('-u')
-    curl_list.append("%s:%s" % (Options.API_TOKENS["user"], Options.API_TOKENS["pass"]))
-
-  if request_type in Options.POST_REQUESTS:
-    curl_list.append(url)
+    _headers.update(generate_auth_header(Options.API_TOKENS["user"], Options.API_TOKENS["pass"]))
 
-  if headers is None and Options.HEADERS is not None:
-    headers = Options.HEADERS
+  if request_type in post_req and data is not None:
+    _headers["Content-Length"] = len(data)
 
   if headers is not None:
-    for header in headers:
-      curl_list.append('-H')
-      curl_list.append("%s: %s" % (header, headers[header]))
+    _headers.update(headers)
 
-  if data is not None and request_type in Options.POST_REQUESTS:
-    curl_list.append('--data')
-    curl_list.append(json.dumps(data))
+  if Options.HEADERS is not None:
+    _headers.update(Options.HEADERS)
 
-  if request_type in Options.GET_REQUESTS:
-    curl_list.append(url)
+  director = build_opener(*handler_chain)
+  if request_type in post_req:
+    _data = bytes(data)
+    req = Request(url, headers=_headers, data=_data)
+  else:
+    req = Request(url, headers=_headers)
+
+  req.get_method = lambda: request_type
 
   if print_url:
-    Options.logger.info(" ".join(curl_list))
+    Options.logger.info(url)
 
+  code = 200
   if not simulate_only:
-    osStat = subprocess.Popen(
-      curl_list,
-      stderr=subprocess.PIPE,
-      stdout=subprocess.PIPE)
-    out, err = osStat.communicate()
-    if 0 != osStat.returncode:
-      error = "curl call failed. out: " + out + " err: " + err
-      Options.logger.error(error)
-      raise FatalException(osStat.returncode, error)
+    try:
+      resp = director.open(req)
+      out = resp.read()
+      if isinstance(out, bytes):
+        out = out.decode("utf-8")
+      code = resp.code
+    except URLError as e:
+      Options.logger.error(str(e))
+      if isinstance(e, HTTPError):
+        raise e
+      else:
+        raise FatalException(-1, str(e))
   else:
     if not print_url:
-      Options.logger.info(" ".join(curl_list))
+      Options.logger.info(url)
     out = "{}"
 
-  if validate and not simulate_only:
-    retcode, errdata = validate_response(out, validate_expect_body)
-    if not retcode == 0:
-      if soft_validation:
-        Options.logger.warning("Response validation failed, please check previous action result manually.")
-      else:
-        raise FatalException(retcode, errdata)
+  if validate and not simulate_only and (code > 299 or code < 200):
+    if soft_validation:
+      Options.logger.warning("Response validation failed, please check previous action result manually.")
+    else:
+      raise FatalException(code, "Response validation failed, please check previous action result manually.")
 
   if parse:
     return json.loads(out)
@@ -1055,15 +1213,13 @@ def configuration_item_diff(collection_name, catalog, actual_properties_list):
    }
   :param collection_name:
   :param catalog:
+  :param actual_properties_list
   :return:
   """
 
   verified_catalog = []
-  catalog_properties = catalog.get_properties(collection_name)
-  actual_properties = None
-
-  if collection_name in actual_properties_list:
-    actual_properties = actual_properties_list[collection_name]
+  catalog_properties = dict(catalog)
+  actual_properties = dict(actual_properties_list)
 
   if actual_properties is None:
     verified_catalog = map(lambda x: {
@@ -1085,7 +1241,8 @@ def configuration_item_diff(collection_name, catalog, actual_properties_list):
     verified_catalog_catalog = map(lambda x: {
       "property": x,
       "catalog_item": catalog_properties[x],
-      "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in catalog_properties[x] else None,
+      "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in
+                                                                             catalog_properties[x] else None,
       "actual_value": actual_properties[x] if x in actual_properties else None,
     }, catalog_properties.keys())
 
@@ -1114,18 +1271,18 @@ def configuration_diff_analyze(diff_list):
       # process properties which can be absent
 
       # item was removed, from actual configs according to catalog instructions
-      if property_item["actual_value"] is None and property_item["catalog_value"] is None \
-        and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \
-        and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
+      if property_item["actual_value"] is None \
+              and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \
+              and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
 
         push_status("ok", property_item)
 
-       # currently skip values with template tag, as there no filter implemented
-       # ToDo: implement possibility to filter values without filter handler,
-       # ToDo: currently filtering is possible only on update-configs stage
+        # currently skip values with template tag, as there no filter implemented
+        # ToDo: implement possibility to filter values without filter handler,
+        # ToDo: currently filtering is possible only on update-configs stage
       elif property_item["actual_value"] is not None and property_item["catalog_value"] is not None \
-        and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \
-        and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
+              and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \
+              and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
 
         push_status("skipped", property_item)
 
@@ -1156,23 +1313,24 @@ def verify_configuration():
     config_type = None
 
   catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json)  # Load upgrade catalog
-  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)  # get desired version of catalog
+  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
+                                     Options.OPTIONS.to_stack)  # get desired version of catalog
+  server_configs = ServerConfigFactory()
 
   if catalog is None:
     raise FatalException(1, "Upgrade catalog for version %s-%s not found"
                          % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack))
 
-  if config_type is not None and config_type not in catalog.config_groups.list():
+  if config_type is not None and config_type not in catalog.items.keys() and config_type not in server_configs.items():
     raise FatalException("Config type %s not exists" % config_type)
 
   # fetch from server all option at one time and filter only desired versions
-  actual_options = get_config_resp_all()
 
   if config_type is not None:
-    diff_list[config_type] = configuration_item_diff(config_type, catalog, actual_options)
+    diff_list[config_type] = configuration_item_diff(config_type, catalog.items[config_type], server_configs.get_config(config_type).properties)
   else:
-    for collection_name in catalog.config_groups.list():
-      diff_list[collection_name] = configuration_item_diff(collection_name, catalog, actual_options)
+    for collection_name in catalog.items.keys():
+      diff_list[collection_name] = configuration_item_diff(collection_name, catalog.items[collection_name], server_configs.get_config(collection_name).properties)
 
   analyzed_list = configuration_diff_analyze(diff_list)
 
@@ -1187,7 +1345,7 @@ def verify_configuration():
     if analyzed_list[config_item]["fail"]["count"] != 0:
       Options.logger.info(
         "%s: %s missing configuration(s) - please look in the output file for the missing params" % (
-         config_item, analyzed_list[config_item]["fail"]["count"]
+          config_item, analyzed_list[config_item]["fail"]["count"]
         )
       )
       if report_file is not None:
@@ -1206,16 +1364,12 @@ def report_formatter(report_file, config_item, analyzed_list_item):
   prefix = "Configuration item %s" % config_item
   if analyzed_list_item["fail"]["count"] > 0:
     for item in analyzed_list_item["fail"]["items"]:
-      report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"" % (
+      report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"\n" % (
         prefix, item["property"], item["actual_value"], item["catalog_value"]
       ))
 
 
-#
-# Main.
-#
 def main():
-
   action_list = {  # list of supported actions
                    Options.GET_MR_MAPPING_ACTION: get_mr1_mapping,
                    Options.DELETE_MR_ACTION: delete_mr,
@@ -1224,7 +1378,7 @@ def main():
                    Options.INSTALL_YARN_MR2_ACTION: install_services,
                    Options.BACKUP_CONFIG_ACTION: backup_configs,
                    Options.VERIFY_ACTION: verify_configuration
-  }
+                   }
 
   parser = optparse.OptionParser(usage="usage: %prog [options] action\n  Valid actions: "
                                        + ", ".join(action_list.keys())

http://git-wip-us.apache.org/repos/asf/ambari/blob/52520d0f/ambari-server/src/test/python/TestUpgradeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestUpgradeHelper.py b/ambari-server/src/test/python/TestUpgradeHelper.py
index 1624dd2..73b1e4e 100644
--- a/ambari-server/src/test/python/TestUpgradeHelper.py
+++ b/ambari-server/src/test/python/TestUpgradeHelper.py
@@ -48,6 +48,7 @@ class TestUpgradeHelper(TestCase):
   catalog_to = "2.2"
   catalog_cfg_type = "my type"
   required_service = "TEST"
+  curl_response = "{}"
   test_catalog = """{
    "version": "1.0",
    "stacks": [
@@ -95,19 +96,12 @@ class TestUpgradeHelper(TestCase):
     sys.stdout = self.out
 
   def magic_curl(self, *args, **kwargs):
+    resp = self.curl_response
+    self.curl_response = "{}"
+    if "parse" in kwargs and isinstance(resp, str) and kwargs["parse"] == True:
+      resp = json.loads(resp)
+    return resp
 
-    def ret_object():
-      return ""
-
-    def communicate():
-      return "{}", ""
-
-    ret_object.returncode = 0
-    ret_object.communicate = communicate
-
-    with patch("upgradeHelper.subprocess") as subprocess:
-      subprocess.Popen.return_value = ret_object
-      self.original_curl(*args, **kwargs)
 
   def tearDown(self):
     sys.stdout = sys.__stdout__
@@ -155,19 +149,6 @@ class TestUpgradeHelper(TestCase):
     self.assertEqual(True, actual_result)
     self.assertEqual(True, actual_result_1)
 
-  @patch.object(upgradeHelper, "is_services_exists")
-  def test_filter_properties_by_service_presence(self, is_service_exists_mock):
-    catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
-    catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
-    cfg_type = self.catalog_cfg_type
-    is_service_exists_mock.return_value = True
-
-    old_services = upgradeHelper.Options.SERVICES
-    upgradeHelper.Options.SERVICES = set([self.required_service])
-    actual_result = upgradeHelper.filter_properties_by_service_presence(cfg_type, catalog, catalog.get_properties(self.catalog_cfg_type))
-
-    upgradeHelper.Options.SERVICES = old_services
-    self.assertEqual(catalog.get_properties(self.catalog_cfg_type), actual_result)
 
   @patch("__builtin__.open")
   @patch.object(os.path, "isfile")
@@ -193,34 +174,6 @@ class TestUpgradeHelper(TestCase):
 
   @patch("__builtin__.open")
   @patch.object(os.path, "isfile")
-  @patch("os.remove")
-  def test_write_config(self, remove_mock, isfile_mock, open_mock):
-    test_data = {
-      "test_field": "test_value"
-    }
-    test_result = json.dumps(test_data)
-    test_cfgtype = 'cfg.me'
-    test_tag = 'tag.test'
-    test_filename = "%s_%s" % (test_cfgtype, test_tag)
-    test_result = json.dumps(test_data)
-    output = StringIO()
-    isfile_mock.return_value = True
-    open_mock.return_value = output
-
-    # execute testing function
-    upgradeHelper.write_config(test_data, test_cfgtype, test_tag)
-
-    self.assertEquals(1, isfile_mock.call_count)
-    self.assertEquals(1, remove_mock.call_count)
-    self.assertEquals(1, open_mock.call_count)
-
-    # check file name
-    self.assertEquals(test_filename, isfile_mock.call_args[0][0])
-    # check content
-    self.assertEquals(test_result, output.getvalue())
-
-  @patch("__builtin__.open")
-  @patch.object(os.path, "isfile")
   def test_read_mapping(self, isfile_mock, open_mock):
     test_data = {
       "test_field": "test_value"
@@ -353,8 +306,7 @@ class TestUpgradeHelper(TestCase):
               {
                 "request_type": "PUT",
                 "data": PUT_IN_DISABLED,
-                "validate": True,
-                "validate_expect_body": False
+                "validate": True
               }
             ]
           )
@@ -364,8 +316,7 @@ class TestUpgradeHelper(TestCase):
         (SERVICE_URL_FORMAT,),
         {
           "request_type": "DELETE",
-          "validate": True,
-          "validate_expect_body": False
+          "validate": True
         }
       ]
     )
@@ -439,7 +390,6 @@ class TestUpgradeHelper(TestCase):
         (SERVICE_URL_FORMAT.format(service),),
         {
           "validate": True,
-          "validate_expect_body": False,
           "request_type": "POST"
         }
       ])
@@ -448,7 +398,6 @@ class TestUpgradeHelper(TestCase):
           (COMPONENT_URL_FORMAT.format(service, component),),
           {
             "validate": True,
-            "validate_expect_body": False,
             "request_type": "POST"
           }
         ])
@@ -457,7 +406,6 @@ class TestUpgradeHelper(TestCase):
             (HOST_COMPONENT_URL_FORMAT.format(host, component),),
             {
               "validate": True,
-              "validate_expect_body": False,
               "request_type": "POST"
             }
           ])
@@ -488,8 +436,7 @@ class TestUpgradeHelper(TestCase):
         "request_type": "PUT",
         "data": copy.deepcopy(properties_payload),
         "validate": True,
-        "soft_validation": True,
-        "validate_expect_body": True
+        "soft_validation": True
       }
     )
 
@@ -500,8 +447,7 @@ class TestUpgradeHelper(TestCase):
         "request_type": "PUT",
         "data": copy.deepcopy(properties_payload),
         "validate": True,
-        "soft_validation": True,
-        "validate_expect_body": True
+        "soft_validation": True
       }
     )
 
@@ -547,86 +493,6 @@ class TestUpgradeHelper(TestCase):
 
     self.assertEqual(expected_result, actual_result)
 
-  @patch.object(upgradeHelper, "get_config_resp")
-  def test_get_config(self, get_config_resp_mock):
-    config_type = "test type"
-    tag_name = "my tag"
-    properties = {
-      "my property": "property value"
-    }
-    property_atributes = {
-      "myproperty": "property value"
-    }
-
-    in_data = tag_name, {
-      "items": [
-        {
-          "tag": tag_name,
-          "type": config_type,
-          "properties": properties,
-          "properties_attributes": property_atributes
-        }
-      ]
-    }
-
-    get_config_resp_mock.return_value = in_data
-    expected_data = (properties, property_atributes)
-
-    # execute testing function
-    actual_data = upgradeHelper.get_config(config_type)
-
-    self.assertEqual(expected_data, actual_data)
-
-  def test_parse_config_resp(self):
-    cfg_type = "type 1"
-    cfg_properties = {
-      "my property": "property value"
-    }
-
-    in_data = {
-      upgradeHelper.CatConst.ITEMS_TAG: [
-        {
-          upgradeHelper.CatConst.TYPE_TAG: cfg_type,
-          upgradeHelper.CatConst.STACK_PROPERTIES: cfg_properties
-        }
-      ]
-    }
-
-    expected_result = [{
-     "type": cfg_type,
-     "properties": cfg_properties
-    }]
-
-    # execute testing function
-    actual_result = upgradeHelper.parse_config_resp(in_data)
-
-    self.assertEqual(expected_result, actual_result)
-
-  @patch.object(upgradeHelper, "curl")
-  @patch.object(upgradeHelper, "parse_config_resp")
-  def test_get_config_resp(self, parse_config_resp_mock, curl_mock):
-    cfg_type = "my type"
-    cfg_tag = "my tag"
-    cfg_data = "test"
-    curl_responses = [
-      {
-        'Clusters': {
-          'desired_configs': {
-            cfg_type: {
-              "tag": cfg_tag
-            }
-          }
-        }
-      },
-      cfg_data
-    ]
-    curl_mock.side_effect = MagicMock(side_effect=curl_responses)
-
-    # execute testing function
-    actual_tag, actual_data = upgradeHelper.get_config_resp(cfg_type, False, False)
-
-    self.assertEqual((cfg_tag, cfg_data), (actual_tag, actual_data))
-
   @patch.object(upgradeHelper, "curl")
   def test_get_config_resp_all(self, curl_mock):
     cfg_type = "my type"
@@ -656,8 +522,11 @@ class TestUpgradeHelper(TestCase):
     ]
 
     expected_result = {
-      cfg_type: cfg_properties
-    }
+        cfg_type: {
+          "properties": cfg_properties,
+          "tag": cfg_tag
+        }
+      }
     curl_mock.side_effect = MagicMock(side_effect=curl_resp)
 
     # execute testing function
@@ -666,160 +535,31 @@ class TestUpgradeHelper(TestCase):
     self.assertEquals(expected_result, actual_result)
     pass
 
-  @patch.object(upgradeHelper, "read_mapping")
-  @patch.object(upgradeHelper, "get_config")
-  @patch.object(upgradeHelper, "update_config_using_existing_properties")
-  @patch.object(upgradeHelper, "update_config")
-  @patch.object(upgradeHelper, "get_config_resp")
-  def test_modify_config_item(self, get_config_resp_mock, upgrade_config_mock, update_config_using_existing_properties_mock,
-                              get_config_mock, read_mapping_mock):
-    catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
-    get_config_resp_mock.return_value = "", {}
-    old_services = upgradeHelper.Options.SERVICES
-    upgradeHelper.Options.SERVICES = set([self.required_service])
-    catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
-    cfg_type = self.catalog_cfg_type
-    read_mapping_mock.return_value = {
-      "MAPREDUCE_CLIENT": ["test.host.vm"],
-      "JOBTRACKER": ["test1.host.vm"],
-      "TASKTRACKER": ["test2.host.vm"],
-      "HISTORYSERVER": ["test3.host.vm"]
-    }
-    get_config_mock.return_value = {"my replace property": "property value 2"}, {}
-    expected_params = [
-      cfg_type,
-      {
-        "my property": {
-          "value": "my value",
-          "required-services": ["TEST"]
-        }
-      },
-      {
-        "my property 2": "property value 2"
-      },
-    ]
-
-    # execute testing function
-    upgradeHelper.modify_config_item(cfg_type, catalog)
-
-    upgradeHelper.Options.SERVICES = old_services
-
-    actual_params = [
-      update_config_using_existing_properties_mock.call_args[0][0],
-      update_config_using_existing_properties_mock.call_args[0][1],
-      update_config_using_existing_properties_mock.call_args[0][2]
-    ]
-    self.assertEquals(update_config_using_existing_properties_mock.call_count, 1)
-    self.assertEqual(upgrade_config_mock.call_count, 0)
-
-    self.assertEqual(expected_params, actual_params)
-
-  @patch.object(upgradeHelper, "UpgradeCatalogFactory", autospec=True)
-  @patch.object(upgradeHelper, "modify_config_item")
-  def test_modify_configs(self, modify_config_item_mock, factory_mock):
-    factory_mock.return_value = UpgradeCatalogFactoryMock(self.test_catalog)
-    options = lambda: ""
-    options.from_stack = self.catalog_from
-    options.to_stack = self.catalog_to
-    options.upgrade_json = ""
-
-    upgradeHelper.Options.OPTIONS = options
-
-    # execute testing function
-    upgradeHelper.modify_configs()
-
-    self.assertEqual(1, modify_config_item_mock.call_count)
-    self.assertEqual(self.catalog_cfg_type, modify_config_item_mock.call_args[0][0])
-
-  def test_rename_all_properties(self):
-    in_data_properties = {
-      "test property": "test value",
-      "rename property": "test value 2"
-    }
-    in_data_mapping = {
-      "rename property": "test property 2"
-    }
-    expect_properties = {
-      "test property": "test value",
-      "test property 2": "test value 2"
-    }
-
-    # execute testing function
-    actual_properties = upgradeHelper.rename_all_properties(in_data_properties, in_data_mapping)
-
-    self.assertEqual(expect_properties, actual_properties)
-
-  @patch.object(upgradeHelper, "update_config")
-  def test_update_config_using_existing_properties(self, update_config_mock):
-    actual_property = {
-      "actual property": "actual value"
-    }
-    actual_attrib = {
+  @patch.object(upgradeHelper, "get_config_resp_all")
+  @patch("os.mkdir")
+  @patch("os.path.exists")
+  @patch("__builtin__.open")
+  def test_backup_configs(self, open_mock, os_path_exists_mock, mkdir_mock, get_config_resp_all_mock):
+    data = {
       self.catalog_cfg_type: {
-        "attribute 1": "attribute value 1"
-      }
-    }
-    catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
-    catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
-
-    # execute testing function
-    upgradeHelper.update_config_using_existing_properties(self.catalog_cfg_type,
-                                                          catalog.get_properties(self.catalog_cfg_type),
-                                                          actual_property,
-                                                          actual_attrib,
-                                                          catalog
-                                                          )
-    expected_dict = {}
-    expected_dict.update(actual_property)
-    expected_dict.update(catalog.get_properties_as_dict(catalog.get_properties(self.catalog_cfg_type)))
-
-    expected_args = (
-      (expected_dict, self.catalog_cfg_type),
-      {
-        "attributes": actual_attrib
-      }
-    )
-
-    self.assertEqual(1, update_config_mock.call_count)
-    self.assertEqual(expected_args, tuple(update_config_mock.call_args))
-
-  @patch.object(upgradeHelper, "backup_single_config_type")
-  @patch.object(upgradeHelper, "curl")
-  def test_backup_configs(self, curl_mock, backup_single_config_type_mock):
-    curl_mock.return_value = {
-      'Clusters': {
-        'desired_configs': {
-          self.catalog_cfg_type: {
-            "tag": "my tag"
-          }
-        }
+        "properties": {
+          "test-property": "value"
+        },
+        "tag": "version1"
       }
     }
-
-    expected_args = (self.catalog_cfg_type, True)
+    os_path_exists_mock.return_value = False
+    get_config_resp_all_mock.return_value = data
+    expected = json.dumps(data[self.catalog_cfg_type]["properties"], indent=4)
+    stream = StringIO()
+    m = MagicMock()
+    m.__enter__.return_value = stream
+    open_mock.return_value = m
 
     # execute testing function
     upgradeHelper.backup_configs(self.catalog_cfg_type)
 
-    self.assertEqual(1, backup_single_config_type_mock.call_count)
-    self.assertEqual(expected_args, backup_single_config_type_mock.call_args[0])
-
-  @patch.object(upgradeHelper, "get_config_resp")
-  @patch.object(upgradeHelper, "write_config")
-  def test_backup_single_config_type(self, write_config_mock, get_config_resp_mock):
-    resp = {
-      "property": "my data"
-    }
-    tag = "my tag"
-    get_config_resp_mock.return_value = tag, resp
-    expected_args = (resp, self.catalog_cfg_type, tag)
-
-    # execute testing function
-    upgradeHelper.backup_single_config_type(self.catalog_cfg_type, False)
-
-    self.assertEqual(1, get_config_resp_mock.call_count)
-    self.assertEqual(1, write_config_mock.call_count)
-    self.assertEqual(expected_args, write_config_mock.call_args[0])
+    self.assertEqual(expected, stream.getvalue())
 
   @patch.object(upgradeHelper, "curl")
   def test_install_services(self, curl_mock):
@@ -827,7 +567,6 @@ class TestUpgradeHelper(TestCase):
       (
         ('http://127.0.0.1:8080/api/v1/clusters/test1/services/MAPREDUCE2',),
         {
-          'validate_expect_body': True,
           'request_type': 'PUT',
           'data': {
             'RequestInfo': {
@@ -845,7 +584,6 @@ class TestUpgradeHelper(TestCase):
       (
         ('http://127.0.0.1:8080/api/v1/clusters/test1/services/YARN',),
         {
-          'validate_expect_body': True,
           'request_type': 'PUT',
           'data': {
             'RequestInfo': {
@@ -869,53 +607,6 @@ class TestUpgradeHelper(TestCase):
     for i in range(0, 1):
       self.assertEqual(expected_args[i], tuple(curl_mock.call_args_list[i]))
 
-  def test_validate_response(self):
-    resp_in_data = [
-      ["", False],
-      ["", True],
-      ["\"href\" : \"", True]
-    ]
-    resp_expected_results = [
-      (0, ""),
-      (1, ""),
-      (0, "")
-    ]
-
-    # execute testing function
-    for i in range(0, len(resp_in_data)):
-      actual_code, actual_data = upgradeHelper.validate_response(resp_in_data[i][0], resp_in_data[i][1])
-      self.assertEqual(resp_expected_results[i], (actual_code, actual_data))
-
-  def test_configuration_item_diff(self):
-    factory_mock = UpgradeCatalogFactoryMock(self.test_catalog)
-    catalog = factory_mock.get_catalog(self.catalog_from, self.catalog_to)
-    actual_properties = {
-      self.catalog_cfg_type: {
-        "my property": {
-         "value": "my value"
-        }
-      }
-    }
-
-    expected_result = [
-      {
-        'catalog_item': {
-          'value': u'my value',
-          'required-services': [u'TEST']
-        },
-        'property': 'my property',
-        'actual_value': {
-          'value': 'my value'
-        },
-        'catalog_value': u'my value'
-      }
-    ]
-
-    # execute testing function
-    actual_result = upgradeHelper.configuration_item_diff(self.catalog_cfg_type, catalog, actual_properties)
-
-    self.assertEqual(expected_result, actual_result)
-
   def test_configuration_diff_analyze(self):
     in_data = {
         self.catalog_cfg_type: [
@@ -978,7 +669,13 @@ class TestUpgradeHelper(TestCase):
     options.upgrade_json = ""
 
     upgradeHelper.Options.OPTIONS = options
+    upgradeHelper.Options.SERVICES = [self.required_service]
     upgradecatalogfactory_mock.return_value = UpgradeCatalogFactoryMock(self.test_catalog)
+    get_config_resp_all_mock.return_value = {
+      self.catalog_cfg_type: {
+        "properties": {}
+      }
+    }
 
     # execute testing function
     upgradeHelper.verify_configuration()
@@ -1022,7 +719,7 @@ class TestUpgradeHelper(TestCase):
         }
     }
 
-    expected_output = "Configuration item my type: property \"my property\" is set to \"my value 1\", but should be set to \"my value\""
+    expected_output = "Configuration item my type: property \"my property\" is set to \"my value 1\", but should be set to \"my value\"\n"
 
     # execute testing function
     upgradeHelper.report_formatter(file, cfg_item, analyzed_list)


[2/2] ambari git commit: AMBARI-12216. upgradeHelper.py script should handle property mapping across the property catalogs (dlysnichenko)

Posted by dm...@apache.org.
AMBARI-12216. upgradeHelper.py script should handle property mapping across the property catalogs (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1e158101
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1e158101
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1e158101

Branch: refs/heads/branch-2.1
Commit: 1e1581010c9bb2019ce3a6b66c74416f565c6a02
Parents: ac6e99e
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Tue Jun 30 16:27:41 2015 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Jun 30 16:28:23 2015 +0300

----------------------------------------------------------------------
 ambari-server/src/main/python/upgradeHelper.py  | 1038 ++++++++++--------
 .../src/test/python/TestUpgradeHelper.py        |  383 +------
 2 files changed, 636 insertions(+), 785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1e158101/ambari-server/src/main/python/upgradeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/upgradeHelper.py b/ambari-server/src/main/python/upgradeHelper.py
index 4ab834a..196dd3c 100644
--- a/ambari-server/src/main/python/upgradeHelper.py
+++ b/ambari-server/src/main/python/upgradeHelper.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-
 """
 Upgrade catalog file format description:
 
@@ -70,8 +69,15 @@ Example:
           }
         }
       },
-     "property-mapping": {
-       "old-property-name": "new-property-name"
+      "property-mapping": {
+        "old-property-name": "new-property-name", (short form, equal to "old-property-name": { "map-to": "new-property-name" })
+        "old-property1-name": {
+          "map-to": "new_property1_name", (required, new property name)
+          "from-catalog": "test",        (optional, require "to-catalog. Source of old-property1-name)
+          "to-catalog": "test",          (optional, require "from-catalog. Target of new_property1_name)
+          "default": "default value",    (optional, if set and old property not exists, new one would be created with default value)
+          "required-services": ["YARN"]  (optional, process entry if services in the list existed on the cluster
+      }
      }
     }
   ]
@@ -85,13 +91,17 @@ import optparse
 from pprint import pprint
 import re
 import sys
-import datetime
 import os.path
 import logging
-import shutil
-import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
-import subprocess
 import time
+import base64
+from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError
+
+try:
+  # try to import new simplejson version, which should be faster than outdated python 2.6 version
+  import ambari_simplejson as json
+except ImportError:
+  import json
 
 
 # ==============================
@@ -116,6 +126,7 @@ class ReadOnlyPropertyException(Exception):
   def _get_message(self):
     return self.__str__()
 
+
 class NotSupportedCatalogVersion(Exception):
   def __init__(self, catalog_version):
     self._version = catalog_version
@@ -129,6 +140,22 @@ class NotSupportedCatalogVersion(Exception):
   message = property(__str__)
 
 
+class CatalogNotFoundException(Exception):
+  pass
+
+
+class CatalogExistException(Exception):
+  pass
+
+
+class PropertyNotFoundException(Exception):
+  pass
+
+
+class MalformedPropertyDefinitionException(Exception):
+  pass
+
+
 # ==============================
 #    Constant class definition
 # ==============================
@@ -137,27 +164,6 @@ class Const(object):
     raise Exception("Class couldn't be created")
 
 
-class CatConst(Const):
-  VERSION_TAG = "version"
-  STACK_VERSION_OLD = "old-version"
-  STACK_VERSION_TARGET = "target-version"
-  STACK_STAGS_TAG = "stacks"
-  STACK_NAME = "name"
-  CONFIG_OPTIONS = "options"
-  CONFIG_TYPES = "config-types"
-  STACK_PROPERTIES = "properties"
-  PROPERTY_VALUE_TAG = "value"
-  PROPERTY_REMOVE_TAG = "remove"
-  MERGED_COPY_TAG = "merged-copy"
-  REQUIRED_SERVICES = "required-services"
-  ITEMS_TAG = "items"
-  TYPE_TAG = "type"
-  TRUE_TAG = "yes"
-  STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping"
-  VALUE_TEMPLATE_TAG = "template"
-  SEARCH_PATTERN = "(\{[^\{\}]+\})"  # {XXXXX}
-
-
 class Options(Const):
   # action commands
   API_PROTOCOL = "http"
@@ -189,8 +195,6 @@ class Options(Const):
   COMPONENTS_FORMAT = None
 
   # Curl options
-  POST_REQUESTS = ['PUT', 'POST']
-  GET_REQUESTS = ['GET', 'DELETE']
   CURL_PRINT_ONLY = None
 
   ARGS = None
@@ -237,12 +241,41 @@ class Options(Const):
     cls.logger.addHandler(handler)
 
 
+class CatConst(Const):
+  VERSION_TAG = "version"
+  STACK_VERSION_OLD = "old-version"
+  STACK_VERSION_TARGET = "target-version"
+  STACK_STAGS_TAG = "stacks"
+  STACK_NAME = "name"
+  CONFIG_OPTIONS = "options"
+  CONFIG_TYPES = "config-types"
+  STACK_PROPERTIES = "properties"
+  STACK_PROPERTIES_ATTRIBUTES = "properties_attributes"
+  PROPERTY_VALUE_TAG = "value"
+  PROPERTY_REMOVE_TAG = "remove"
+  PROPERTY_MAP_TO = "map-to"
+  PROPERTY_FROM_CATALOG = "from-catalog"
+  PROPERTY_TO_CATALOG = "to-catalog"
+  PROPERTY_DEFAULT = "default"
+  MERGED_COPY_TAG = "merged-copy"
+  REQUIRED_SERVICES = "required-services"
+  ITEMS_TAG = "items"
+  TYPE_TAG = "type"
+  TRUE_TAG = "yes"
+  STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping"
+  VALUE_TEMPLATE_TAG = "template"
+  SEARCH_PATTERN = "(\{[^\{\}]+\})"  # {XXXXX}
+  ACTION_COMMIT = "commit"
+  ACTION_RELOAD = "reload"
+  ACTION_RENAME_PROPERTY = "rename-property"
+  TEMPLATE_HANDLER = "template_handler"
+
+
 # ==============================
 #    Catalog classes definition
 # ==============================
 class UpgradeCatalogFactory(object):
-
-   # versions of catalog which is currently supported
+  # versions of catalog which is currently supported
   _supported_catalog_versions = ["1.0"]
 
   # private variables
@@ -297,67 +330,53 @@ class UpgradeCatalogFactory(object):
 
 
 class UpgradeCatalog(object):
-
   # private variables
   _json_catalog = None
   _properties_catalog = None
-  _properties_map_catalog = {}  # Initially should be assigned empty dictionary as default value
+  _properties_map_catalog = None
   _version = None
   _search_pattern = None
+  _catalog_options = None
 
-  """
-   Substitute handler, should return replaced value, as param would be passed value and tokens to substitute
-   Please, be aware! Token should be unique in context of one catalog
-
-   Example:
-    def _substitute(tokens, value):
-      for token in tokens:
-        if token == "{REPLACE_ME}":
-          value = value.replace(token, "\"hello world\"")
-      return value
-
-    catalog.set_substitution_handler = _substitute
-
-    After that, all properties with CatConst.VALUE_TEMPLATE_TAG  set to "yes" would be processed
-  """
-  _substitution_handler = None
-
-  # public variable
-  config_groups = None
-
-  def __init__(self, catalog=None, version=None, substitution_handler=None):
+  def __init__(self, catalog=None, version=None):
+    self._handlers = {}
     self._json_catalog = catalog
     self._version = version
     self._search_pattern = re.compile(CatConst.SEARCH_PATTERN)
 
     if CatConst.STACK_PROPERTIES in catalog:
-      self._properties_catalog = catalog[CatConst.STACK_PROPERTIES]
+      self._properties_catalog = self._format_catalog_properties(catalog[CatConst.STACK_PROPERTIES])
 
     if CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG in catalog:
-      self._properties_map_catalog = catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG]
+      self._properties_map_catalog = PropertyMapping(catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG])
+    else:
+      self._properties_map_catalog = PropertyMapping()
 
     if catalog is not None and CatConst.CONFIG_OPTIONS in catalog \
-                           and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]:
-
-      self.config_groups = ConfigConst(catalog[CatConst.CONFIG_OPTIONS][CatConst.CONFIG_TYPES],
-                                       properties_catalog=self._properties_catalog)
+            and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]:
+      self._catalog_options = catalog[CatConst.CONFIG_OPTIONS]
 
-    if substitution_handler is not None:
-      self.set_substitution_handler(substitution_handler)
+  def add_handler(self, name, handler):
+    if name not in self._handlers:
+      self._handlers[name] = handler
 
-  # deprecated, used for compatibility with old code
-  def get_properties_as_dict(self, properties):
-    target_dict = {}
-    for key in properties:
-      if CatConst.PROPERTY_VALUE_TAG in properties[key] and CatConst.PROPERTY_REMOVE_TAG not in properties[key]:
-        target_dict[key] = properties[key][CatConst.PROPERTY_VALUE_TAG]
-
-    return target_dict
-
-  def set_substitution_handler(self, handler):
-    self._substitution_handler = handler
+  def _format_catalog_properties(self, properties):
+    """
+    Transform properties from short form to normal one:
+    "property": "text" => "property": { "value": "text" }
+    :param properties: dict
+    :return: dict
+    """
+    for config_item in properties:
+      cfg_item = properties[config_item]
+      properties[config_item] = dict(zip(
+        cfg_item.keys(),
+        map(lambda x: x if isinstance(x, dict) or isinstance(x, list) else {CatConst.PROPERTY_VALUE_TAG: x}, cfg_item.values())
+      ))
+    return properties
 
-  def _get_version(self):
+  @property
+  def version(self):
     return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET])
 
   def get_parsed_version(self):
@@ -385,84 +404,323 @@ class UpgradeCatalog(object):
 
     return version
 
-  def _get_name(self):
+  @property
+  def name(self):
     if CatConst.STACK_NAME in self._json_catalog:
       return self._json_catalog[CatConst.STACK_NAME]
     return ""
 
-  def _get_propoerty_mapping(self):
+  @property
+  def mapping(self):
     return self._properties_map_catalog
 
-  def get_properties(self, config_group):
-    if config_group in self._properties_catalog:
-      return self._filter_properties(config_group)
-    return None
+  @property
+  def items(self):
+    return self._properties_catalog
 
-  def _filter_properties(self, config_group):
-    def _property_filter_strings(value):
-      if not isinstance(value, dict):
-        return {CatConst.PROPERTY_VALUE_TAG: value}
-      else:
-        if self._substitution_handler is not None and CatConst.VALUE_TEMPLATE_TAG in value  \
-          and CatConst.VALUE_TEMPLATE_TAG in value:  # value contains template
-
-          parsed_value = self._substitution_handler(
-            self._search_pattern.findall(value[CatConst.PROPERTY_VALUE_TAG]), value[CatConst.PROPERTY_VALUE_TAG]
-          )
-          if parsed_value is not None:  # Check if target function returns result
-            value[CatConst.PROPERTY_VALUE_TAG] = parsed_value
-
-      return value
-    properties = self._properties_catalog[config_group].copy()  # pass to process only copy of data
-    properties = dict(zip(properties, map(_property_filter_strings, properties.values())))
+  @property
+  def options(self):
+    if CatConst.CONFIG_TYPES in self._catalog_options:
+      return self._catalog_options[CatConst.CONFIG_TYPES]
+    return {}
+
+  def __handle_remove_tag(self, catalog_item_name, catalog_property_item, properties):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    :type properties dict
+    """
+    if CatConst.PROPERTY_REMOVE_TAG in catalog_property_item and \
+                    catalog_property_item[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG and \
+                    catalog_item_name in properties:
+      del properties[catalog_item_name]
+
+  def __handle_template_tag_sub(self, catalog_item_name, catalog_property_item):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    """
+    if CatConst.TEMPLATE_HANDLER in self._handlers and self._handlers is not None and \
+                    CatConst.VALUE_TEMPLATE_TAG in catalog_property_item and catalog_property_item[
+      CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
+      parsed_value = self._handlers[CatConst.TEMPLATE_HANDLER](
+        self,
+        self._search_pattern.findall(catalog_property_item[CatConst.PROPERTY_VALUE_TAG]),
+        catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
+      )
+      catalog_property_item[CatConst.PROPERTY_VALUE_TAG] = parsed_value
+
+  def __handle_add_new(self, catalog_item_name, catalog_property_item, properties):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    :type properties dict
+    """
+    catalog_property_item = dict(catalog_property_item)
+    if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name not in properties:
+      self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
+      properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
+
+  def __handle_change_existing(self, catalog_item_name, catalog_property_item, properties):
+    """
+    :type catalog_item_name str
+    :type catalog_property_item dict
+    :type properties dict
+    """
+    catalog_property_item = dict(catalog_property_item)
+    if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name in properties:
+      self.__handle_template_tag_sub(catalog_item_name, catalog_property_item)
+      properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG]
     return properties
 
-  version = property(_get_version)
-  name = property(_get_name)
-  property_map_catalog = property(_get_propoerty_mapping)
+  def __can_handler_execute(self, catalog_options, property_item):
+    """
+    :type catalog_options dict
+    :type property_item dict
+    """
+    can_process = True
+
+    # process required services tag
+    required_list = None
+
+    if CatConst.REQUIRED_SERVICES in catalog_options and catalog_options[CatConst.REQUIRED_SERVICES] is not None and \
+            isinstance(catalog_options[CatConst.REQUIRED_SERVICES], list):
+      required_list = catalog_options[CatConst.REQUIRED_SERVICES]
+
+    if CatConst.REQUIRED_SERVICES in property_item and property_item[CatConst.REQUIRED_SERVICES] is not None and\
+            isinstance(property_item[CatConst.REQUIRED_SERVICES], list):
+      required_list = property_item[CatConst.REQUIRED_SERVICES]
 
+    if required_list is not None:
+      can_process = can_process and is_services_exists(required_list)
 
-class ConfigConst(object):
-  _config_types_const_definition = {}
-  _config_types_value_definition = {}
+    return can_process
 
-  def __init__(self, config_types_definition, properties_catalog=None):
-    if properties_catalog is not None:  # compensate possibly undefined config groups in options from property definition
-      for item in properties_catalog:
-        if item not in config_types_definition:
-          config_types_definition[item] = {}
+  def process_simple_transformations(self, name, properties):
+    """
+    :type properties dict
+    :type name str
+    """
+    tag_handlers = [
+      self.__handle_add_new,
+      self.__handle_change_existing,
+      self.__handle_remove_tag
+    ]
+    # catalog has no update entries for this config group
+    if name not in self._properties_catalog:
+      return 0
+
+    catalog_item = self._properties_catalog[name]
+    for catalog_property_item in catalog_item.keys():
+      catalog_options = self.options[name] if name in self.options else {}
+      if self.__can_handler_execute(catalog_options, catalog_item[catalog_property_item]):
+        for handler in tag_handlers:
+          handler(catalog_property_item, catalog_item[catalog_property_item], properties)
+
+
+class PropertyMapping(object):
+  _mapping_list = {}
+
+  def __init__(self, map_list=None):
+    if map_list is not None:
+      self._mapping_list = self._convert_list(map_list)
+
+  def _convert_list(self, map_list):
+    return dict(zip(
+      map_list.keys(),
+      map(lambda x: x if isinstance(x, dict) else {CatConst.PROPERTY_MAP_TO: x}, map_list.values())
+    ))
+
+  def get(self, old_property_name):
+    """
+    Get property mapping dict
+    :old_property_name str
+    :return dict
+    """
+    if old_property_name in self._mapping_list:
+      return self._mapping_list[old_property_name]
 
-    self._config_types_value_definition = config_types_definition
-    for key in config_types_definition:
-      self._config_types_const_definition[key.replace("-", "_").lower()] = key
+    raise PropertyNotFoundException("Property %s from property mapping section not found" % old_property_name)
 
   def list(self):
-    return self._config_types_value_definition.keys()
+    return self._mapping_list.keys()
+
+  def get_mapped_name(self, old_property_name):
+    if CatConst.PROPERTY_MAP_TO not in self.get(old_property_name):
+      raise MalformedPropertyDefinitionException("%s option is not set for %s property" %
+                                                 (CatConst.PROPERTY_MAP_TO, old_property_name))
+    return self.get(old_property_name)[CatConst.PROPERTY_MAP_TO]
+
+  def exists(self, old_property_name):
+    return old_property_name in self._mapping_list
 
-  def get(self, name):
+
+class ServerConfigFactory(object):
+  _server_catalogs = {}
+
+  def __init__(self):
+    self.__observers = []
+    self._load_configs()
+
+  def subscribe(self, name, config_item):
+    self.__observers.append((name, config_item))
+
+  def _load_configs(self):
+    Options.logger.info('Getting latest cluster configuration from the server...')
+    new_configs = get_config_resp_all()
+    for config_item in new_configs:
+      if config_item in self._server_catalogs:
+        self.notify_observer(config_item, CatConst.ACTION_RELOAD, new_configs[config_item])
+      else:
+        self._server_catalogs[config_item] = ServerConfig(self, config_item, new_configs[config_item])
+
+  def notify_observers(self, action, arg=None):
+    for name, config_item in self.__observers:
+      if config_item is not None and name in self._server_catalogs:
+        config_item.notify(action, arg)
+
+  def notify_observer(self, _name, action, arg=None):
+    for name, config_item in self.__observers:
+      if config_item is not None and name == _name and name in self._server_catalogs:
+        config_item.notify(action, arg)
+
+  def get_config(self, name):
     """
-    Return desired property catalog
-    :param name: str
-    :return: dict
+    Get configuration item object
+    :type name str
+    :rtype: ServerConfig
     """
-    if name in self._config_types_value_definition:
-      return self._config_types_value_definition[name]
-    raise Exception("No config group with name %s found" % name)
+    if name in self._server_catalogs:
+      return self._server_catalogs[name]
+
+    raise CatalogNotFoundException("Server catalog item \"%s\" not found" % name)
+
+  def create_config(self, name):
+    if name not in self._server_catalogs:
+      self._server_catalogs[name] = ServerConfig(self, name, {CatConst.STACK_PROPERTIES: {}})
+    else:
+      raise CatalogExistException("Config group \"%s\" already existed" % name)
+
+  def items(self):
+    return self._server_catalogs.keys()
 
-  def __getattr__(self, item):
+  def reload(self):
+    self._load_configs()
+
+  def process_mapping_transformations(self, catalog):
+    """
+    :type catalog UpgradeCatalog
+    """
+    for map_item in catalog.mapping.list():
+      self._process_single_map_transformation(map_item, catalog.mapping.get(map_item))
+
+  def _process_single_map_transformation(self, map_item_name, map_property_item):
+    """
+    :type map_item_name str
+    :type map_property_item dict
+    """
+    new_property_name = map_property_item[CatConst.PROPERTY_MAP_TO]
+    source_cfg_group = map_property_item[CatConst.PROPERTY_FROM_CATALOG] if CatConst.PROPERTY_FROM_CATALOG in map_property_item and\
+                                                                            map_property_item[CatConst.PROPERTY_FROM_CATALOG] != "" else None
+    target_cfg_group = map_property_item[CatConst.PROPERTY_TO_CATALOG] if CatConst.PROPERTY_TO_CATALOG in map_property_item and \
+                                                                          map_property_item[CatConst.PROPERTY_TO_CATALOG] != ""else None
+    default_value = map_property_item[CatConst.PROPERTY_DEFAULT] if CatConst.PROPERTY_DEFAULT in map_property_item and \
+                                                                    map_property_item[CatConst.PROPERTY_DEFAULT] != "" else None
+    required_services = map_property_item[CatConst.REQUIRED_SERVICES] if CatConst.REQUIRED_SERVICES in map_property_item else None
+
+    # process required-services tag
+    if required_services is not None and not is_services_exists(required_services):
+      return 0
+
+    if source_cfg_group is None and target_cfg_group is None:  # global scope mapping renaming
+      self.notify_observers(CatConst.ACTION_RENAME_PROPERTY, [map_item_name, new_property_name])
+    elif source_cfg_group is not None and target_cfg_group is not None:  # group-to-group moving
+      if source_cfg_group in self._server_catalogs and target_cfg_group in self._server_catalogs:
+        old_cfg_group = self.get_config(source_cfg_group).properties
+        new_cfg_group = self.get_config(target_cfg_group).properties
+
+        if map_item_name in old_cfg_group:
+          new_cfg_group[new_property_name] = old_cfg_group[map_item_name]
+          del old_cfg_group[map_item_name]
+        elif map_item_name not in old_cfg_group and default_value is not None:
+          new_cfg_group[new_property_name] = default_value
+
+  def commit(self):
+    self.notify_observers(CatConst.ACTION_COMMIT)
+
+
+class ServerConfig(object):
+  def __init__(self, factory, name, initial_configs):
     """
-    Support for constant handling like "<name>_tag" which would return real config name.
-    Base list loaded from section options\config-types of json.
+     Initialize configuration item
+     :factory ServerConfigFactory
+    """
+    factory.subscribe(name, self)
+    self._configs = initial_configs
+    self._hash = self._calculate_hash()
+    self._name = name
+
+  def _calculate_hash(self):
+    return hash(str(self._configs))
+
+  def notify(self, action, arg=None):
+    if action == CatConst.ACTION_RELOAD:
+      self._configs = arg
+      self._hash = self._calculate_hash()
+    elif action == CatConst.ACTION_COMMIT:
+      self._commit()
+    elif action == CatConst.ACTION_RENAME_PROPERTY and isinstance(arg, list) and len(arg) == 2:
+      self._rename_property(arg[0], arg[1])
+
+  def _rename_property(self, old_name, new_name):
+    if old_name in self.properties:
+      old_property_value = self.properties[old_name]
+      self.properties[new_name] = old_property_value
+      del self.properties[old_name]
+
+  def is_attributes_exists(self):
+    return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs
+
+  @property
+  def properties(self):
+    return self._configs[CatConst.STACK_PROPERTIES]
+
+  @properties.setter
+  def properties(self, value):
+    self._configs[CatConst.STACK_PROPERTIES] = value
+
+  @property
+  def attributes(self):
+    return self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES]
+
+  @attributes.setter
+  def attributes(self, value):
+    self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] = value
+
+  def _commit(self):
+    if self._hash != self._calculate_hash():
+      Options.logger.info("Committing changes for \"%s\" configuration group ..." % self._name)
+      if self.is_attributes_exists():
+        update_config(self.properties, self._name, self.attributes)
+      else:
+        update_config(self.properties, self._name)
 
-    Example:
-      self.hbase_env_tag will return hbase-env
+  def clear(self):
+    self.properties = {}
+    self.attributes = {}
 
-    :param item: accessed attribute
-    :return: attribute value if exists or None
+  def merge(self, catalog_item):
+    """
+    :type catalog_item UpgradeCatalog
     """
-    item = item.lower()
-    if "_tag" in item and item[:-4] in self._config_types_const_definition:
-      return self._config_types_const_definition[item[:-4]]
+    # handle "merged-copy" tag
+    config_options = catalog_item.options[self._name] if self._name in catalog_item.options else {}
+    clear_properties = not (CatConst.MERGED_COPY_TAG in config_options and
+                            config_options[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG)
+    if clear_properties:
+      self.clear()
+    Options.logger.info("Processing configuration group: %s", self._name)
+    catalog_item.process_simple_transformations(self._name, self.properties)
 
 
 def write_mapping(hostmapping):
@@ -471,13 +729,6 @@ def write_mapping(hostmapping):
   json.dump(hostmapping, open(Options.MR_MAPPING_FILE, 'w'))
 
 
-def write_config(config, cfg_type, tag):
-  file_name = cfg_type + "_" + tag
-  if os.path.isfile(file_name):
-    os.remove(file_name)
-  json.dump(config, open(file_name, 'w'))
-
-
 def read_mapping():
   if os.path.isfile(Options.MR_MAPPING_FILE):
     if Options.MR_MAPPING is not None:
@@ -497,7 +748,7 @@ def get_mr1_mapping():
   hostmapping = {}
   for component in components:
     hostlist = []
-    structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True, validate_expect_body=True)
+    structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True)
 
     if 'host_components' in structured_resp:
       for hostcomponent in structured_resp['host_components']:
@@ -552,15 +803,15 @@ def delete_mr():
     if (key in NON_CLIENTS) and (len(value) > 0):
       for host in value:
         curl(COMPONENT_URL_FORMAT % (host, key), request_type="PUT", data=PUT_IN_DISABLED,
-             validate=True, validate_expect_body=False)
+             validate=True)
 
-  curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True, validate_expect_body=False)
+  curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True)
 
 
 def get_cluster_stackname():
   VERSION_URL_FORMAT = Options.CLUSTER_URL + '?fields=Clusters/version'
 
-  structured_resp = curl(VERSION_URL_FORMAT, simulate=False, validate=True, validate_expect_body=True, parse=True)
+  structured_resp = curl(VERSION_URL_FORMAT, simulate=False, validate=True, parse=True)
 
   if 'Clusters' in structured_resp:
     if 'version' in structured_resp['Clusters']:
@@ -574,8 +825,8 @@ def has_component_in_stack_def(stack_name, service_name, component_name):
   stack, stack_version = stack_name.split('-')
 
   try:
-    curl(STACK_COMPONENT_URL_FORMAT.format(stack,stack_version, service_name, component_name),
-          validate=True, validate_expect_body=True, simulate=False)
+    curl(STACK_COMPONENT_URL_FORMAT.format(stack, stack_version, service_name, component_name),
+         validate=True, simulate=False)
     return True
   except FatalException:
     return False
@@ -606,15 +857,15 @@ def add_services():
   hostmapping = read_mapping()
 
   for service in service_comp.keys():
-    curl(SERVICE_URL_FORMAT.format(service), validate=True, validate_expect_body=False, request_type="POST")
+    curl(SERVICE_URL_FORMAT.format(service), validate=True, request_type="POST")
 
     for component in service_comp[service]:
       curl(COMPONENT_URL_FORMAT.format(service, component),
-           validate=True, validate_expect_body=False, request_type="POST")
+           validate=True, request_type="POST")
 
       for host in hostmapping[new_old_host_map[component]]:
         curl(HOST_COMPONENT_URL_FORMAT.format(host, component),
-             validate=True, validate_expect_body=False, request_type="POST")
+             validate=True, request_type="POST")
 
 
 def update_config(properties, config_type, attributes=None):
@@ -625,83 +876,14 @@ def update_config(properties, config_type, attributes=None):
 
   expect_body = config_type != "cluster-env"  # ToDo: make exceptions more flexible
 
-  curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True,
-       validate_expect_body=expect_body, soft_validation=True)
-
-
-def get_zookeeper_quorum():
-  zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, simulate=False, parse=True)
-  zoo_quorum = []
-  zoo_def_port = "2181"
-  if "host_components" in zoo_cfg:
-    for item in zoo_cfg["host_components"]:
-      zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port))
-
-  return ",".join(zoo_quorum)
-
-
-def get_config(cfg_type):
-  tag, structured_resp = get_config_resp(cfg_type)
-  properties = None
-  properties_attributes = None
-
-  if 'items' in structured_resp:
-    for item in structured_resp['items']:
-      if (tag == item['tag']) or (cfg_type == item['type']):
-        if 'properties' in item:
-          properties = item['properties']
-        if 'properties_attributes' in item:
-          properties_attributes = item['properties_attributes']
-        break
-  if properties is None:
-    raise FatalException(-1, "Unable to read configuration for type " + cfg_type + " and tag " + tag)
-
-  return properties, properties_attributes
-
-
-def parse_config_resp(resp):
-  parsed_configs = []
-  if CatConst.ITEMS_TAG in resp:
-    for config_item in resp[CatConst.ITEMS_TAG]:
-      parsed_configs.append({
-        "type": config_item[CatConst.TYPE_TAG],
-        "properties": config_item[CatConst.STACK_PROPERTIES]
-      })
-  return parsed_configs
-
-
-def get_config_resp(cfg_type, error_if_na=True, parsed=False, tag=None):
-  CONFIG_URL_FORMAT = Options.CLUSTER_URL + '/configurations?type={0}&tag={1}'
-
-  # Read the config version
-  if tag is None:
-    structured_resp = curl(Options.CLUSTER_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
-
-    if 'Clusters' in structured_resp:
-      if 'desired_configs' in structured_resp['Clusters']:
-        if cfg_type in structured_resp['Clusters']['desired_configs']:
-          tag = structured_resp['Clusters']['desired_configs'][cfg_type]['tag']
-
-  if tag is not None:
-    # Get the config with the tag and return properties
-    structured_resp = curl(CONFIG_URL_FORMAT.format(cfg_type, tag), parse=True, simulate=False,
-                           validate=True, validate_expect_body=True)
-    if parsed:
-      return tag, parse_config_resp(structured_resp)
-    else:
-      return tag, structured_resp
-  else:
-    if error_if_na:
-      raise FatalException(-1, "Unable to get the current version for config type " + cfg_type)
-    else:
-      return tag, None
+  curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True, soft_validation=True)
 
 
 def get_config_resp_all():
   desired_configs = {}
-  CONFIG_ALL_PROPERTIES_URL = Options.CLUSTER_URL  + "/configurations?fields=properties"
-  desired_configs_resp = curl(Options.CLUSTER_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
-  all_options = curl(CONFIG_ALL_PROPERTIES_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
+  config_all_properties_url = Options.CLUSTER_URL + "/configurations?fields=properties,properties_attributes"
+  desired_configs_resp = curl(Options.CLUSTER_URL + "?fields=Clusters/desired_configs", validate=True, parse=True, simulate=False)
+  all_options = curl(config_all_properties_url, validate=True, parse=True, simulate=False)
 
   if 'Clusters' in desired_configs_resp:
     if 'desired_configs' in desired_configs_resp['Clusters']:
@@ -712,20 +894,35 @@ def get_config_resp_all():
     return None
 
   if CatConst.ITEMS_TAG in all_options:
-    all_options = all_options["items"]
+    all_options = all_options[CatConst.ITEMS_TAG]
   else:
     return None
 
   all_options = filter(
-    lambda x: x["type"] in desired_configs_resp and x["tag"] == desired_configs_resp[x["type"]]["tag"],
+    lambda x: x[CatConst.TYPE_TAG] in desired_configs_resp and x["tag"] == desired_configs_resp[x[CatConst.TYPE_TAG]][
+      "tag"],
     all_options)
 
   for item in all_options:
-    if CatConst.STACK_PROPERTIES in item:  # config item could not contain anu property
-      desired_configs[item["type"]] = item["properties"]
+    dc_item = {}
+
+    if CatConst.STACK_PROPERTIES in item:  # config item could not contain any property
+      dc_item[CatConst.STACK_PROPERTIES] = item[CatConst.STACK_PROPERTIES]
+    else:
+      dc_item[CatConst.STACK_PROPERTIES] = {}
+
+    if CatConst.STACK_PROPERTIES_ATTRIBUTES in item:
+      dc_item[CatConst.STACK_PROPERTIES_ATTRIBUTES] = item[CatConst.STACK_PROPERTIES_ATTRIBUTES]
+
+    if "tag" in item:
+      dc_item["tag"] = item["tag"]
+
+    if dc_item != {}:
+      desired_configs[item[CatConst.TYPE_TAG]] = dc_item
 
   return desired_configs
 
+
 def is_services_exists(required_services):
   """
   return true, if required_services is a part of Options.SERVICES
@@ -738,6 +935,7 @@ def is_services_exists(required_services):
 
   return set(map(lambda x: x.upper(), required_services)) < Options.SERVICES
 
+
 def get_cluster_services():
   services_url = Options.CLUSTER_URL + '/services'
   raw_services = curl(services_url, parse=True, simulate=False)
@@ -750,92 +948,73 @@ def get_cluster_services():
   Options.logger.warning("Failed to load services list, functionality that depends on them couldn't work")
   return []
 
-def filter_properties_by_service_presence(config_type, catalog, catalog_properties):
+
+def get_zookeeper_quorum():
+  zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, simulate=False, parse=True)
+  zoo_quorum = []
+  zoo_def_port = "2181"
+  if "host_components" in zoo_cfg:
+    for item in zoo_cfg["host_components"]:
+      zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port))
+
+  return ",".join(zoo_quorum)
+
+
+def get_jt_host(catalog):
   """
-  Filter properties by required-services tag.
-  required-services tag could be catalog to per-property defined. per-property definition
-  will always override per-catalog definition.
-  :param config_type: str
-  :param catalog: UpgradeCatalog
-  :param catalog_properties: dict
-  :return: dict
+  :type catalog: UpgradeCatalog
+  :rtype str
   """
-  cproperties = dict(catalog_properties)
-  catalog_required_services = []
-  del_props = []
-
-  #  do nothing
-  if CatConst.REQUIRED_SERVICES in catalog.config_groups.get(config_type) and \
-    isinstance(catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES], list):
-    catalog_required_services = catalog.config_groups.get(config_type)[CatConst.REQUIRED_SERVICES]
-
-  for prop_name in cproperties:
-    # set per catalog limitation
-    required_services = catalog_required_services
-    if CatConst.REQUIRED_SERVICES in cproperties[prop_name] and\
-      isinstance(cproperties[prop_name][CatConst.REQUIRED_SERVICES], list):
-      # set per property limitation
-      required_services = catalog_properties[prop_name][CatConst.REQUIRED_SERVICES]
-
-    # add property to list for remove
-    if not is_services_exists(required_services):
-      del_props.append(prop_name)
-
-  # remove properties
-  for prop in del_props:
-    del cproperties[prop]
-
-  return cproperties
-
-def modify_config_item(config_type, catalog):
-  #  here should be declared tokens for pattern replace
-
-  if catalog.get_parsed_version()["from"] == 13:  # ToDo: introduce class for pre-defined tokens
-    hostmapping = read_mapping()
-    jt_host = hostmapping["JOBTRACKER"][0]
-    jh_host = hostmapping["HISTORYSERVER"][0]
-  else:
-    jt_host = ""
-    jh_host = ""
-
-  def _substitute(tokens, value):
-    for token in tokens:
-      if token == "{JOBHISTORY_HOST}":
-        value = value.replace(token, jh_host)
-      elif token == "{RESOURCEMANAGER_HOST}":
-        value = value.replace(token, jt_host)
-      elif token == "{ZOOKEEPER_QUORUM}":
-        value = value.replace(token, get_zookeeper_quorum())
-    return value
-  # Exit from function if was passed not suitable parameters
-  catalog.set_substitution_handler(_substitute)
+  if catalog.get_parsed_version()["from"] == 13:
+    return read_mapping()["JOBTRACKER"][0]
 
-  try:
-    properties_latest, properties_attributes_latest = get_config(config_type)
-    properties_latest = rename_all_properties(properties_latest, catalog.property_map_catalog)
-  except Exception as e:
-    properties_latest = {}
-    properties_attributes_latest = None
-
-  properties_copy = catalog.get_properties(config_type)
-  is_merged_copy = CatConst.MERGED_COPY_TAG in catalog.config_groups.get(config_type) \
-   and catalog.config_groups.get(config_type)[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG
-
-  # filter properties by service-required tag
-  properties_copy = filter_properties_by_service_presence(config_type, catalog, properties_copy)
-
-  # ToDo: implement property transfer from one catalog to other
-  #   properties_to_move = [
-  #     "dfs.namenode.checkpoint.edits.dir",
-  #     "dfs.namenode.checkpoint.dir",
-  #     "dfs.namenode.checkpoint.period"]
-  Options.logger.info("Updating '%s' catalog item..." % config_type)
-  if is_merged_copy:  # Append configs to existed ones
-    tag, structured_resp = get_config_resp(config_type, False)
-    if structured_resp is not None:
-      update_config_using_existing_properties(config_type, properties_copy, properties_latest, properties_attributes_latest, catalog)
-  else:  # Rewrite/create config items
-    update_config(catalog.get_properties_as_dict(properties_copy), config_type)
+  return ""
+
+
+def get_jh_host(catalog):
+  """
+  :type catalog: UpgradeCatalog
+  :rtype str
+  """
+  if catalog.get_parsed_version()["from"] == 13:
+    return read_mapping()["HISTORYSERVER"][0]
+
+  return ""
+
+
+def _substitute_handler(upgrade_catalog, tokens, value):
+  """
+  Substitute handler
+  :param upgrade_catalog: UpgradeCatalog
+  :param tokens: list
+  :param value: str
+  :rtype str
+  """
+  for token in tokens:
+    if token == "{JOBHISTORY_HOST}":
+      value = value.replace(token, get_jh_host(upgrade_catalog))
+    elif token == "{RESOURCEMANAGER_HOST}":
+      value = value.replace(token, get_jt_host(upgrade_catalog))
+    elif token == "{ZOOKEEPER_QUORUM}":
+      value = value.replace(token, get_zookeeper_quorum())
+  return value
+
+
+def modify_config_item(config_type, catalog, server_config_factory):
+  """
+  Modify configuration item
+  :type config_type str
+  :type catalog UpgradeCatalog
+  :type server_config_factory ServerConfigFactory
+  """
+
+  # if config group is absent on the server, we will create it
+  if config_type not in server_config_factory.items():
+    server_config_factory.create_config(config_type)
+
+  server_config_catalog = server_config_factory.get_config(config_type)
+
+  server_config_catalog.merge(catalog)
 
 
 def modify_configs():
@@ -845,77 +1024,59 @@ def modify_configs():
     config_type = None
 
   catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json)  # Load upgrade catalog
-  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)  # get desired version of catalog
+  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
+                                     Options.OPTIONS.to_stack)  # get desired version of catalog
+
+  # load all desired configs from the server
+  server_config_factory = ServerConfigFactory()
 
   if catalog is None:
     raise FatalException(1, "Upgrade catalog for version %s-%s not found, no configs was modified"
                          % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack))
 
-  if config_type is not None and config_type not in catalog.config_groups.list():
+  # add user-defined template processing function
+  catalog.add_handler(CatConst.TEMPLATE_HANDLER, _substitute_handler)
+
+  if config_type is not None and config_type not in catalog.items:
     raise FatalException("Config type %s not exists, no configs was modified" % config_type)
 
   if config_type is not None:
-    modify_config_item(config_type, catalog)
+    modify_config_item(config_type, catalog, server_config_factory)
   else:
-    for collection_name in catalog.config_groups.list():
-      modify_config_item(collection_name, catalog)
-
-
-def rename_all_properties(properties, name_mapping):
-  for key, val in name_mapping.items():
-    if (key in properties.keys()) and (val not in properties.keys()):
-      properties[val] = properties[key]
-      del properties[key]
-  return properties
-
-
-# properties template - passed as dict from UpgradeCatalog
-def update_config_using_existing_properties(conf_type, properties_template,
-                                            site_properties, properties_attributes_latest, catalog):
-  keys_processed = []
-  keys_to_delete = []
-  properties_parsed = catalog.get_properties_as_dict(properties_template)
-
-  for key in properties_template.keys():
-    keys_processed.append(key)
-    if CatConst.PROPERTY_REMOVE_TAG in properties_template and properties_template[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
-      keys_to_delete.append(key)
-
-  for key in site_properties.keys():
-    if key not in keys_processed:
-      properties_parsed[key] = site_properties[key]
-
-  for key in keys_to_delete:
-    del properties_parsed[key]
-
-  # check property attributes list
-  if properties_attributes_latest is not None:
-    for key in properties_attributes_latest:
-      properties_attributes_latest[key] = dict(filter(
-        lambda (item_key, item_value): item_key not in keys_to_delete,
-        zip(properties_attributes_latest[key].keys(), properties_attributes_latest[key].values())
-      ))
+    for collection_name in catalog.items:
+      modify_config_item(collection_name, catalog, server_config_factory)
+
+  server_config_factory.process_mapping_transformations(catalog)
 
-  update_config(properties_parsed, conf_type, attributes=properties_attributes_latest)
+  # commit changes to server, if any will be found
+  server_config_factory.commit()
 
 
 def backup_configs(conf_type=None):
-  DESIRED_CONFIGS_URL = Options.CLUSTER_URL + "?fields=Clusters/desired_configs"
+  dir = "backups_%d" % time.time()
+  file_pattern = "%s%s%s_%s.json"
+  configs = get_config_resp_all()
+  if configs is None:
+    Options.logger.error("Unexpected response from the server")
+    return -1
 
-  desired_configs = curl(DESIRED_CONFIGS_URL, validate=True, validate_expect_body=True, parse=True, simulate=False)
+  if conf_type is not None and conf_type in configs:
+    configs = {conf_type: configs[conf_type]}
 
-  if "Clusters" in desired_configs and "desired_configs" in desired_configs["Clusters"]:
-    for conf_type in desired_configs["Clusters"]["desired_configs"].keys():
-      backup_single_config_type(conf_type, True)
+  if not os.path.exists(dir):
+    os.mkdir(dir)
 
+  for item in configs:
+    filename = file_pattern % (dir, os.path.sep, item, configs[item]["tag"])
+    if os.path.exists(filename):
+      os.remove(filename)
 
-def backup_single_config_type(conf_type, error_if_na=True):
-  tag, response = get_config_resp(conf_type, error_if_na)
-  if response is not None:
-    Options.logger.info("Saving config for type: " + conf_type + " and tag: " + tag)
-    write_config(response, conf_type, tag)
-  else:
-    Options.logger.info("Unable to obtain config for type: " + conf_type)
+    try:
+      with open(filename, "w") as f:
+        f.write(json.dumps(configs[item][CatConst.STACK_PROPERTIES], indent=4))
+      Options.logger.info("Catalog \"%s\" stored to %s", item, filename)
+    except IOError as e:
+      Options.logger.error("Unable to store \"%s\": %s", item, e)
 
 
 def install_services():
@@ -948,94 +1109,91 @@ def install_services():
   err_message = ""
   for index in [0, 1]:
     try:
-      curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True,
-           validate_expect_body=not Options.OPTIONS.printonly, request_type="PUT", data=PUT_IN_INSTALLED[index])
+      curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True, request_type="PUT", data=PUT_IN_INSTALLED[index])
     except FatalException as e:
       if not e.code == 0:
         err_retcode = e.code
         err_message = err_message + " Error while installing " + SERVICES[index] + ". Details: " + e.message + "."
 
   if err_retcode != 0:
-    raise FatalException(err_retcode, err_message + "(Services may already be installed or agents are not yet started.)")
+    raise FatalException(err_retcode,
+                         err_message + "(Services may already be installed or agents are not yet started.)")
 
   Options.OPTIONS.exit_message = "Requests has been submitted to install YARN and MAPREDUCE2. Use Ambari Web to monitor " \
-                         "the status of the install requests."
+                                 "the status of the install requests."
 
 
-def validate_response(response, expect_body):
-  if expect_body:
-    if "\"href\" : \"" not in response:
-      return 1, response
-    else:
-      return 0, ""
-  elif len(response) > 0:
-    return 1, response
-  else:
-    return 0, ""
+def generate_auth_header(user, password):
+  token = "%s:%s" % (user, password)
+  token = base64.encodestring(token)
+  return {"Authorization": "Basic %s" % token.replace('\n', '')}
 
 
 def curl(url, tokens=None, headers=None, request_type="GET", data=None, parse=False,
-         simulate=None, validate=False, validate_expect_body=False, soft_validation=False):
+         simulate=None, validate=False, soft_validation=False):
+  _headers = {}
+  handler_chain = []
+  post_req = ["POST", "PUT"]
+  get_req = ["GET", "DELETE"]
 
   simulate_only = Options.CURL_PRINT_ONLY is not None or (simulate is not None and simulate is True)
   print_url = Options.CURL_PRINT_ONLY is not None and simulate is not None
+  if request_type not in post_req + get_req:
+    raise IOError("Wrong request type \"%s\" passed" % request_type)
 
-  curl_path = '/usr/bin/curl'
-  curl_list = [curl_path]
-
-  curl_list.append('-X')
-  curl_list.append(request_type)
+  if data is not None and isinstance(data, dict):
+    data = json.dumps(data)
 
   if tokens is not None:
-    curl_list.append('-u')
-    curl_list.append("%s:%s" % (tokens["user"], tokens["pass"]))
+    _headers.update(generate_auth_header(tokens["user"], tokens["pass"]))
   elif Options.API_TOKENS is not None:
-    curl_list.append('-u')
-    curl_list.append("%s:%s" % (Options.API_TOKENS["user"], Options.API_TOKENS["pass"]))
-
-  if request_type in Options.POST_REQUESTS:
-    curl_list.append(url)
+    _headers.update(generate_auth_header(Options.API_TOKENS["user"], Options.API_TOKENS["pass"]))
 
-  if headers is None and Options.HEADERS is not None:
-    headers = Options.HEADERS
+  if request_type in post_req and data is not None:
+    _headers["Content-Length"] = len(data)
 
   if headers is not None:
-    for header in headers:
-      curl_list.append('-H')
-      curl_list.append("%s: %s" % (header, headers[header]))
+    _headers.update(headers)
 
-  if data is not None and request_type in Options.POST_REQUESTS:
-    curl_list.append('--data')
-    curl_list.append(json.dumps(data))
+  if Options.HEADERS is not None:
+    _headers.update(Options.HEADERS)
 
-  if request_type in Options.GET_REQUESTS:
-    curl_list.append(url)
+  director = build_opener(*handler_chain)
+  if request_type in post_req:
+    _data = bytes(data)
+    req = Request(url, headers=_headers, data=_data)
+  else:
+    req = Request(url, headers=_headers)
+
+  req.get_method = lambda: request_type
 
   if print_url:
-    Options.logger.info(" ".join(curl_list))
+    Options.logger.info(url)
 
+  code = 200
   if not simulate_only:
-    osStat = subprocess.Popen(
-      curl_list,
-      stderr=subprocess.PIPE,
-      stdout=subprocess.PIPE)
-    out, err = osStat.communicate()
-    if 0 != osStat.returncode:
-      error = "curl call failed. out: " + out + " err: " + err
-      Options.logger.error(error)
-      raise FatalException(osStat.returncode, error)
+    try:
+      resp = director.open(req)
+      out = resp.read()
+      if isinstance(out, bytes):
+        out = out.decode("utf-8")
+      code = resp.code
+    except URLError as e:
+      Options.logger.error(str(e))
+      if isinstance(e, HTTPError):
+        raise e
+      else:
+        raise FatalException(-1, str(e))
   else:
     if not print_url:
-      Options.logger.info(" ".join(curl_list))
+      Options.logger.info(url)
     out = "{}"
 
-  if validate and not simulate_only:
-    retcode, errdata = validate_response(out, validate_expect_body)
-    if not retcode == 0:
-      if soft_validation:
-        Options.logger.warning("Response validation failed, please check previous action result manually.")
-      else:
-        raise FatalException(retcode, errdata)
+  if validate and not simulate_only and (code > 299 or code < 200):
+    if soft_validation:
+      Options.logger.warning("Response validation failed, please check previous action result manually.")
+    else:
+      raise FatalException(code, "Response validation failed, please check previous action result manually.")
 
   if parse:
     return json.loads(out)
@@ -1055,15 +1213,13 @@ def configuration_item_diff(collection_name, catalog, actual_properties_list):
    }
   :param collection_name:
   :param catalog:
+  :param actual_properties_list
   :return:
   """
 
   verified_catalog = []
-  catalog_properties = catalog.get_properties(collection_name)
-  actual_properties = None
-
-  if collection_name in actual_properties_list:
-    actual_properties = actual_properties_list[collection_name]
+  catalog_properties = dict(catalog)
+  actual_properties = dict(actual_properties_list)
 
   if actual_properties is None:
     verified_catalog = map(lambda x: {
@@ -1085,7 +1241,8 @@ def configuration_item_diff(collection_name, catalog, actual_properties_list):
     verified_catalog_catalog = map(lambda x: {
       "property": x,
       "catalog_item": catalog_properties[x],
-      "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in catalog_properties[x] else None,
+      "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in
+                                                                             catalog_properties[x] else None,
       "actual_value": actual_properties[x] if x in actual_properties else None,
     }, catalog_properties.keys())
 
@@ -1114,18 +1271,18 @@ def configuration_diff_analyze(diff_list):
       # process properties which can be absent
 
       # item was removed, from actual configs according to catalog instructions
-      if property_item["actual_value"] is None and property_item["catalog_value"] is None \
-        and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \
-        and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
+      if property_item["actual_value"] is None \
+              and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \
+              and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG:
 
         push_status("ok", property_item)
 
-       # currently skip values with template tag, as there no filter implemented
-       # ToDo: implement possibility to filter values without filter handler,
-       # ToDo: currently filtering is possible only on update-configs stage
+        # currently skip values with template tag, as there no filter implemented
+        # ToDo: implement possibility to filter values without filter handler,
+        # ToDo: currently filtering is possible only on update-configs stage
       elif property_item["actual_value"] is not None and property_item["catalog_value"] is not None \
-        and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \
-        and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
+              and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \
+              and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
 
         push_status("skipped", property_item)
 
@@ -1156,23 +1313,24 @@ def verify_configuration():
     config_type = None
 
   catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json)  # Load upgrade catalog
-  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)  # get desired version of catalog
+  catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack,
+                                     Options.OPTIONS.to_stack)  # get desired version of catalog
+  server_configs = ServerConfigFactory()
 
   if catalog is None:
     raise FatalException(1, "Upgrade catalog for version %s-%s not found"
                          % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack))
 
-  if config_type is not None and config_type not in catalog.config_groups.list():
+  if config_type is not None and config_type not in catalog.items.keys() and config_type not in server_configs.items():
     raise FatalException("Config type %s not exists" % config_type)
 
   # fetch from server all option at one time and filter only desired versions
-  actual_options = get_config_resp_all()
 
   if config_type is not None:
-    diff_list[config_type] = configuration_item_diff(config_type, catalog, actual_options)
+    diff_list[config_type] = configuration_item_diff(config_type, catalog.items[config_type], server_configs.get_config(config_type).properties)
   else:
-    for collection_name in catalog.config_groups.list():
-      diff_list[collection_name] = configuration_item_diff(collection_name, catalog, actual_options)
+    for collection_name in catalog.items.keys():
+      diff_list[collection_name] = configuration_item_diff(collection_name, catalog.items[collection_name], server_configs.get_config(collection_name).properties)
 
   analyzed_list = configuration_diff_analyze(diff_list)
 
@@ -1187,7 +1345,7 @@ def verify_configuration():
     if analyzed_list[config_item]["fail"]["count"] != 0:
       Options.logger.info(
         "%s: %s missing configuration(s) - please look in the output file for the missing params" % (
-         config_item, analyzed_list[config_item]["fail"]["count"]
+          config_item, analyzed_list[config_item]["fail"]["count"]
         )
       )
       if report_file is not None:
@@ -1206,16 +1364,12 @@ def report_formatter(report_file, config_item, analyzed_list_item):
   prefix = "Configuration item %s" % config_item
   if analyzed_list_item["fail"]["count"] > 0:
     for item in analyzed_list_item["fail"]["items"]:
-      report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"" % (
+      report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"\n" % (
         prefix, item["property"], item["actual_value"], item["catalog_value"]
       ))
 
 
-#
-# Main.
-#
 def main():
-
   action_list = {  # list of supported actions
                    Options.GET_MR_MAPPING_ACTION: get_mr1_mapping,
                    Options.DELETE_MR_ACTION: delete_mr,
@@ -1224,7 +1378,7 @@ def main():
                    Options.INSTALL_YARN_MR2_ACTION: install_services,
                    Options.BACKUP_CONFIG_ACTION: backup_configs,
                    Options.VERIFY_ACTION: verify_configuration
-  }
+                   }
 
   parser = optparse.OptionParser(usage="usage: %prog [options] action\n  Valid actions: "
                                        + ", ".join(action_list.keys())

http://git-wip-us.apache.org/repos/asf/ambari/blob/1e158101/ambari-server/src/test/python/TestUpgradeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestUpgradeHelper.py b/ambari-server/src/test/python/TestUpgradeHelper.py
index 1624dd2..73b1e4e 100644
--- a/ambari-server/src/test/python/TestUpgradeHelper.py
+++ b/ambari-server/src/test/python/TestUpgradeHelper.py
@@ -48,6 +48,7 @@ class TestUpgradeHelper(TestCase):
   catalog_to = "2.2"
   catalog_cfg_type = "my type"
   required_service = "TEST"
+  curl_response = "{}"
   test_catalog = """{
    "version": "1.0",
    "stacks": [
@@ -95,19 +96,12 @@ class TestUpgradeHelper(TestCase):
     sys.stdout = self.out
 
   def magic_curl(self, *args, **kwargs):
+    resp = self.curl_response
+    self.curl_response = "{}"
+    if "parse" in kwargs and isinstance(resp, str) and kwargs["parse"] == True:
+      resp = json.loads(resp)
+    return resp
 
-    def ret_object():
-      return ""
-
-    def communicate():
-      return "{}", ""
-
-    ret_object.returncode = 0
-    ret_object.communicate = communicate
-
-    with patch("upgradeHelper.subprocess") as subprocess:
-      subprocess.Popen.return_value = ret_object
-      self.original_curl(*args, **kwargs)
 
   def tearDown(self):
     sys.stdout = sys.__stdout__
@@ -155,19 +149,6 @@ class TestUpgradeHelper(TestCase):
     self.assertEqual(True, actual_result)
     self.assertEqual(True, actual_result_1)
 
-  @patch.object(upgradeHelper, "is_services_exists")
-  def test_filter_properties_by_service_presence(self, is_service_exists_mock):
-    catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
-    catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
-    cfg_type = self.catalog_cfg_type
-    is_service_exists_mock.return_value = True
-
-    old_services = upgradeHelper.Options.SERVICES
-    upgradeHelper.Options.SERVICES = set([self.required_service])
-    actual_result = upgradeHelper.filter_properties_by_service_presence(cfg_type, catalog, catalog.get_properties(self.catalog_cfg_type))
-
-    upgradeHelper.Options.SERVICES = old_services
-    self.assertEqual(catalog.get_properties(self.catalog_cfg_type), actual_result)
 
   @patch("__builtin__.open")
   @patch.object(os.path, "isfile")
@@ -193,34 +174,6 @@ class TestUpgradeHelper(TestCase):
 
   @patch("__builtin__.open")
   @patch.object(os.path, "isfile")
-  @patch("os.remove")
-  def test_write_config(self, remove_mock, isfile_mock, open_mock):
-    test_data = {
-      "test_field": "test_value"
-    }
-    test_result = json.dumps(test_data)
-    test_cfgtype = 'cfg.me'
-    test_tag = 'tag.test'
-    test_filename = "%s_%s" % (test_cfgtype, test_tag)
-    test_result = json.dumps(test_data)
-    output = StringIO()
-    isfile_mock.return_value = True
-    open_mock.return_value = output
-
-    # execute testing function
-    upgradeHelper.write_config(test_data, test_cfgtype, test_tag)
-
-    self.assertEquals(1, isfile_mock.call_count)
-    self.assertEquals(1, remove_mock.call_count)
-    self.assertEquals(1, open_mock.call_count)
-
-    # check file name
-    self.assertEquals(test_filename, isfile_mock.call_args[0][0])
-    # check content
-    self.assertEquals(test_result, output.getvalue())
-
-  @patch("__builtin__.open")
-  @patch.object(os.path, "isfile")
   def test_read_mapping(self, isfile_mock, open_mock):
     test_data = {
       "test_field": "test_value"
@@ -353,8 +306,7 @@ class TestUpgradeHelper(TestCase):
               {
                 "request_type": "PUT",
                 "data": PUT_IN_DISABLED,
-                "validate": True,
-                "validate_expect_body": False
+                "validate": True
               }
             ]
           )
@@ -364,8 +316,7 @@ class TestUpgradeHelper(TestCase):
         (SERVICE_URL_FORMAT,),
         {
           "request_type": "DELETE",
-          "validate": True,
-          "validate_expect_body": False
+          "validate": True
         }
       ]
     )
@@ -439,7 +390,6 @@ class TestUpgradeHelper(TestCase):
         (SERVICE_URL_FORMAT.format(service),),
         {
           "validate": True,
-          "validate_expect_body": False,
           "request_type": "POST"
         }
       ])
@@ -448,7 +398,6 @@ class TestUpgradeHelper(TestCase):
           (COMPONENT_URL_FORMAT.format(service, component),),
           {
             "validate": True,
-            "validate_expect_body": False,
             "request_type": "POST"
           }
         ])
@@ -457,7 +406,6 @@ class TestUpgradeHelper(TestCase):
             (HOST_COMPONENT_URL_FORMAT.format(host, component),),
             {
               "validate": True,
-              "validate_expect_body": False,
               "request_type": "POST"
             }
           ])
@@ -488,8 +436,7 @@ class TestUpgradeHelper(TestCase):
         "request_type": "PUT",
         "data": copy.deepcopy(properties_payload),
         "validate": True,
-        "soft_validation": True,
-        "validate_expect_body": True
+        "soft_validation": True
       }
     )
 
@@ -500,8 +447,7 @@ class TestUpgradeHelper(TestCase):
         "request_type": "PUT",
         "data": copy.deepcopy(properties_payload),
         "validate": True,
-        "soft_validation": True,
-        "validate_expect_body": True
+        "soft_validation": True
       }
     )
 
@@ -547,86 +493,6 @@ class TestUpgradeHelper(TestCase):
 
     self.assertEqual(expected_result, actual_result)
 
-  @patch.object(upgradeHelper, "get_config_resp")
-  def test_get_config(self, get_config_resp_mock):
-    config_type = "test type"
-    tag_name = "my tag"
-    properties = {
-      "my property": "property value"
-    }
-    property_atributes = {
-      "myproperty": "property value"
-    }
-
-    in_data = tag_name, {
-      "items": [
-        {
-          "tag": tag_name,
-          "type": config_type,
-          "properties": properties,
-          "properties_attributes": property_atributes
-        }
-      ]
-    }
-
-    get_config_resp_mock.return_value = in_data
-    expected_data = (properties, property_atributes)
-
-    # execute testing function
-    actual_data = upgradeHelper.get_config(config_type)
-
-    self.assertEqual(expected_data, actual_data)
-
-  def test_parse_config_resp(self):
-    cfg_type = "type 1"
-    cfg_properties = {
-      "my property": "property value"
-    }
-
-    in_data = {
-      upgradeHelper.CatConst.ITEMS_TAG: [
-        {
-          upgradeHelper.CatConst.TYPE_TAG: cfg_type,
-          upgradeHelper.CatConst.STACK_PROPERTIES: cfg_properties
-        }
-      ]
-    }
-
-    expected_result = [{
-     "type": cfg_type,
-     "properties": cfg_properties
-    }]
-
-    # execute testing function
-    actual_result = upgradeHelper.parse_config_resp(in_data)
-
-    self.assertEqual(expected_result, actual_result)
-
-  @patch.object(upgradeHelper, "curl")
-  @patch.object(upgradeHelper, "parse_config_resp")
-  def test_get_config_resp(self, parse_config_resp_mock, curl_mock):
-    cfg_type = "my type"
-    cfg_tag = "my tag"
-    cfg_data = "test"
-    curl_responses = [
-      {
-        'Clusters': {
-          'desired_configs': {
-            cfg_type: {
-              "tag": cfg_tag
-            }
-          }
-        }
-      },
-      cfg_data
-    ]
-    curl_mock.side_effect = MagicMock(side_effect=curl_responses)
-
-    # execute testing function
-    actual_tag, actual_data = upgradeHelper.get_config_resp(cfg_type, False, False)
-
-    self.assertEqual((cfg_tag, cfg_data), (actual_tag, actual_data))
-
   @patch.object(upgradeHelper, "curl")
   def test_get_config_resp_all(self, curl_mock):
     cfg_type = "my type"
@@ -656,8 +522,11 @@ class TestUpgradeHelper(TestCase):
     ]
 
     expected_result = {
-      cfg_type: cfg_properties
-    }
+        cfg_type: {
+          "properties": cfg_properties,
+          "tag": cfg_tag
+        }
+      }
     curl_mock.side_effect = MagicMock(side_effect=curl_resp)
 
     # execute testing function
@@ -666,160 +535,31 @@ class TestUpgradeHelper(TestCase):
     self.assertEquals(expected_result, actual_result)
     pass
 
-  @patch.object(upgradeHelper, "read_mapping")
-  @patch.object(upgradeHelper, "get_config")
-  @patch.object(upgradeHelper, "update_config_using_existing_properties")
-  @patch.object(upgradeHelper, "update_config")
-  @patch.object(upgradeHelper, "get_config_resp")
-  def test_modify_config_item(self, get_config_resp_mock, upgrade_config_mock, update_config_using_existing_properties_mock,
-                              get_config_mock, read_mapping_mock):
-    catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
-    get_config_resp_mock.return_value = "", {}
-    old_services = upgradeHelper.Options.SERVICES
-    upgradeHelper.Options.SERVICES = set([self.required_service])
-    catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
-    cfg_type = self.catalog_cfg_type
-    read_mapping_mock.return_value = {
-      "MAPREDUCE_CLIENT": ["test.host.vm"],
-      "JOBTRACKER": ["test1.host.vm"],
-      "TASKTRACKER": ["test2.host.vm"],
-      "HISTORYSERVER": ["test3.host.vm"]
-    }
-    get_config_mock.return_value = {"my replace property": "property value 2"}, {}
-    expected_params = [
-      cfg_type,
-      {
-        "my property": {
-          "value": "my value",
-          "required-services": ["TEST"]
-        }
-      },
-      {
-        "my property 2": "property value 2"
-      },
-    ]
-
-    # execute testing function
-    upgradeHelper.modify_config_item(cfg_type, catalog)
-
-    upgradeHelper.Options.SERVICES = old_services
-
-    actual_params = [
-      update_config_using_existing_properties_mock.call_args[0][0],
-      update_config_using_existing_properties_mock.call_args[0][1],
-      update_config_using_existing_properties_mock.call_args[0][2]
-    ]
-    self.assertEquals(update_config_using_existing_properties_mock.call_count, 1)
-    self.assertEqual(upgrade_config_mock.call_count, 0)
-
-    self.assertEqual(expected_params, actual_params)
-
-  @patch.object(upgradeHelper, "UpgradeCatalogFactory", autospec=True)
-  @patch.object(upgradeHelper, "modify_config_item")
-  def test_modify_configs(self, modify_config_item_mock, factory_mock):
-    factory_mock.return_value = UpgradeCatalogFactoryMock(self.test_catalog)
-    options = lambda: ""
-    options.from_stack = self.catalog_from
-    options.to_stack = self.catalog_to
-    options.upgrade_json = ""
-
-    upgradeHelper.Options.OPTIONS = options
-
-    # execute testing function
-    upgradeHelper.modify_configs()
-
-    self.assertEqual(1, modify_config_item_mock.call_count)
-    self.assertEqual(self.catalog_cfg_type, modify_config_item_mock.call_args[0][0])
-
-  def test_rename_all_properties(self):
-    in_data_properties = {
-      "test property": "test value",
-      "rename property": "test value 2"
-    }
-    in_data_mapping = {
-      "rename property": "test property 2"
-    }
-    expect_properties = {
-      "test property": "test value",
-      "test property 2": "test value 2"
-    }
-
-    # execute testing function
-    actual_properties = upgradeHelper.rename_all_properties(in_data_properties, in_data_mapping)
-
-    self.assertEqual(expect_properties, actual_properties)
-
-  @patch.object(upgradeHelper, "update_config")
-  def test_update_config_using_existing_properties(self, update_config_mock):
-    actual_property = {
-      "actual property": "actual value"
-    }
-    actual_attrib = {
+  @patch.object(upgradeHelper, "get_config_resp_all")
+  @patch("os.mkdir")
+  @patch("os.path.exists")
+  @patch("__builtin__.open")
+  def test_backup_configs(self, open_mock, os_path_exists_mock, mkdir_mock, get_config_resp_all_mock):
+    data = {
       self.catalog_cfg_type: {
-        "attribute 1": "attribute value 1"
-      }
-    }
-    catalog_factory = UpgradeCatalogFactoryMock(self.test_catalog)
-    catalog = catalog_factory.get_catalog(self.catalog_from, self.catalog_to)
-
-    # execute testing function
-    upgradeHelper.update_config_using_existing_properties(self.catalog_cfg_type,
-                                                          catalog.get_properties(self.catalog_cfg_type),
-                                                          actual_property,
-                                                          actual_attrib,
-                                                          catalog
-                                                          )
-    expected_dict = {}
-    expected_dict.update(actual_property)
-    expected_dict.update(catalog.get_properties_as_dict(catalog.get_properties(self.catalog_cfg_type)))
-
-    expected_args = (
-      (expected_dict, self.catalog_cfg_type),
-      {
-        "attributes": actual_attrib
-      }
-    )
-
-    self.assertEqual(1, update_config_mock.call_count)
-    self.assertEqual(expected_args, tuple(update_config_mock.call_args))
-
-  @patch.object(upgradeHelper, "backup_single_config_type")
-  @patch.object(upgradeHelper, "curl")
-  def test_backup_configs(self, curl_mock, backup_single_config_type_mock):
-    curl_mock.return_value = {
-      'Clusters': {
-        'desired_configs': {
-          self.catalog_cfg_type: {
-            "tag": "my tag"
-          }
-        }
+        "properties": {
+          "test-property": "value"
+        },
+        "tag": "version1"
       }
     }
-
-    expected_args = (self.catalog_cfg_type, True)
+    os_path_exists_mock.return_value = False
+    get_config_resp_all_mock.return_value = data
+    expected = json.dumps(data[self.catalog_cfg_type]["properties"], indent=4)
+    stream = StringIO()
+    m = MagicMock()
+    m.__enter__.return_value = stream
+    open_mock.return_value = m
 
     # execute testing function
     upgradeHelper.backup_configs(self.catalog_cfg_type)
 
-    self.assertEqual(1, backup_single_config_type_mock.call_count)
-    self.assertEqual(expected_args, backup_single_config_type_mock.call_args[0])
-
-  @patch.object(upgradeHelper, "get_config_resp")
-  @patch.object(upgradeHelper, "write_config")
-  def test_backup_single_config_type(self, write_config_mock, get_config_resp_mock):
-    resp = {
-      "property": "my data"
-    }
-    tag = "my tag"
-    get_config_resp_mock.return_value = tag, resp
-    expected_args = (resp, self.catalog_cfg_type, tag)
-
-    # execute testing function
-    upgradeHelper.backup_single_config_type(self.catalog_cfg_type, False)
-
-    self.assertEqual(1, get_config_resp_mock.call_count)
-    self.assertEqual(1, write_config_mock.call_count)
-    self.assertEqual(expected_args, write_config_mock.call_args[0])
+    self.assertEqual(expected, stream.getvalue())
 
   @patch.object(upgradeHelper, "curl")
   def test_install_services(self, curl_mock):
@@ -827,7 +567,6 @@ class TestUpgradeHelper(TestCase):
       (
         ('http://127.0.0.1:8080/api/v1/clusters/test1/services/MAPREDUCE2',),
         {
-          'validate_expect_body': True,
           'request_type': 'PUT',
           'data': {
             'RequestInfo': {
@@ -845,7 +584,6 @@ class TestUpgradeHelper(TestCase):
       (
         ('http://127.0.0.1:8080/api/v1/clusters/test1/services/YARN',),
         {
-          'validate_expect_body': True,
           'request_type': 'PUT',
           'data': {
             'RequestInfo': {
@@ -869,53 +607,6 @@ class TestUpgradeHelper(TestCase):
     for i in range(0, 1):
       self.assertEqual(expected_args[i], tuple(curl_mock.call_args_list[i]))
 
-  def test_validate_response(self):
-    resp_in_data = [
-      ["", False],
-      ["", True],
-      ["\"href\" : \"", True]
-    ]
-    resp_expected_results = [
-      (0, ""),
-      (1, ""),
-      (0, "")
-    ]
-
-    # execute testing function
-    for i in range(0, len(resp_in_data)):
-      actual_code, actual_data = upgradeHelper.validate_response(resp_in_data[i][0], resp_in_data[i][1])
-      self.assertEqual(resp_expected_results[i], (actual_code, actual_data))
-
-  def test_configuration_item_diff(self):
-    factory_mock = UpgradeCatalogFactoryMock(self.test_catalog)
-    catalog = factory_mock.get_catalog(self.catalog_from, self.catalog_to)
-    actual_properties = {
-      self.catalog_cfg_type: {
-        "my property": {
-         "value": "my value"
-        }
-      }
-    }
-
-    expected_result = [
-      {
-        'catalog_item': {
-          'value': u'my value',
-          'required-services': [u'TEST']
-        },
-        'property': 'my property',
-        'actual_value': {
-          'value': 'my value'
-        },
-        'catalog_value': u'my value'
-      }
-    ]
-
-    # execute testing function
-    actual_result = upgradeHelper.configuration_item_diff(self.catalog_cfg_type, catalog, actual_properties)
-
-    self.assertEqual(expected_result, actual_result)
-
   def test_configuration_diff_analyze(self):
     in_data = {
         self.catalog_cfg_type: [
@@ -978,7 +669,13 @@ class TestUpgradeHelper(TestCase):
     options.upgrade_json = ""
 
     upgradeHelper.Options.OPTIONS = options
+    upgradeHelper.Options.SERVICES = [self.required_service]
     upgradecatalogfactory_mock.return_value = UpgradeCatalogFactoryMock(self.test_catalog)
+    get_config_resp_all_mock.return_value = {
+      self.catalog_cfg_type: {
+        "properties": {}
+      }
+    }
 
     # execute testing function
     upgradeHelper.verify_configuration()
@@ -1022,7 +719,7 @@ class TestUpgradeHelper(TestCase):
         }
     }
 
-    expected_output = "Configuration item my type: property \"my property\" is set to \"my value 1\", but should be set to \"my value\""
+    expected_output = "Configuration item my type: property \"my property\" is set to \"my value 1\", but should be set to \"my value\"\n"
 
     # execute testing function
     upgradeHelper.report_formatter(file, cfg_item, analyzed_list)