You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2015/10/15 00:43:40 UTC

ambari git commit: AMBARI-13396: RU: Handle Namenode being down scenarios (jluniya)

Repository: ambari
Updated Branches:
  refs/heads/trunk e019c6c15 -> c00908495


AMBARI-13396: RU: Handle Namenode being down scenarios (jluniya)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c0090849
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c0090849
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c0090849

Branch: refs/heads/trunk
Commit: c00908495953e7c725bd49b7a124883d12621324
Parents: e019c6c
Author: Jayush Luniya <jl...@hortonworks.com>
Authored: Wed Oct 14 15:43:34 2015 -0700
Committer: Jayush Luniya <jl...@hortonworks.com>
Committed: Wed Oct 14 15:43:34 2015 -0700

----------------------------------------------------------------------
 .../2.1.0.2.0/package/scripts/params_linux.py   |   2 +
 .../HDFS/2.1.0.2.0/package/scripts/utils.py     | 117 +++++++++----------
 2 files changed, 58 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c0090849/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
index 563c234..38eac2e 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
@@ -55,6 +55,8 @@ security_enabled = config['configurations']['cluster-env']['security_enabled']
 hdfs_user = status_params.hdfs_user
 root_user = "root"
 hadoop_pid_dir_prefix = status_params.hadoop_pid_dir_prefix
+namenode_pid_file = status_params.namenode_pid_file
+zkfc_pid_file = status_params.zkfc_pid_file
 
 # Some datanode settings
 dfs_dn_addr = default('/configurations/hdfs-site/dfs.datanode.address', None)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0090849/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
index 7bc0b6a..d1e764a 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
@@ -31,6 +31,7 @@ from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.core.logger import Logger
 from resource_management.libraries.functions.curl_krb_request import curl_krb_request
 from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.namenode_ha_utils import get_namenode_states
 
 from zkfc_slave import ZkfcSlave
 
@@ -74,50 +75,59 @@ def initiate_safe_zkfc_failover():
     Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
             user = params.hdfs_user)
 
-  check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
-  code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)
-
-  original_state = "unknown"
-  if code == 0 and out:
-    original_state = "active" if "active" in out else ("standby" if "standby" in out else original_state)
-    Logger.info("Namenode service state: %s" % original_state)
-
-    if original_state == "active":
-      msg = "Rolling Upgrade - Initiating a ZKFC failover on {0} NameNode host {1}.".format(original_state, params.hostname)
-      Logger.info(msg)
-
-      check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
-      failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
-
-      code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
-      Logger.info(format("Rolling Upgrade - failover command returned {code}"))
-      wait_for_standby = False
-
-      if code == 0:
-        wait_for_standby = True
+  active_namenode_id = None
+  standby_namenode_id = None
+  active_namenodes, standby_namenodes, unknown_namenodes = get_namenode_states(params.hdfs_site, params.security_enabled, params.hdfs_user)
+  if active_namenodes:
+    active_namenode_id = active_namenodes[0][0]
+  if standby_namenodes:
+    standby_namenode_id = standby_namenodes[0][0]
+
+  if active_namenode_id:
+    Logger.info(format("Active NameNode id: {active_namenode_id}"))
+  if standby_namenode_id:
+    Logger.info(format("Standby NameNode id: {standby_namenode_id}"))
+  if unknown_namenodes:
+    for unknown_namenode in unknown_namenodes:
+      Logger.info("NameNode HA state for {0} is unknown".format(unknown_namenode[0]))
+
+  if params.namenode_id == active_namenode_id and params.other_namenode_id == standby_namenode_id:
+    # Failover if this NameNode is active and other NameNode is up and in standby (i.e. ready to become active on failover)
+    Logger.info(format("NameNode {namenode_id} is active and NameNode {other_namenode_id} is in standby"))
+
+    failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
+    check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
+
+    msg = "Rolling Upgrade - Initiating a ZKFC failover on active NameNode host {0}.".format(params.hostname)
+    Logger.info(msg)
+    code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
+    Logger.info(format("Rolling Upgrade - failover command returned {code}"))
+    wait_for_standby = False
+
+    if code == 0:
+      wait_for_standby = True
+    else:
+      # Try to kill ZKFC manually
+      was_zkfc_killed = kill_zkfc(params.hdfs_user)
+      code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
+      Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
+      if code == 255 and out:
+        Logger.info("Rolling Upgrade - NameNode is already down.")
       else:
-        # Try to kill ZKFC manually
-        was_zkfc_killed = kill_zkfc(params.hdfs_user)
-        code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
-        Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
-
-        if code == 255 and out:
-          Logger.info("Rolling Upgrade - namenode is already down.")
-        else:
-          if was_zkfc_killed:
-            # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
-            wait_for_standby = True
-
-      if wait_for_standby:
-        Logger.info("Waiting for this NameNode to become the standby one.")
-        Execute(check_standby_cmd,
-                user=params.hdfs_user,
-                tries=50,
-                try_sleep=6,
-                logoutput=True)
+        if was_zkfc_killed:
+          # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
+          wait_for_standby = True
+
+    if wait_for_standby:
+      Logger.info("Waiting for this NameNode to become the standby one.")
+      Execute(check_standby_cmd,
+              user=params.hdfs_user,
+              tries=50,
+              try_sleep=6,
+              logoutput=True)
   else:
-    raise Fail("Unable to determine NameNode HA states by calling command: {0}".format(check_service_cmd))
-
+    msg = "Rolling Upgrade - Skipping ZKFC failover on NameNode host {0}.".format(params.hostname)
+    Logger.info(msg)
 
 def kill_zkfc(zkfc_user):
   """
@@ -129,36 +139,21 @@ def kill_zkfc(zkfc_user):
   """
   import params
   if params.dfs_ha_enabled:
-    zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
-    if zkfc_pid_file:
+    if params.zkfc_pid_file:
       check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
       code, out = shell.call(check_process)
       if code == 0:
         Logger.debug("ZKFC is running and will be killed.")
         kill_command = format("kill -15 `cat {zkfc_pid_file}`")
         Execute(kill_command,
-             user=zkfc_user
+                user=zkfc_user
         )
-        File(zkfc_pid_file,
+        File(params.zkfc_pid_file,
              action = "delete",
-        )
+             )
         return True
   return False
 
-
-def get_service_pid_file(name, user):
-  """
-  Get the pid file path that was used to start the service by the user.
-  :param name: Service name
-  :param user: User that started the service.
-  :return: PID file path
-  """
-  import params
-  pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
-  pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
-  return pid_file
-
-
 def service(action=None, name=None, user=None, options="", create_pid_dir=False,
             create_log_dir=False):
   """