You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/06/30 19:18:35 UTC
ambari git commit: AMBARI-12205. RU - Misc issues: ZKFC not upgraded
on Standby NN; Flume kill needs signal;
Package Installation fails when host has no Stack components (alejandro)
Repository: ambari
Updated Branches:
refs/heads/trunk c7455ea8a -> 416f60063
AMBARI-12205. RU - Misc issues: ZKFC not upgraded on Standby NN; Flume kill needs signal; Package Installation fails when host has no Stack components (alejandro)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/416f6006
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/416f6006
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/416f6006
Branch: refs/heads/trunk
Commit: 416f60063c94eee9f00d19ad060ef7f082990bf0
Parents: c7455ea
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Mon Jun 29 17:06:16 2015 -0700
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Tue Jun 30 10:18:19 2015 -0700
----------------------------------------------------------------------
.../FLUME/1.4.0.2.0/package/scripts/flume.py | 2 +-
.../HDFS/2.1.0.2.0/package/scripts/namenode.py | 4 +-
.../HDFS/2.1.0.2.0/package/scripts/utils.py | 53 +++++++------
.../custom_actions/scripts/install_packages.py | 78 +++++++++++++-------
4 files changed, 82 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/416f6006/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
index 226cf8c..cff969f 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py
@@ -204,7 +204,7 @@ def flume(action = None):
if is_flume_process_live(pid_file):
pid = shell.checked_call(("cat", pid_file), sudo=True)[1].strip()
- Execute(('kill', pid), sudo=True)
+ Execute(("kill", "-15", pid), sudo=True) # kill command has to be a tuple
if not await_flume_process_termination(pid_file):
raise Fail("Can't stop flume agent: {0}".format(agent))
http://git-wip-us.apache.org/repos/asf/ambari/blob/416f6006/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 87c38ae..b15bdc4 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -46,7 +46,7 @@ import namenode_upgrade
from hdfs_namenode import namenode
from hdfs import hdfs
import hdfs_rebalance
-from utils import failover_namenode
+from utils import stop_zkfc_during_ru
# hashlib is supplied as of Python 2.5 as the replacement interface for md5
@@ -86,7 +86,7 @@ class NameNode(Script):
env.set_params(params)
if rolling_restart and params.dfs_ha_enabled:
if params.dfs_ha_automatic_failover_enabled:
- failover_namenode()
+ stop_zkfc_during_ru()
else:
raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
namenode(action="stop", rolling_restart=rolling_restart, env=env)
http://git-wip-us.apache.org/repos/asf/ambari/blob/416f6006/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
index 2360d7a..d9180d8 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
@@ -30,6 +30,7 @@ from resource_management.core.shell import as_user, as_sudo
from resource_management.core.exceptions import ComponentIsNotRunning
from resource_management.core.logger import Logger
from resource_management.libraries.functions.curl_krb_request import curl_krb_request
+from resource_management.core.exceptions import Fail
from zkfc_slave import ZkfcSlave
@@ -60,23 +61,25 @@ def safe_zkfc_op(action, env):
zkfc.stop(env)
-def failover_namenode():
+def stop_zkfc_during_ru():
"""
- Failover the primary namenode by killing zkfc if it exists on this host (assuming this host is the primary).
+ Restart ZKFC on either the standby or active Namenode. If done on the currently active namenode, wait for it to
+ become the standby.
"""
import params
check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)
- state = "unknown"
+ original_state = "unknown"
if code == 0 and out:
- state = "active" if "active" in out else ("standby" if "standby" in out else state)
- Logger.info("Namenode service state: %s" % state)
+ original_state = "active" if "active" in out else ("standby" if "standby" in out else original_state)
+ Logger.info("Namenode service state: %s" % original_state)
- if state == "active":
- Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode")
+ msg = "Rolling Upgrade - Killing ZKFC on {0} NameNode host {1} {2}"\
+ .format(original_state, params.hostname, "to initiate a failover" if original_state == "active" else "")
+ Logger.info(msg)
- # Forcefully kill ZKFC on this host to initiate a failover
+ # Forcefully kill ZKFC. If this is the active, will initiate a failover.
# If ZKFC is already dead, then potentially this node can still be the active one.
was_zkfc_killed = kill_zkfc(params.hdfs_user)
@@ -84,22 +87,24 @@ def failover_namenode():
check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
# process may already be down. try one time, then proceed
- code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
- Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
- if code == 255 and out:
- Logger.info("Rolling Upgrade - namenode is already down.")
- else:
- if was_zkfc_killed:
- # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
- Logger.info("Waiting for this NameNode to become the standby one.")
- Execute(check_standby_cmd,
- user=params.hdfs_user,
- tries=50,
- try_sleep=6,
- logoutput=True)
+ if original_state == "active":
+ code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
+ Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
+
+ if code == 255 and out:
+ Logger.info("Rolling Upgrade - namenode is already down.")
+ else:
+ if was_zkfc_killed:
+ # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
+ Logger.info("Waiting for this NameNode to become the standby one.")
+ Execute(check_standby_cmd,
+ user=params.hdfs_user,
+ tries=50,
+ try_sleep=6,
+ logoutput=True)
else:
- Logger.info("Rolling Upgrade - Host %s is already the standby namenode." % str(params.hostname))
+ raise Fail("Unable to determine NameNode HA states by calling command: {0}".format(check_service_cmd))
def kill_zkfc(zkfc_user):
@@ -117,8 +122,8 @@ def kill_zkfc(zkfc_user):
check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
code, out = shell.call(check_process)
if code == 0:
- Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
- kill_command = format("kill -9 `cat {zkfc_pid_file}`")
+ Logger.debug("ZKFC is running and will be killed.")
+ kill_command = format("kill -15 `cat {zkfc_pid_file}`")
Execute(kill_command,
user=zkfc_user
)
http://git-wip-us.apache.org/repos/asf/ambari/blob/416f6006/ambari-server/src/main/resources/custom_actions/scripts/install_packages.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/scripts/install_packages.py b/ambari-server/src/main/resources/custom_actions/scripts/install_packages.py
index 9c1b3d7..b028852 100644
--- a/ambari-server/src/main/resources/custom_actions/scripts/install_packages.py
+++ b/ambari-server/src/main/resources/custom_actions/scripts/install_packages.py
@@ -48,6 +48,7 @@ class InstallPackages(Script):
UBUNTU_REPO_COMPONENTS_POSTFIX = ["main"]
REPO_FILE_NAME_PREFIX = 'HDP-'
+ STACK_TO_ROOT_FOLDER = {"HDP": "/usr/hdp"}
# Mapping file used to store repository versions without a build number, and the actual version it corresponded to.
# E.g., HDP 2.2.0.0 => HDP 2.2.0.0-2041
@@ -80,6 +81,17 @@ class InstallPackages(Script):
package_list = json.loads(config['commandParams']['package_list'])
stack_id = config['commandParams']['stack_id']
+ stack_name = None
+ self.stack_root_folder = None
+ if stack_id and "-" in stack_id:
+ stack_split = stack_id.split("-")
+ if len(stack_split) == 2:
+ stack_name = stack_split[0].upper()
+ if stack_name in self.STACK_TO_ROOT_FOLDER:
+ self.stack_root_folder = self.STACK_TO_ROOT_FOLDER[stack_name]
+ if self.stack_root_folder is None:
+ raise Fail("Cannot determine the stack's root directory by parsing the stack_id property, {0}".format(str(stack_id)))
+
self.repository_version = self.repository_version.strip()
# Install/update repositories
@@ -120,14 +132,11 @@ class InstallPackages(Script):
m = re.search("[\d\.]+-\d+", self.repository_version)
if m:
# Contains a build number
- self.structured_output['actual_version'] = self.repository_version
+ self.structured_output['actual_version'] = self.repository_version # This is the best value known so far.
self.put_structured_out(self.structured_output)
# Initial list of versions, used to compute the new version installed
- self.old_versions = []
- if self.actual_version is None:
- Logger.info("Calculate the actual version.".format(self.repository_version))
- self.old_versions = self.hdp_versions()
+ self.old_versions = self.hdp_versions()
try:
# It's possible for the process to receive a SIGTERM while installing the packages
@@ -207,34 +216,43 @@ class InstallPackages(Script):
"""
After packages are installed, determine what the new actual version is, in order to save it.
"""
+ Logger.info("Attempting to determine actual version with build number.")
+ Logger.info("Old versions: {0}".format(self.old_versions))
- # If needed to calculate the actual_version, add it to the structured out file.
- if self.actual_version is None:
- Logger.info("Attempting to determine actual version with build number.")
- Logger.info("Old versions: {0}".format(self.old_versions))
-
- new_versions = self.hdp_versions()
- Logger.info("New versions: {0}".format(new_versions))
+ new_versions = self.hdp_versions()
+ Logger.info("New versions: {0}".format(new_versions))
- deltas = set(new_versions) - set(self.old_versions)
- Logger.info("Deltas: {0}".format(deltas))
+ deltas = set(new_versions) - set(self.old_versions)
+ Logger.info("Deltas: {0}".format(deltas))
- if 1 == len(deltas):
- self.actual_version = next(iter(deltas)).strip()
+ if 1 == len(deltas):
+ self.actual_version = next(iter(deltas)).strip()
+ self.structured_output['actual_version'] = self.actual_version
+ self.put_structured_out(self.structured_output)
+ self.write_actual_version_to_file(self.actual_version)
+ else:
+ Logger.info("Cannot determine a new actual version installed by using the delta method.")
+ # If the first install attempt does a partial install and is unable to report this to the server,
+ # then a subsequent attempt will report an empty delta. For this reason, it is important to search the
+ # repo version history file to determine if we previously did write an actual_version.
+ self.actual_version = self.get_actual_version_from_file()
+ if self.actual_version is not None:
+ self.actual_version = self.actual_version.strip()
self.structured_output['actual_version'] = self.actual_version
self.put_structured_out(self.structured_output)
- self.write_actual_version_to_file(self.actual_version)
+ Logger.info("Found actual version {0} by parsing file {1}".format(self.actual_version, self.REPO_VERSION_HISTORY_FILE))
else:
- Logger.info("Cannot determine a new actual version installed by using the delta method. "
- "This is expected during the first install attempt since not all packages will yield a new version in \"hdp-select versions\".")
- # If the first install attempt does a partial install and is unable to report this to the server,
- # then a subsequent attempt will report an empty delta. For this reason, it is important to search the
- # repo version history file to determine if we previously did write an actual_version.
- self.actual_version = self.get_actual_version_from_file()
- if self.actual_version is not None:
- self.actual_version = self.actual_version.strip()
- self.structured_output['actual_version'] = self.actual_version
- self.put_structured_out(self.structured_output)
+ # It's likely that this host does not have any Stack Components installed, so only contains AMS.
+ if not os.path.exists(self.stack_root_folder):
+ # Special case when this host does not contain any HDP components, but still contains other components like AMS.
+ msg = "Could not determine actual version. This stack's root directory ({0}) is not present on this host, so this host does not contain any versionable components. " \
+ "Therefore, ignore this host and allow other hosts to report the correct repository version.".format(self.stack_root_folder)
+ Logger.info(msg)
+ else:
+ msg = "Could not determine actual version. This stack's root directory ({0}) exists but was not able to determine the actual repository version installed. " \
+ "Try reinstalling packages again.".format(self.stack_root_folder)
+ raise Fail(msg)
+
def install_packages(self, package_list):
"""
@@ -277,7 +295,11 @@ class InstallPackages(Script):
Package(package, action="remove")
else:
# Compute the actual version in order to save it in structured out
- self.compute_actual_version()
+ try:
+ self.compute_actual_version()
+ except Exception, err:
+ ret_code = 1
+ Logger.logger.exception("Failure while computing actual version. Error: {0}".format(str(err)))
pass
return ret_code