You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2018/08/24 17:11:48 UTC
[ambari] branch branch-feature-AMBARI-14714 updated: [AMBARI-24537]
- Use Mpack Instance Manager To Switch Component Instance Version on
Upgrade (#2164)
This is an automated email from the ASF dual-hosted git repository.
jonathanhurley pushed a commit to branch branch-feature-AMBARI-14714
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-feature-AMBARI-14714 by this push:
new 0df731b [AMBARI-24537] - Use Mpack Instance Manager To Switch Component Instance Version on Upgrade (#2164)
0df731b is described below
commit 0df731b3d7b86670aa7fc1173663748bc49ffa34
Author: Jonathan Hurley <jo...@apache.org>
AuthorDate: Fri Aug 24 13:11:43 2018 -0400
[AMBARI-24537] - Use Mpack Instance Manager To Switch Component Instance Version on Upgrade (#2164)
---
.../libraries/functions/copy_tarball.py | 11 +-
.../libraries/functions/upgrade_summary.py | 323 +++++++++++----------
.../resource_management/libraries/script/script.py | 195 +++++--------
.../internal/UpgradeResourceProvider.java | 33 ++-
.../apache/ambari/server/state/UpgradeContext.java | 12 +-
.../stack-hooks/before-ANY/scripts/params.py | 7 -
.../src/test/python/TestUpgradeSummary.py | 25 +-
7 files changed, 301 insertions(+), 305 deletions(-)
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/copy_tarball.py b/ambari-common/src/main/python/resource_management/libraries/functions/copy_tarball.py
index 6c39aff..819d5f3 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/copy_tarball.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/copy_tarball.py
@@ -367,16 +367,15 @@ def get_current_version(service=None, use_upgrading_version_during_upgrade=True)
:return: Version, or False if an error occurred.
"""
- from resource_management.libraries.functions import upgrade_summary
+ from resource_management.libraries.functions.upgrade_summary import UpgradeSummary
+
+ upgrade_summary = UpgradeSummary if Script.is_upgrade_in_progress() else None
# get the version for this command
version = stack_features.get_stack_feature_version(Script.get_config())
- if service is not None:
- version = upgrade_summary.get_target_version(service_name=service, default_version=version)
-
# if there is no upgrade, then use the command's version
- if not Script.in_stack_upgrade() or use_upgrading_version_during_upgrade:
+ if upgrade_summary is None or use_upgrading_version_during_upgrade:
Logger.info("Tarball version was calcuated as {0}. Use Command Version: {1}".format(
version, use_upgrading_version_during_upgrade))
@@ -385,7 +384,7 @@ def get_current_version(service=None, use_upgrading_version_during_upgrade=True)
# we're in an upgrade and we need to use an older version
current_version = stack_select.get_role_component_current_stack_version()
if service is not None:
- current_version = upgrade_summary.get_source_version(service_name=service, default_version=current_version)
+ current_version = upgrade_summary.get_service_source_version(service_name=service, default_version=current_version)
if current_version is None:
Logger.warning("Unable to determine the current version of the component for this command; unable to copy the tarball")
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/upgrade_summary.py b/ambari-common/src/main/python/resource_management/libraries/functions/upgrade_summary.py
index 7bc8c20..8f4819d 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/upgrade_summary.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/upgrade_summary.py
@@ -19,166 +19,185 @@ limitations under the License.
"""
from collections import namedtuple
-from resource_management.libraries.script.script import Script
+from ambari_commons.constants import UPGRADE_TYPE_EXPRESS
+from ambari_commons.constants import UPGRADE_TYPE_HOST_ORDERED
+from ambari_commons.constants import UPGRADE_TYPE_ROLLING
from resource_management.libraries.functions.constants import Direction
-UpgradeSummary = namedtuple("UpgradeSummary", "direction is_revert service_groups")
+UpgradeServiceGroupSummary = namedtuple("UpgradeServiceGroupSummary",
+ ["type", "service_group_id", "service_group_name", "source_mpack_id", "target_mpack_id",
+ "source_mpack_name", "target_mpack_name", "source_mpack_version", "target_mpack_version",
+ "source_stack", "target_stack", "services"])
-UpgradeServiceGroupSummary = namedtuple("UpgradeServiceGroupSummary", "type service_group_id service_group_name source_mpack_id target_mpack_id source_stack target_stack source_mpack_version target_mpack_version services")
-UpgradeServiceSummary = namedtuple("UpgradeServiceSummary", "service_name source_version target_version components")
-UpgradeComponentSummary = namedtuple("UpgradeComponentSummary", "component_name source_version target_version")
+UpgradeServiceSummary = namedtuple("UpgradeServiceSummary", ["service_name", "source_version", "target_version", "components"])
+UpgradeComponentSummary = namedtuple("UpgradeComponentSummary", ["component_name", "source_version", "target_version"])
+__all__ = ["UpgradeSummary"]
-def get_upgrade_summary():
+class UpgradeSummary(object):
"""
- Gets a summary of an upgrade in progress, including type, direction, orchestration and from/to
- versions.
+ Represents the state of an upgrade or downgrade is one is in progress, including all service groups,
+ services, components and their respective mpack source/target versions.
"""
- config = Script.get_config()
- if "upgradeSummary" not in config or not config["upgradeSummary"]:
- return None
-
- upgrade_summary = config["upgradeSummary"]
-
- service_group_summary_dict = {}
- for service_group_name, service_group_summary_json in upgrade_summary["serviceGroups"].iteritems():
- service_summary_dict = {}
-
- service_group_summary = UpgradeServiceGroupSummary(type = service_group_summary_json["type"],
- service_group_id = service_group_summary_json["serviceGroupId"],
- service_group_name = service_group_summary_json["serviceGroupName"],
- source_mpack_id = service_group_summary_json["sourceMpackId"],
- target_mpack_id = service_group_summary_json["targetMpackId"],
- source_stack = service_group_summary_json["sourceStack"],
- target_stack = service_group_summary_json["targetStack"],
- source_mpack_version = service_group_summary_json["sourceMpackVersion"],
- target_mpack_version = service_group_summary_json["targetMpackVersion"],
- services = service_summary_dict)
-
- service_group_summary_dict[service_group_name] = service_group_summary
-
- for service_name, service_summary_json in service_group_summary_json["services"].iteritems():
- component_summary_dict = {}
-
- service_summary = UpgradeServiceSummary(service_name = service_name,
- source_version = service_summary_json["sourceVersion"],
- target_version = service_summary_json["targetVersion"], components = component_summary_dict)
-
- service_summary_dict[service_name] = service_summary
-
- for component_name, component_summary_json in service_summary_json["components"].iteritems():
- component_summary = UpgradeComponentSummary(component_name = component_name,
- source_version = component_summary_json["sourceVersion"],
- target_version = component_summary_json["targetVersion"])
- component_summary_dict[component_name] = component_summary
-
- return UpgradeSummary(direction=upgrade_summary["direction"],
- is_revert = upgrade_summary["isRevert"],
- service_groups = service_group_summary_dict)
-
-
-def get_source_version(service_group_name = None, service_name = None, default_version=None):
- """
- Gets the source (from) version of a service participating in an upgrade. If there is no
- upgrade or the specific service is not participating, this will return None.
- :param service_group_name: the service group name to check for, or None to extract it from the command
- :param service_name: the service name to check for, or None to extract it from the command
- :param default_version: if the version of the service can't be calculated, this optional
- default value is returned
- :return: the version that the service is upgrading from or None if there is no upgrade or
- the service is not included in the upgrade.
- """
- service_summary = _get_service_summary(service_group_name, service_name)
- if service_summary is None:
- return default_version
-
- return service_summary.source_version
-
-
-def get_target_version(service_group_name = None, service_name = None, default_version=None):
- """
- Gets the target (to) version of a service participating in an upgrade. If there is no
- upgrade or the specific service is not participating, this will return None.
- :param service_group_name: the service group name to check for, or None to extract it from the command
- :param service_name: the service name to check for, or None to extract it from the command
- :param default_version: if the version of the service can't be calculated, this optional
- default value is returned
- :return: the version that the service is upgrading to or None if there is no upgrade or
- the service is not included in the upgrade.
- """
- service_summary = _get_service_summary(service_group_name, service_name)
- if service_summary is None:
- return default_version
-
- return service_summary.target_version
-
-
-def get_downgrade_from_version(service_group_name = None, service_name = None):
- """
- Gets the downgrade-from-version for the specificed service. If there is no downgrade or
- the service isn't participating in the downgrade, then this will return None
- :param service_group_name: the service group, or optionally onmitted to infer it from the command.
- :param service_name: the service, or optionally onmitted to infer it from the command.
- :return: the downgrade-from-version or None
- """
- upgrade_summary = get_upgrade_summary()
- if upgrade_summary is None:
- return None
-
- if Direction.DOWNGRADE.lower() != upgrade_summary.direction.lower():
- return None
-
- service_summary = _get_service_summary(service_group_name, service_name)
- if service_summary is None:
- return None
+ def __init__(self):
+ from resource_management.libraries.script.script import Script
+
+ config = Script.get_config()
+ if "upgradeSummary" not in config or not config["upgradeSummary"]:
+ self.is_upgrade_in_progress = False
+ return
+
+ self.is_upgrade_in_progress = True
+ self.execution_command = Script.get_execution_command()
+ upgrade_summary = config["upgradeSummary"]
+
+ service_group_summary_dict = {}
+ for service_group_name, service_group_summary_json in upgrade_summary["serviceGroups"].iteritems():
+ service_summary_dict = {}
+
+ service_group_summary = UpgradeServiceGroupSummary(type = service_group_summary_json["type"],
+ service_group_id = service_group_summary_json["serviceGroupId"],
+ service_group_name = service_group_summary_json["serviceGroupName"],
+ source_mpack_id = service_group_summary_json["sourceMpackId"],
+ target_mpack_id = service_group_summary_json["targetMpackId"],
+ source_mpack_name = service_group_summary_json["sourceMpackName"],
+ target_mpack_name = service_group_summary_json["targetMpackName"],
+ source_mpack_version = service_group_summary_json["sourceMpackVersion"],
+ target_mpack_version = service_group_summary_json["targetMpackVersion"],
+ source_stack = service_group_summary_json["sourceStack"],
+ target_stack = service_group_summary_json["targetStack"],
+ services = service_summary_dict)
+
+ service_group_summary_dict[service_group_name] = service_group_summary
+
+ for service_name, service_summary_json in service_group_summary_json["services"].iteritems():
+ component_summary_dict = {}
+
+ service_summary = UpgradeServiceSummary(service_name = service_name,
+ source_version = service_summary_json["sourceVersion"],
+ target_version = service_summary_json["targetVersion"],
+ components = component_summary_dict)
+
+ service_summary_dict[service_name] = service_summary
+
+ for component_name, component_summary_json in service_summary_json["components"].iteritems():
+ component_summary = UpgradeComponentSummary(component_name = component_name,
+ source_version = component_summary_json["sourceVersion"],
+ target_version = component_summary_json["targetVersion"])
+
+ component_summary_dict[component_name] = component_summary
+
+ self.direction = upgrade_summary["direction"]
+ self.is_revert = upgrade_summary["isRevert"]
+ self.service_groups = service_group_summary_dict
+
+
+ def get_upgrade_type(self):
+ """
+ Gets the type of upgrade for the service group in the command.
+ :return: the type of upgrade or None
+ """
+ if not self.is_upgrade_in_progress:
+ return None
+
+ service_group_name = self.execution_command.get_servicegroup_name()
+
+ service_group_summary = self.get_service_group_summary(service_group_name)
+ if service_group_summary is None:
+ return None
+
+ if service_group_summary.type.lower() == "rolling_upgrade":
+ return UPGRADE_TYPE_ROLLING
+ elif service_group_summary.type.lower() == "express_upgrade":
+ return UPGRADE_TYPE_EXPRESS
+ elif service_group_summary.type.lower() == "host_ordered_upgrade":
+ return UPGRADE_TYPE_HOST_ORDERED
- return service_summary.source_version
-
-
-def _get_service_group_summary(service_group_name):
- """
- Gets the service group summary for the upgrade/downgrade for the given service group, or None if
- the service group isn't participating.
- :param service_group_name the service group name
- :return: the service group summary or None
- """
- upgrade_summary = get_upgrade_summary()
- if upgrade_summary is None:
return None
- if service_group_name is None:
- execution_command = Script.get_execution_command()
- service_group_name = execution_command.get_servicegroup_name()
-
- service_group_summary = upgrade_summary.service_groups
- if service_group_name not in service_group_summary:
- return None
-
- return service_group_summary[service_group_name]
-
-
-def _get_service_summary(service_group_name, service_name):
- """
- Gets the service summary for the upgrade/downgrade for the given service, or None if
- the service isn't participating.
- :param service_group_name the service group name
- :param service_name: the service name
- :return: the service summary or None
- """
- upgrade_summary = get_upgrade_summary()
- if upgrade_summary is None:
- return None
-
- execution_command = Script.get_execution_command()
-
- if service_group_name is None:
- service_group_name = execution_command.get_servicegroup_name()
-
- if service_name is None:
- service_name = execution_command.get_module_name()
-
- service_group_summary = _get_service_group_summary(service_group_name)
- if service_group_summary is None or service_name not in service_group_summary.services:
- return None
- return service_group_summary.services[service_name]
+ def get_downgrade_from_version(self, service_group_name = None, service_name = None):
+ """
+ Gets the downgrade-from-version for the specificed service. If there is no downgrade or
+ the service isn't participating in the downgrade, then this will return None
+ :param service_group_name: the service group, or optionally onmitted to infer it from the command.
+ :param service_name: the service, or optionally onmitted to infer it from the command.
+ :return: the downgrade-from-version or None
+ """
+ if Direction.DOWNGRADE.lower() != self.direction.lower():
+ return None
+
+ service_summary = self.get_service_summary(service_group_name, service_name)
+ if service_summary is None:
+ return None
+
+ return service_summary.source_version
+
+
+ def get_service_group_summary(self, service_group_name):
+ """
+ Gets the service group summary for the upgrade/downgrade for the given service group, or None if
+ the service group isn't participating.
+ :param service_group_name the service group name
+ :return: the service group summary or None
+ """
+ if service_group_name is None:
+ service_group_name = self.execution_command.get_servicegroup_name()
+
+ if service_group_name not in self.service_groups:
+ return None
+
+ return self.service_groups[service_group_name]
+
+
+ def get_service_summary(self, service_group_name, service_name):
+ """
+ Gets the service summary for the upgrade/downgrade for the given service, or None if
+ the service isn't participating.
+ :param service_group_name the service group name
+ :param service_name: the service name
+ :return: the service summary or None
+ """
+ if service_group_name is None:
+ service_group_name = self.execution_command.get_servicegroup_name()
+
+ if service_name is None:
+ service_name = self.execution_command.get_module_name()
+
+ service_group_summary = self.get_service_group_summary(service_group_name)
+ if service_group_summary is None or service_name not in service_group_summary.services:
+ return None
+
+ return service_group_summary.services[service_name]
+
+
+ def get_service_source_version(self, service_group_name, service_name, default_version = None):
+ """
+ Gets the source version of the service (aka the module) during an upgrade. This will not
+ return an mpack verison, but the specific module version instead.
+ :param service_group_name: the service group name
+ :param service_name: the service (module) name
+ :param default_version: the default version to return.
+ :return:
+ """
+ service_summary = self.get_service_summary(service_group_name, service_name)
+ if service_summary is None:
+ return default_version
+
+ return service_summary.source_version
+
+
+ def get_service_target_version(self, service_group_name, service_name, default_version = None):
+ """
+ Gets the target version of the service (aka the module) during an upgrade. This will not
+ return an mpack verison, but the specific module version instead.
+ :param service_group_name: the service group name
+ :param service_name: the service (module) name
+ :param default_version: the default version to return.
+ :return:
+ """
+ service_summary = self.get_service_summary(service_group_name, service_name)
+ if service_summary is None:
+ return default_version
+
+ return service_summary.target_version
\ No newline at end of file
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 31d5e18..5f52fea 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -25,16 +25,12 @@ import re
import os
import sys
import logging
-import inspect
import tarfile
import traceback
import time
from optparse import OptionParser
import resource_management
from ambari_commons import OSCheck
-from ambari_commons.constants import UPGRADE_TYPE_EXPRESS
-from ambari_commons.constants import UPGRADE_TYPE_ROLLING
-from ambari_commons.constants import UPGRADE_TYPE_HOST_ORDERED
from ambari_commons.network import reconfigure_urllib2_opener
from ambari_commons.inet_utils import ensure_ssl_using_protocol
from resource_management.libraries.resources import XmlConfig
@@ -46,8 +42,8 @@ from resource_management.core.environment import Environment
from resource_management.core.logger import Logger
from resource_management.core.exceptions import Fail, ClientComponentHasNoStatus, ComponentIsNotRunning
from resource_management.core.resources.packaging import Package
-from resource_management.libraries.functions.version import compare_versions
from resource_management.libraries.functions.version import format_stack_version
+from resource_management.libraries.functions.upgrade_summary import UpgradeSummary
from resource_management.libraries.functions import stack_tools
from resource_management.libraries.functions.constants import Direction
from resource_management.libraries.script.config_dictionary import ConfigDictionary, UnknownConfiguration
@@ -401,8 +397,15 @@ class Script(object):
def pre_start(self, env=None):
"""
- Executed before any start method. Posts contents of relevant *.out files to command execution log.
+ Executed before any start method, this will perform the following actions:
+ - Executes any pre-upgrade logic if there is an upgrade in progres.
+ - Posts contents of relevant *.out files to command execution log.
"""
+ # invoke pre-start upgrade function if in an upgrade
+ upgrade_summary = UpgradeSummary()
+ if Script.is_upgrade_in_progress():
+ self._update_mpack_instance_versions(env, upgrade_summary)
+
if self.log_out_files:
log_folder = self.get_log_folder()
user = self.get_user()
@@ -418,6 +421,11 @@ class Script(object):
show_logs(log_folder, user, lines_count=COUNT_OF_LAST_LINES_OF_OUT_FILES_LOGGED, mask=OUT_FILES_MASK)
def post_start(self, env=None):
+ # invoke post-start upgrade function if in an upgrade
+ upgrade_summary = UpgradeSummary()
+ if Script.is_upgrade_in_progress():
+ self.post_upgrade_restart(env, upgrade_summary)
+
pid_files = self.get_pid_files()
if pid_files == []:
Logger.logger.warning("Pid files for current script are not defined")
@@ -721,64 +729,6 @@ class Script(object):
return format_stack_version(stack_version_unformatted)
- @staticmethod
- def in_stack_upgrade():
- upgrade_direction = Script.execution_command.check_upgrade_direction()
- return upgrade_direction is not None and upgrade_direction in [Direction.UPGRADE, Direction.DOWNGRADE]
-
-
- @staticmethod
- def is_stack_greater(stack_version_formatted, compare_to_version):
- """
- Gets whether the provided stack_version_formatted (normalized)
- is greater than the specified stack version
- :param stack_version_formatted: the version of stack to compare
- :param compare_to_version: the version of stack to compare to
- :return: True if the command's stack is greater than the specified version
- """
- if stack_version_formatted is None or stack_version_formatted == "":
- return False
-
- return compare_versions(stack_version_formatted, compare_to_version) > 0
-
- @staticmethod
- def is_stack_greater_or_equal(compare_to_version):
- """
- Gets whether the hostLevelParams/stack_version, after being normalized,
- is greater than or equal to the specified stack version
- :param compare_to_version: the version to compare to
- :return: True if the command's stack is greater than or equal the specified version
- """
- return Script.is_stack_greater_or_equal_to(Script.get_stack_version(), compare_to_version)
-
- @staticmethod
- def is_stack_greater_or_equal_to(stack_version_formatted, compare_to_version):
- """
- Gets whether the provided stack_version_formatted (normalized)
- is greater than or equal to the specified stack version
- :param stack_version_formatted: the version of stack to compare
- :param compare_to_version: the version of stack to compare to
- :return: True if the command's stack is greater than or equal to the specified version
- """
- if stack_version_formatted is None or stack_version_formatted == "":
- return False
-
- return compare_versions(stack_version_formatted, compare_to_version) >= 0
-
- @staticmethod
- def is_stack_less_than(compare_to_version):
- """
- Gets whether the hostLevelParams/stack_version, after being normalized,
- is less than the specified stack version
- :param compare_to_version: the version to compare to
- :return: True if the command's stack is less than the specified version
- """
- stack_version_formatted = Script.get_stack_version()
-
- if stack_version_formatted is None:
- return False
-
- return compare_versions(stack_version_formatted, compare_to_version) < 0
def install(self, env):
"""
@@ -931,7 +881,39 @@ class Script(object):
"""
self.fail_with_error("stop method isn't implemented")
- def pre_upgrade_restart(self, env):
+ def _update_mpack_instance_versions(self, env, upgrade_summary):
+ """
+ Used to preform non-overridable actions, such as invoking the mpack instance manager to
+ switch versions, before restarting components during upgrade.
+
+ :param env:
+ :param upgrade_summary:
+ :return:
+ """
+ from resource_management.libraries.functions import mpack_manager_helper
+
+ execution_command = self.get_execution_command();
+ service_group_name = execution_command.get_servicegroup_name()
+ service_name = execution_command.get_module_name()
+ component_name = execution_command.get_component_type()
+
+ service_group_summary = upgrade_summary.get_service_group_summary(service_group_name)
+ if service_group_summary is None:
+ Logger.info("There is no upgrade information for the service group {0}, so it will be skipped".format(service_group_name))
+ return
+
+ target_mpack_name = service_group_summary.target_mpack_name
+ target_mpack_version = service_group_summary.target_mpack_version
+
+ Logger.info("Upgrading {0}'s {1}/{2} to {3}-{4}".format(service_group_name, service_name,
+ component_name, target_mpack_name, target_mpack_version))
+
+ mpack_manager_helper.set_mpack_instance(target_mpack_name, target_mpack_version, service_group_name,
+ module_name = service_name, components = [component_name])
+
+ self.pre_upgrade_restart(env, upgrade_summary)
+
+ def pre_upgrade_restart(self, env, upgrade_summary):
"""
To be overridden by subclasses
"""
@@ -956,53 +938,23 @@ class Script(object):
except KeyError:
pass
- upgrade_type_command_param = ""
- direction = None
- if config is not None:
- command_params = config["commandParams"] if "commandParams" in config else None
- if command_params is not None:
- upgrade_type_command_param = command_params["upgrade_type"] if "upgrade_type" in command_params else ""
- direction = command_params["upgrade_direction"] if "upgrade_direction" in command_params else None
-
- upgrade_type = Script.get_upgrade_type(upgrade_type_command_param)
- is_stack_upgrade = upgrade_type is not None
-
# need this before actually executing so that failures still report upgrade info
- if is_stack_upgrade:
- upgrade_info = {"upgrade_type": upgrade_type_command_param}
- if direction is not None:
- upgrade_info["direction"] = direction.upper()
-
- Script.structuredOut.update(upgrade_info)
+ if Script.is_upgrade_in_progress():
+ upgrade_summary = UpgradeSummary()
+ upgrade_dict = { "direction" : upgrade_summary.direction }
+ Script.structuredOut.update({"upgrade_summary": upgrade_dict})
if componentCategory and componentCategory.strip().lower() == 'CLIENT'.lower():
- if is_stack_upgrade:
- self.pre_upgrade_restart(env, upgrade_type=upgrade_type)
+ if Script.is_upgrade_in_progress():
+ self._pre_upgrade_restart(env)
self.install(env)
else:
- # To remain backward compatible with older stacks, only pass upgrade_type if available.
- # TODO, remove checking the argspec for "upgrade_type" once all of the services support that optional param.
- if "upgrade_type" in inspect.getargspec(self.stop).args:
- self.stop(env, upgrade_type=upgrade_type)
- else:
- if is_stack_upgrade:
- self.stop(env, rolling_restart=(upgrade_type == UPGRADE_TYPE_ROLLING))
- else:
- self.stop(env)
-
- if is_stack_upgrade:
- # Remain backward compatible with the rest of the services that haven't switched to using
- # the pre_upgrade_restart method. Once done. remove the else-block.
- self.pre_upgrade_restart(env, upgrade_type=upgrade_type)
+ self.stop(env)
service_name = config['serviceName'] if config is not None and 'serviceName' in config else None
try:
- #TODO Once the logic for pid is available from Ranger and Ranger KMS code, will remove the below if block.
- services_to_skip = ['RANGER', 'RANGER_KMS']
- if service_name in services_to_skip:
- Logger.info('Temporarily skipping status check for {0} service only.'.format(service_name))
- elif is_stack_upgrade:
+ if Script.is_upgrade_in_progress():
Logger.info('Skipping status check for {0} service during upgrade'.format(service_name))
else:
self.status(env)
@@ -1012,23 +964,12 @@ class Script(object):
except ClientComponentHasNoStatus as e:
pass # expected
- # To remain backward compatible with older stacks, only pass upgrade_type if available.
- # TODO, remove checking the argspec for "upgrade_type" once all of the services support that optional param.
self.pre_start(env)
- if "upgrade_type" in inspect.getargspec(self.start).args:
- self.start(env, upgrade_type=upgrade_type)
- else:
- if is_stack_upgrade:
- self.start(env, rolling_restart=(upgrade_type == UPGRADE_TYPE_ROLLING))
- else:
- self.start(env)
+ self.start(env)
self.post_start(env)
- if is_stack_upgrade:
- self.post_upgrade_restart(env, upgrade_type=upgrade_type)
-
- def post_upgrade_restart(self, env):
+ def post_upgrade_restart(self, env, upgrade_summary):
"""
To be overridden by subclasses
"""
@@ -1147,20 +1088,18 @@ class Script(object):
Script.instance = Script()
return Script.instance
- @staticmethod
- def get_upgrade_type(upgrade_type_command_param):
- upgrade_type = None
- if upgrade_type_command_param.lower() == "rolling_upgrade":
- upgrade_type = UPGRADE_TYPE_ROLLING
- elif upgrade_type_command_param.lower() == "express_upgrade":
- upgrade_type = UPGRADE_TYPE_EXPRESS
- elif upgrade_type_command_param.lower() == "host_ordered_upgrade":
- upgrade_type = UPGRADE_TYPE_HOST_ORDERED
-
- return upgrade_type
-
def __init__(self):
self.available_packages_in_repos = []
if Script.instance is not None:
raise Fail("An instantiation already exists! Use, get_instance() method.")
+
+
+ @staticmethod
+ def is_upgrade_in_progress():
+ """
+ Gets whether an upgrade is in progress
+ :return: True if an upgrade is in progress (or suspended), False otherwise
+ """
+ config = Script.get_config()
+ return "upgradeSummary" in config and config["upgradeSummary"] is not None
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 7bd3a91..e17d8fc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -67,12 +67,15 @@ import org.apache.ambari.server.events.UpgradeUpdateEvent;
import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
+import org.apache.ambari.server.orm.dao.MpackDAO;
import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.ServiceGroupDAO;
import org.apache.ambari.server.orm.dao.UpgradeDAO;
import org.apache.ambari.server.orm.dao.UpgradePlanDAO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.MpackEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.ServiceGroupEntity;
import org.apache.ambari.server.orm.entities.UpgradeEntity;
import org.apache.ambari.server.orm.entities.UpgradeGroupEntity;
import org.apache.ambari.server.orm.entities.UpgradeHistoryEntity;
@@ -86,6 +89,7 @@ import org.apache.ambari.server.serveraction.kerberos.KerberosOperationException
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.Mpack.MpackChangeSummary;
import org.apache.ambari.server.state.ServiceGroup;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.UpgradeContext;
@@ -306,6 +310,20 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
@Inject
private static Provider<KerberosHelper> s_kerberosHelper;
+ /**
+ * Used for looking up {@link ServiceGroupEntity} instances when creating
+ * {@link UpgradeHistoryEntity}.
+ */
+ @Inject
+ private static ServiceGroupDAO s_serviceGroupDAO;
+
+ /**
+ * Used for looking up {@link MpackEntity} instances when creating
+ * {@link UpgradeHistoryEntity}.
+ */
+ @Inject
+ private static MpackDAO s_mpackDAO;
+
private static final Logger LOG = LoggerFactory.getLogger(UpgradeResourceProvider.class);
/**
@@ -1558,11 +1576,24 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
* @param upgradeContext
* the upgrade context for this upgrade (not {@code null}).
*/
- @Experimental(feature = ExperimentalFeature.MPACK_UPGRADES, comment = "Need to implement")
private void addComponentHistoryToUpgrade(Cluster cluster, UpgradeEntity upgrade,
UpgradeContext upgradeContext) throws AmbariException {
+
+ Map<ServiceGroup, MpackChangeSummary> serviceGroupsInUpgrade = upgradeContext.getServiceGroups();
+ for( ServiceGroup serviceGroup : serviceGroupsInUpgrade.keySet() ) {
+ MpackChangeSummary mpackChangeSummary = serviceGroupsInUpgrade.get(serviceGroup);
+ ServiceGroupEntity serviceGroupEntity = s_serviceGroupDAO.findByPK(serviceGroup.getServiceGroupId());
+ MpackEntity sourceMpackEntity = s_mpackDAO.findById(mpackChangeSummary.getSource().getResourceId());
+ MpackEntity targetMpackEntity = s_mpackDAO.findById(mpackChangeSummary.getTarget().getResourceId());
+
+ UpgradeHistoryEntity history = new UpgradeHistoryEntity(upgrade, serviceGroupEntity,
+ sourceMpackEntity, targetMpackEntity);
+
+ upgrade.addHistory(history);
+ }
}
+
/**
* Constructs an {@link ActionExecutionContext}, setting common parameters for
* all types of commands.
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java
index 5d1cf4e..3fffa30 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java
@@ -327,7 +327,7 @@ public class UpgradeContext {
// depending on the direction, we must either have a target repository or an upgrade we are downgrading from
switch(m_direction){
case UPGRADE:{
- m_type = calculateUpgradeType(upgradeRequestMap, null);
+ m_type = upgradePlan.getUpgradeType();
List<UpgradePlanDetailEntity> details = upgradePlan.getDetails();
for (UpgradePlanDetailEntity detail : details) {
@@ -640,9 +640,11 @@ public class UpgradeContext {
serviceGroupSummary.serviceGroupId = serviceGroup.getServiceGroupId();
serviceGroupSummary.serviceGroupName = serviceGroup.getServiceGroupName();
serviceGroupSummary.sourceMpackId = changeSummary.getSource().getResourceId();
+ serviceGroupSummary.sourceMpackName = changeSummary.getSource().getName();
serviceGroupSummary.sourceMpackVersion = changeSummary.getSource().getVersion();
serviceGroupSummary.sourceStack = changeSummary.getSource().getStackId().getStackId();
- serviceGroupSummary.targetMpackId = changeSummary.getTarget().getRegistryId();
+ serviceGroupSummary.targetMpackId = changeSummary.getTarget().getResourceId();
+ serviceGroupSummary.targetMpackName = changeSummary.getTarget().getName();
serviceGroupSummary.targetMpackVersion = changeSummary.getTarget().getVersion();
serviceGroupSummary.targetStack = changeSummary.getTarget().getStackId().getStackId();
serviceGroupSummary.services = new LinkedHashMap<>();
@@ -1232,6 +1234,12 @@ public class UpgradeContext {
@SerializedName("targetStack")
public String targetStack;
+ @SerializedName("sourceMpackName")
+ public String sourceMpackName;
+
+ @SerializedName("targetMpackName")
+ public String targetMpackName;
+
@SerializedName("sourceMpackVersion")
public String sourceMpackVersion;
diff --git a/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/params.py b/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/params.py
index c1de556..0070935 100644
--- a/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/params.py
+++ b/ambari-server/src/main/resources/stack-hooks/before-ANY/scripts/params.py
@@ -29,7 +29,6 @@ from resource_management.libraries.functions import default
from resource_management.libraries.functions import format
from resource_management.libraries.functions.format_jvm_option import format_jvm_option_value
from resource_management.libraries.functions.is_empty import is_empty
-from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.functions.get_architecture import get_architecture
from resource_management.libraries.functions.cluster_settings import get_cluster_setting_value
from ambari_commons.constants import AMBARI_SUDO_BINARY
@@ -62,13 +61,7 @@ sudo = AMBARI_SUDO_BINARY
ambari_server_hostname = execution_command.get_ambari_server_host()
stack_version_unformatted = stack_settings.get_mpack_version()
-stack_version_formatted = stack_settings.get_mpack_version()
-upgrade_type = Script.get_upgrade_type(execution_command.get_upgrade_type())
-version = execution_command.get_new_mpack_version_for_upgrade()
-# Handle upgrade and downgrade
-if (upgrade_type is not None) and version:
- stack_version_formatted = format_stack_version(version)
"""
??? is this the same as ambariLevelParams/java_home and ambariLevelParams/java_name ???
"""
diff --git a/ambari-server/src/test/python/TestUpgradeSummary.py b/ambari-server/src/test/python/TestUpgradeSummary.py
index a88682f..0cd60cb 100644
--- a/ambari-server/src/test/python/TestUpgradeSummary.py
+++ b/ambari-server/src/test/python/TestUpgradeSummary.py
@@ -20,7 +20,7 @@ limitations under the License.
from resource_management.core.logger import Logger
-from resource_management.libraries.functions import upgrade_summary
+from resource_management.libraries.functions.upgrade_summary import UpgradeSummary
from resource_management.libraries.script import Script
from unittest import TestCase
@@ -36,21 +36,22 @@ class TestUpgradeSummary(TestCase):
command_json = TestUpgradeSummary._get_cluster_simple_upgrade_json()
Script.config = command_json
- summary = upgrade_summary.get_upgrade_summary()
+ summary = UpgradeSummary()
self.assertEqual(False, summary.is_revert)
self.assertEqual("UPGRADE", summary.direction)
service_groups = summary.service_groups
- self.assertEqual("express_upgrade", service_groups["SG1"].type)
+ service_group = summary.get_service_group_summary("SG1")
+ self.assertEqual("express_upgrade", service_group.type)
- services = service_groups["SG1"].services
+ services = service_group.services
self.assertEqual("3.0.0.0-b1", services["HDFS"].source_version)
self.assertEqual("3.1.0.0-b1", services["HDFS"].target_version)
- self.assertEqual("3.0.0.0-b1", upgrade_summary.get_source_version(service_group_name = "SG1", service_name = "HDFS"))
- self.assertEqual("3.1.0.0-b1", upgrade_summary.get_target_version(service_group_name = "SG1", service_name = "HDFS"))
+ self.assertEqual("3.0.0.0-b1", summary.get_service_summary(service_group_name = "SG1", service_name = "HDFS").source_version)
+ self.assertEqual("3.1.0.0-b1", summary.get_service_summary(service_group_name = "SG1", service_name = "HDFS").target_version)
- self.assertTrue(upgrade_summary.get_downgrade_from_version(service_group_name = "SG1", service_name="HDFS") is None)
+ self.assertTrue(summary.get_downgrade_from_version(service_group_name = "SG1", service_name="HDFS") is None)
def test_get_downgrade_from_version(self):
@@ -61,8 +62,10 @@ class TestUpgradeSummary(TestCase):
command_json = TestUpgradeSummary._get_cluster_simple_downgrade_json()
Script.config = command_json
- self.assertTrue(upgrade_summary.get_downgrade_from_version(service_group_name = "FOO", service_name = "BAR") is None)
- self.assertEqual("3.1.0.0-b1", upgrade_summary.get_downgrade_from_version(service_group_name = "SG1", service_name = "HDFS"))
+ summary = UpgradeSummary()
+
+ self.assertTrue(summary.get_downgrade_from_version(service_group_name = "FOO", service_name = "BAR") is None)
+ self.assertEqual("3.1.0.0-b1", summary.get_downgrade_from_version(service_group_name = "SG1", service_name = "HDFS"))
@staticmethod
@@ -81,6 +84,8 @@ class TestUpgradeSummary(TestCase):
"serviceGroupName": "SG1",
"sourceMpackId": 50,
"targetMpackId": 100,
+ "sourceMpackName": "HDPCORE",
+ "targetMpackName": "HDPCORE",
"sourceStack": "HDPCORE-1.0",
"targetStack": "HDPCORE-1.5",
"sourceMpackVersion": "1.0.0.0-b1",
@@ -122,6 +127,8 @@ class TestUpgradeSummary(TestCase):
"serviceGroupName": "SG1",
"sourceMpackId": 100,
"targetMpackId": 50,
+ "sourceMpackName": "HDPCORE",
+ "targetMpackName": "HDPCORE",
"sourceStack": "HDPCORE-1.5",
"targetStack": "HDPCORE-1.0",
"sourceMpackVersion": "1.5.0.0-b1",