You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/10/22 04:24:29 UTC

[1/2] ambari git commit: AMBARI-12701. Stop-and-Start Upgrade: Handle Core Services (alejandro)

Repository: ambari
Updated Branches:
  refs/heads/trunk 34a03533b -> 7afe5a4ec


http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
index 4cbce34..5017d39 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
@@ -48,9 +48,15 @@ upgrade_direction = default("/commandParams/upgrade_direction", None)
 stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
 hdp_stack_version = format_hdp_stack_version(stack_version_unformatted)
 
-# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade
+# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade
 version = default("/commandParams/version", None)
 
+# The desired role is only available during a Non-Rolling Upgrade in HA.
+# The server calculates which of the two NameNodes will be the active, and the other the standby since they
+# are started using different commands.
+desired_namenode_role = default("/commandParams/desired_namenode_role", None)
+
+
 security_enabled = config['configurations']['cluster-env']['security_enabled']
 hdfs_user = status_params.hdfs_user
 root_user = "root"

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/setup_ranger_hdfs.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/setup_ranger_hdfs.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/setup_ranger_hdfs.py
index f5df86f..6a64b2f 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/setup_ranger_hdfs.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/setup_ranger_hdfs.py
@@ -19,7 +19,7 @@ limitations under the License.
 """
 from resource_management.core.logger import Logger
 
-def setup_ranger_hdfs(rolling_upgrade = False):
+def setup_ranger_hdfs(upgrade_type=None):
   import params
 
   if params.has_ranger_admin:
@@ -31,7 +31,7 @@ def setup_ranger_hdfs(rolling_upgrade = False):
 
     hdp_version = None
 
-    if rolling_upgrade:
+    if upgrade_type is not None:
       hdp_version = params.version
 
     setup_ranger_plugin('hadoop-client', 'hdfs',

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py
index 108ef01..f67b1cb 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py
@@ -38,13 +38,13 @@ class SNameNode(Script):
     hdfs("secondarynamenode")
     snamenode(action="configure")
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
     snamenode(action="start")
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     snamenode(action="stop")
@@ -60,9 +60,7 @@ class SNameNodeDefault(SNameNode):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-hdfs-secondarynamenode"}
 
-  def pre_rolling_restart(self, env):
-    # Secondary namenode is actually removed in an HA cluster, which is a pre-requisite for Rolling Upgrade,
-    # so it does not need any Rolling Restart logic.
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     pass
 
   def security_status(self, env):

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
index d1e764a..97ad424 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
@@ -32,6 +32,7 @@ from resource_management.core.logger import Logger
 from resource_management.libraries.functions.curl_krb_request import curl_krb_request
 from resource_management.core.exceptions import Fail
 from resource_management.libraries.functions.namenode_ha_utils import get_namenode_states
+from resource_management.libraries.script.script import Script
 
 from zkfc_slave import ZkfcSlave
 
@@ -336,3 +337,32 @@ def is_secure_port(port):
     return port < 1024
   else:
     return False
+
+def is_previous_fs_image():
+  """
+  Return true if there's a previous folder in the HDFS namenode directories.
+  """
+  import params
+  if params.dfs_name_dir:
+    nn_name_dirs = params.dfs_name_dir.split(',')
+    for nn_dir in nn_name_dirs:
+      prev_dir = os.path.join(nn_dir, "previous")
+      if os.path.isdir(prev_dir):
+        return True
+  return False
+
+def get_hdfs_binary(distro_component_name):
+  """
+  Get the hdfs binary to use depending on the stack and version.
+  :param distro_component_name: e.g., hadoop-hdfs-namenode, hadoop-hdfs-datanode
+  :return: The hdfs binary to use
+  """
+  import params
+  hdfs_binary = "hdfs"
+  if params.stack_name == "HDP":
+    # This was used in HDP 2.1 and earlier
+    hdfs_binary = "hdfs"
+    if Script.is_hdp_stack_greater_or_equal("2.2"):
+      hdfs_binary = "/usr/hdp/current/{0}/bin/hdfs".format(distro_component_name)
+
+  return hdfs_binary
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
index 14de094..e9037d8 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
@@ -42,7 +42,7 @@ class ZkfcSlave(Script):
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ZkfcSlaveDefault(ZkfcSlave):
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)
@@ -68,7 +68,7 @@ class ZkfcSlaveDefault(ZkfcSlave):
       create_log_dir=True
     )
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/application_timeline_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/application_timeline_server.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/application_timeline_server.py
index f3b3d11..7644225 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/application_timeline_server.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/application_timeline_server.py
@@ -37,13 +37,13 @@ class ApplicationTimelineServer(Script):
   def install(self, env):
     self.install_packages(env)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env) # FOR SECURITY
     service('timelineserver', action='start')
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     service('timelineserver', action='stop')
@@ -65,8 +65,8 @@ class ApplicationTimelineServerDefault(ApplicationTimelineServer):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-yarn-timelineserver"}
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/historyserver.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/historyserver.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/historyserver.py
index 229dcd8..5d95c5c 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/historyserver.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/historyserver.py
@@ -44,7 +44,7 @@ class HistoryServer(Script):
   def install(self, env):
     self.install_packages(env)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     service('historyserver', action='stop', serviceName='mapreduce')
@@ -72,8 +72,8 @@ class HistoryServerDefault(HistoryServer):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-mapreduce-historyserver"}
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 
@@ -86,7 +86,7 @@ class HistoryServerDefault(HistoryServer):
       copy_to_hdfs("slider", params.user_group, params.hdfs_user, host_sys_prepped=params.host_sys_prepped)
       params.HdfsResource(None, action="execute")
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env) # FOR SECURITY

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/mapreduce2_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/mapreduce2_client.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/mapreduce2_client.py
index 5263d9f..7ceadf0 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/mapreduce2_client.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/mapreduce2_client.py
@@ -52,7 +52,7 @@ class MapReduce2ClientDefault(MapReduce2Client):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-client"}
 
-  def pre_rolling_restart(self, env):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/nodemanager.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/nodemanager.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/nodemanager.py
index fd25651..d508d55 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/nodemanager.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/nodemanager.py
@@ -39,12 +39,12 @@ class Nodemanager(Script):
   def install(self, env):
     self.install_packages(env)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     service('nodemanager',action='stop')
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env) # FOR SECURITY
@@ -67,8 +67,8 @@ class NodemanagerDefault(Nodemanager):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-yarn-nodemanager"}
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing NodeManager Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing NodeManager Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 
@@ -76,8 +76,8 @@ class NodemanagerDefault(Nodemanager):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-yarn-nodemanager", params.version)
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing NodeManager Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing NodeManager Stack Upgrade post-restart")
     import params
     env.set_params(params)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
index ba65fbc..929269d 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
@@ -57,7 +57,7 @@ stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
 hdp_stack_version_major = format_hdp_stack_version(stack_version_unformatted)
 hdp_stack_version = functions.get_hdp_version('hadoop-yarn-resourcemanager')
 
-# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade.
+# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade.
 # It cannot be used during the initial Cluser Install because the version is not yet known.
 version = default("/commandParams/version", None)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
index e67f1ce..ec7799e 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
@@ -47,7 +47,7 @@ class Resourcemanager(Script):
   def install(self, env):
     self.install_packages(env)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     service('resourcemanager', action='stop')
@@ -97,8 +97,8 @@ class ResourcemanagerDefault(Resourcemanager):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-yarn-resourcemanager"}
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade post-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade post-restart")
     import params
     env.set_params(params)
 
@@ -106,7 +106,7 @@ class ResourcemanagerDefault(Resourcemanager):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-yarn-resourcemanager", params.version)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/yarn_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/yarn_client.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/yarn_client.py
index e58ea3c..0c6115f 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/yarn_client.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/yarn_client.py
@@ -52,7 +52,7 @@ class YarnClientDefault(YarnClient):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-client"}
 
-  def pre_rolling_restart(self, env):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper.py
index 973fa0f..ce5545f 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper.py
@@ -28,13 +28,13 @@ from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 
 @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-def zookeeper(type = None, rolling_restart = False):
+def zookeeper(type = None, upgrade_type=None):
   import params
 
   if type == 'server':
     # This path may be missing after Ambari upgrade. We need to create it. We need to do this before any configs will
     # be applied.
-    if not rolling_restart and not os.path.exists("/usr/hdp/current/zookeeper-server") and params.current_version:
+    if upgrade_type is None and not os.path.exists("/usr/hdp/current/zookeeper-server") and params.current_version:
       conf_select.select(params.stack_name, "zookeeper", params.current_version)
       hdp_select.select("zookeeper-server", params.version)
 
@@ -108,7 +108,7 @@ def zookeeper(type = None, rolling_restart = False):
   )
 
 @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
-def zookeeper(type = None, rolling_restart = False):
+def zookeeper(type = None, upgrade_type=None):
   import params
   configFile("zoo.cfg", template_name="zoo.cfg.j2", mode="f")
   configFile("configuration.xsl", template_name="configuration.xsl.j2", mode="f")

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_client.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_client.py
index 02c1006..7a11fee 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_client.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_client.py
@@ -37,13 +37,13 @@ class ZookeeperClient(Script):
     zookeeper(type='client')
     pass
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
     pass
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     pass
@@ -60,8 +60,8 @@ class ZookeeperClientLinux(ZookeeperClient):
     self.install_packages(env)
     self.configure(env)
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
index b7fb578..842deb0 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
@@ -43,21 +43,21 @@ from ambari_commons.os_family_impl import OsFamilyImpl
 
 class ZookeeperServer(Script):
 
-  def configure(self, env, rolling_restart=False):
+  def configure(self, env, upgrade_type=None):
     import params
     env.set_params(params)
-    zookeeper(type='server', rolling_restart=rolling_restart)
+    zookeeper(type='server', upgrade_type=upgrade_type)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
-    self.configure(env, rolling_restart=rolling_restart)
-    zookeeper_service(action='start', rolling_restart=rolling_restart)
+    self.configure(env, upgrade_type=upgrade_type)
+    zookeeper_service(action='start', upgrade_type=upgrade_type)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
-    zookeeper_service(action='stop', rolling_restart=rolling_restart)
+    zookeeper_service(action='stop', upgrade_type=upgrade_type)
 
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ZookeeperServerLinux(ZookeeperServer):
@@ -69,8 +69,8 @@ class ZookeeperServerLinux(ZookeeperServer):
     self.install_packages(env)
     self.configure(env)
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 
@@ -78,8 +78,11 @@ class ZookeeperServerLinux(ZookeeperServer):
       conf_select.select(params.stack_name, "zookeeper", params.version)
       hdp_select.select("zookeeper-server", params.version)
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    if upgrade_type == "nonrolling":
+      return
+
+    Logger.info("Executing Stack Upgrade post-restart")
     import params
     env.set_params(params)
     zk_server_host = random.choice(params.zookeeper_hosts)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_service.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_service.py
index 685eb6d..14cd85c 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_service.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_service.py
@@ -27,11 +27,11 @@ from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import hdp_select
 
 @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-def zookeeper_service(action='start', rolling_restart=False):
+def zookeeper_service(action='start', upgrade_type=None):
   import params
 
   # This path may be missing after Ambari upgrade. We need to create it.
-  if not rolling_restart and not os.path.exists("/usr/hdp/current/zookeeper-server") and params.current_version:
+  if upgrade_type is None and not os.path.exists("/usr/hdp/current/zookeeper-server") and params.current_version:
     conf_select.select(params.stack_name, "zookeeper", params.current_version)
     hdp_select.select("zookeeper-server", params.version)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
index daa1e08..c81b1ea 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
@@ -92,7 +92,7 @@
       <skippable>true</skippable>
       <execute-stage service="OOZIE" component="OOZIE_SERVER" title="Backup Oozie Database">
         <task xsi:type="manual">
-          <message>Before continuing, please backup the Oozie Server database on {{oozie-env/oozie_hostname}}.</message>
+          <message>Before continuing, please backup the Oozie Server database on {{hosts.all}}.</message>
         </task>
       </execute-stage>
 
@@ -178,6 +178,16 @@
       </execute-stage>
     </group>
 
+    <group xsi:type="cluster" name="Upgrade service configs" title="Upgrade service configs">
+      <direction>UPGRADE</direction>   <!--  prevent config changes on downgrade -->
+      <skippable>false</skippable>
+
+      <!--YARN-->
+      <execute-stage service="MAPREDUCE2" component="HISTORYSERVER">
+        <task xsi:type="configure" id="hdp_2_2_0_0_historyserver_classpath"/>
+      </execute-stage>
+    </group>
+
     <!-- After processing this group, will change the effective Stack of the UpgradeContext object. -->
     <group xsi:type="update-stack" name="UPDATE_DESIRED_STACK_ID" title="Update Desired Stack Id">
       <execute-stage title="Update Desired Stack Id" service="" component="">
@@ -221,6 +231,19 @@
       </service>
     </group>
 
+    <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <direction>UPGRADE</direction>
+
+      <execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
+        <task xsi:type="execute" hosts="master" summary="Wait for NameNode to leave Safemode">
+          <script>scripts/namenode.py</script>
+          <function>wait_for_safemode_off</function>
+        </task>
+      </execute-stage>
+    </group>
+
     <group xsi:type="restart" name="MR and YARN" title="MR and YARN">
       <service-check>false</service-check>
       <skippable>true</skippable>
@@ -266,10 +289,10 @@
       <skippable>true</skippable>
       <direction>UPGRADE</direction>
       <priority>
-        <service>HBASE</service>
-        <service>MAPREDUCE2</service>
-        <service>YARN</service>
         <service>HDFS</service>
+        <service>YARN</service>
+        <service>MAPREDUCE2</service>
+        <service>HBASE</service>
       </priority>
     </group>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml
index 9c96dfb..7f8faf0 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/config-upgrade.xml
@@ -19,6 +19,17 @@
 <upgrade-config-changes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 
   <services>
+    <service name="YARN">
+      <component name="HISTORY_SERVER">
+        <changes>
+          <definition xsi:type="configure" id="hdp_2_2_0_0_historyserver_classpath" summary="YARN Application Classpath">
+            <type>yarn-site</type>
+            <set key="yarn.application.classpath" value="$HADOOP_CONF_DIR,/usr/hdp/current/hadoop-client/*,/usr/hdp/current/hadoop-client/lib/*,/usr/hdp/current/hadoop-hdfs-client/*,/usr/hdp/current/hadoop-hdfs-client/lib/*,/usr/hdp/current/hadoop-yarn-client/*,/usr/hdp/current/hadoop-yarn-client/lib/*"/>
+          </definition>
+        </changes>
+      </component>
+    </service>
+
     <service name="HIVE">
       <component name="HIVE_SERVER">
         <changes>

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
index 8fbb963..5aacfa0 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml
@@ -271,6 +271,19 @@
       </service>
     </group>
 
+    <group xsi:type="cluster" name="HDFS_LEAFE_SAFEMODE" title="HDFS - Wait to leave Safemode">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <direction>UPGRADE</direction>
+
+      <execute-stage service="HDFS" component="NAMENODE" title="Wait to leave Safemode">
+        <task xsi:type="execute" hosts="master" summary="Wait for NameNode to leave Safemode">
+          <script>scripts/namenode.py</script>
+          <function>wait_for_safemode_off</function>
+        </task>
+      </execute-stage>
+    </group>
+
     <group xsi:type="restart" name="MR and YARN" title="MR and YARN">
       <service-check>false</service-check>
       <skippable>true</skippable>

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java
index bac00d4..94a5336 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.state.stack.upgrade;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.ambari.server.serveraction.upgrades.AutoSkipFailedSummaryAction;
 import org.apache.ambari.server.stack.HostsType;
@@ -125,7 +126,7 @@ public class StageWrapperBuilderTest {
      */
     @Override
     public void add(UpgradeContext upgradeContext, HostsType hostsType, String service,
-        boolean clientOnly, ProcessingComponent pc) {
+        boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
index fd66502..263eeb2 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_datanode.py
@@ -450,7 +450,7 @@ class TestDatanode(RMFTestCase):
                               )
 
 
-  def test_pre_rolling_restart(self):
+  def test_pre_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -458,7 +458,7 @@ class TestDatanode(RMFTestCase):
     json_content['commandParams']['version'] = version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                        classname = "DataNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
@@ -468,7 +468,7 @@ class TestDatanode(RMFTestCase):
 
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart_23(self, call_mock):
+  def test_pre_upgrade_restart_23(self, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -478,7 +478,7 @@ class TestDatanode(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                        classname = "DataNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -499,7 +499,7 @@ class TestDatanode(RMFTestCase):
 
 
   @patch('time.sleep')
-  def test_post_rolling_restart(self, time_mock):
+  def test_post_upgrade_restart(self, time_mock):
     shell_call_output = """
       Live datanodes (2):
 
@@ -523,7 +523,7 @@ class TestDatanode(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                        classname = "DataNode",
-                       command = "post_rolling_restart",
+                       command = "post_upgrade_restart",
                        config_file = "default.json",
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -537,12 +537,12 @@ class TestDatanode(RMFTestCase):
 
 
   @patch('time.sleep')
-  def test_post_rolling_restart_datanode_not_ready(self, time_mock):
+  def test_post_upgrade_restart_datanode_not_ready(self, time_mock):
     mocks_dict = {}
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                          classname = "DataNode",
-                         command = "post_rolling_restart",
+                         command = "post_upgrade_restart",
                          config_file = "default.json",
                          hdp_stack_version = self.STACK_VERSION,
                          target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -556,12 +556,12 @@ class TestDatanode(RMFTestCase):
 
 
   @patch('time.sleep')
-  def test_post_rolling_restart_bad_returncode(self, time_mock):
+  def test_post_upgrade_restart_bad_returncode(self, time_mock):
     try:
       mocks_dict = {}
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/datanode.py",
                          classname = "DataNode",
-                         command = "post_rolling_restart",
+                         command = "post_upgrade_restart",
                          config_file = "default.json",
                          hdp_stack_version = self.STACK_VERSION,
                          target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -594,7 +594,7 @@ class TestDatanode(RMFTestCase):
         hdp_stack_version = self.STACK_VERSION,
         target = RMFTestCase.TARGET_COMMON_SERVICES,
         call_mocks = call_mock_side_effects,
-        command_args=[True])
+        command_args=["rolling"])
 
       raise Fail("Expected a fail since datanode didn't report a shutdown")
     except Exception, err:

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_hdfs_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_hdfs_client.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_hdfs_client.py
index 9d93128..055f291 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_hdfs_client.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_hdfs_client.py
@@ -187,7 +187,7 @@ class Test(RMFTestCase):
 
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart_23(self, call_mock):
+  def test_pre_upgrade_restart_23(self, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -197,7 +197,7 @@ class Test(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hdfs_client.py",
                        classname = "HdfsClient",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -216,7 +216,7 @@ class Test(RMFTestCase):
       ('ambari-python-wrap', '/usr/bin/conf-select', 'create-conf-dir', '--package', 'hadoop', '--stack-version', '2.3.0.0-1234', '--conf-version', '0'),
        mocks_dict['call'].call_args_list[0][0][0])
 
-  def test_pre_rolling_restart(self):
+  def test_pre_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -224,7 +224,7 @@ class Test(RMFTestCase):
     json_content['commandParams']['version'] = version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hdfs_client.py",
                        classname = "HdfsClient",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py
index d333071..a6cd740 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_journalnode.py
@@ -260,7 +260,7 @@ class TestJournalnode(RMFTestCase):
 
 
   @patch('time.sleep')
-  def test_post_rolling_restart(self, time_mock):
+  def test_post_upgrade_restart(self, time_mock):
     # load the NN and JN JMX files so that the urllib2.urlopen mock has data
     # to return
     num_journalnodes = 3
@@ -295,7 +295,7 @@ class TestJournalnode(RMFTestCase):
       with patch.object(urllib2, "urlopen", urlopen_mock):
        with patch.object(NamenodeHAState, "get_address", get_address_mock):
          self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/journalnode.py",
-           classname = "JournalNode", command = "post_rolling_restart",
+           classname = "JournalNode", command = "post_upgrade_restart",
            config_file = "journalnode-upgrade.json",
            checked_call_mocks = [(0, str(namenode_status_active)), (0, str(namenode_status_standby))],
            hdp_stack_version = self.UPGRADE_STACK_VERSION,
@@ -314,7 +314,7 @@ class TestJournalnode(RMFTestCase):
       with patch.object(urllib2, "urlopen", urlopen_mock):
         with patch.object(NamenodeHAState, "get_address", get_address_mock):
          self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/journalnode.py",
-           classname = "JournalNode", command = "post_rolling_restart",
+           classname = "JournalNode", command = "post_upgrade_restart",
            config_file = "journalnode-upgrade-hdfs-secure.json",
            checked_call_mocks = [(0, str(namenode_status_active)), (0, str(namenode_status_standby))],
            hdp_stack_version = self.UPGRADE_STACK_VERSION,
@@ -328,7 +328,7 @@ class TestJournalnode(RMFTestCase):
 
   @patch('time.sleep')
   @patch("urllib2.urlopen")
-  def test_post_rolling_restart_bad_jmx(self, urlopen_mock, time_mock):
+  def test_post_upgrade_restart_bad_jmx(self, urlopen_mock, time_mock):
     urlopen_mock_response = '{ "bad_data" : "gonna_mess_you_up" }'
 
     url_stream_mock = MagicMock()
@@ -337,7 +337,7 @@ class TestJournalnode(RMFTestCase):
 
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/journalnode.py",
-        classname = "JournalNode", command = "post_rolling_restart",
+        classname = "JournalNode", command = "post_upgrade_restart",
         config_file = "journalnode-upgrade.json",
         hdp_stack_version = self.UPGRADE_STACK_VERSION,
         target = RMFTestCase.TARGET_COMMON_SERVICES )
@@ -460,7 +460,7 @@ class TestJournalnode(RMFTestCase):
     put_structured_out_mock.assert_called_with({"securityState": "UNSECURED"})
 
 
-  def test_pre_rolling_restart(self):
+  def test_pre_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -468,7 +468,7 @@ class TestJournalnode(RMFTestCase):
     json_content['commandParams']['version'] = version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/journalnode.py",
                        classname = "JournalNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
@@ -476,7 +476,7 @@ class TestJournalnode(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart_23(self, call_mock):
+  def test_pre_upgrade_restart_23(self, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -486,7 +486,7 @@ class TestJournalnode(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/journalnode.py",
                        classname = "JournalNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
index e954a84..afa404c 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_namenode.py
@@ -1302,7 +1302,7 @@ class TestNamenode(RMFTestCase):
                      hdp_stack_version = self.STACK_VERSION,
                      target = RMFTestCase.TARGET_COMMON_SERVICES)
 
-  def test_pre_rolling_restart(self):
+  def test_pre_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -1310,7 +1310,7 @@ class TestNamenode(RMFTestCase):
     json_content['commandParams']['version'] = version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
                        classname = "NameNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
@@ -1319,7 +1319,7 @@ class TestNamenode(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart_23(self, call_mock):
+  def test_pre_upgrade_restart_23(self, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -1329,7 +1329,7 @@ class TestNamenode(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
                        classname = "NameNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -1347,13 +1347,13 @@ class TestNamenode(RMFTestCase):
       ('ambari-python-wrap', '/usr/bin/conf-select', 'create-conf-dir', '--package', 'hadoop', '--stack-version', '2.3.0.0-1234', '--conf-version', '0'),
        mocks_dict['call'].call_args_list[0][0][0])
 
-  def test_post_rolling_restart(self):
+  def test_post_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
                        classname = "NameNode",
-                       command = "post_rolling_restart",
+                       command = "post_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
@@ -1439,7 +1439,7 @@ class TestNamenode(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch.object(shell, "call")
-  def test_pre_rolling_restart_21_and_lower_params(self, call_mock):
+  def test_pre_upgrade_restart_21_and_lower_params(self, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/nn_ru_lzo.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -1449,7 +1449,7 @@ class TestNamenode(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
                        classname = "NameNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -1462,7 +1462,7 @@ class TestNamenode(RMFTestCase):
     self.assertEquals("/usr/lib/hadoop/sbin", sys.modules["params"].hadoop_bin)
 
   @patch.object(shell, "call")
-  def test_pre_rolling_restart_22_params(self, call_mock):
+  def test_pre_upgrade_restart_22_params(self, call_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/nn_ru_lzo.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -1474,7 +1474,7 @@ class TestNamenode(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
                        classname = "NameNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -1487,7 +1487,7 @@ class TestNamenode(RMFTestCase):
     self.assertEquals("/usr/hdp/current/hadoop-client/sbin", sys.modules["params"].hadoop_bin)
 
   @patch.object(shell, "call")
-  def test_pre_rolling_restart_23_params(self, call_mock):
+  def test_pre_upgrade_restart_23_params(self, call_mock):
     import itertools
 
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/nn_ru_lzo.json"
@@ -1502,7 +1502,7 @@ class TestNamenode(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/namenode.py",
                        classname = "NameNode",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_nfsgateway.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_nfsgateway.py b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_nfsgateway.py
index ee85e4a..5852eaf 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_nfsgateway.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/HDFS/test_nfsgateway.py
@@ -384,7 +384,7 @@ class TestNFSGateway(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart(self, call_mock):
+  def test_pre_upgrade_restart(self, call_mock):
     call_mock.side_effects = [(0, None), (0, None)]
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
@@ -395,7 +395,7 @@ class TestNFSGateway(RMFTestCase):
     json_content['hostLevelParams']['stack_version'] = stack_version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/nfsgateway.py",
                        classname = "NFSGateway",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/YARN/test_historyserver.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_historyserver.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_historyserver.py
index 1321aaa..40a085f 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_historyserver.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_historyserver.py
@@ -767,7 +767,7 @@ class TestHistoryServer(RMFTestCase):
   @patch.object(Script, "is_hdp_stack_greater_or_equal", new = MagicMock(return_value="2.3.0"))
   @patch.object(functions, "get_hdp_version", new = MagicMock(return_value="2.3.0.0-1234"))
   @patch("resource_management.libraries.functions.copy_tarball.copy_to_hdfs")
-  def test_pre_rolling_restart_23(self, copy_to_hdfs_mock):
+  def test_pre_upgrade_restart_23(self, copy_to_hdfs_mock):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -778,7 +778,7 @@ class TestHistoryServer(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/historyserver.py",
                        classname = "HistoryServer",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/YARN/test_mapreduce2_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_mapreduce2_client.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_mapreduce2_client.py
index 532ce36..dd20b79 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_mapreduce2_client.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_mapreduce2_client.py
@@ -387,7 +387,7 @@ class TestMapReduce2Client(RMFTestCase):
     self.assertResourceCalled("Execute", ('ambari-python-wrap', '/usr/bin/hdp-select', 'set', 'hadoop-client', '2.2.1.0-2067'), sudo=True)
 
 
-  def test_pre_rolling_restart_23(self):
+  def test_pre_upgrade_restart_23(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/client-upgrade.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -397,7 +397,7 @@ class TestMapReduce2Client(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/mapreduce2_client.py",
                        classname = "MapReduce2Client",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
index e7da747..3ccde3b 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_nodemanager.py
@@ -520,7 +520,7 @@ class TestNodeManager(RMFTestCase):
 
   @patch('time.sleep')
   @patch.object(resource_management.libraries.functions, "get_hdp_version", new = MagicMock(return_value='2.3.0.0-1234'))
-  def test_post_rolling_restart(self, time_mock):
+  def test_post_upgrade_restart(self, time_mock):
     process_output = """
       c6401.ambari.apache.org:45454  RUNNING  c6401.ambari.apache.org:8042  0
     """
@@ -528,7 +528,7 @@ class TestNodeManager(RMFTestCase):
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/nodemanager.py",
       classname = "Nodemanager",
-      command = "post_rolling_restart",
+      command = "post_upgrade_restart",
       config_file = "default.json",
       hdp_stack_version = self.STACK_VERSION,
       target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -547,7 +547,7 @@ class TestNodeManager(RMFTestCase):
 
 
   @patch('time.sleep')
-  def test_post_rolling_restart_nodemanager_not_ready(self, time_mock):
+  def test_post_upgrade_restart_nodemanager_not_ready(self, time_mock):
     process_output = """
       c9999.ambari.apache.org:45454  RUNNING  c9999.ambari.apache.org:8042  0
     """
@@ -556,7 +556,7 @@ class TestNodeManager(RMFTestCase):
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/nodemanager.py",
                          classname="Nodemanager",
-                         command = "post_rolling_restart",
+                         command = "post_upgrade_restart",
                          config_file="default.json",
                          hdp_stack_version = self.STACK_VERSION,
                          target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -570,7 +570,7 @@ class TestNodeManager(RMFTestCase):
 
 
   @patch('time.sleep')
-  def test_post_rolling_restart_nodemanager_not_ready(self, time_mock):
+  def test_post_upgrade_restart_nodemanager_not_ready(self, time_mock):
     process_output = """
       c6401.ambari.apache.org:45454  RUNNING  c6401.ambari.apache.org:8042  0
     """
@@ -579,7 +579,7 @@ class TestNodeManager(RMFTestCase):
     try:
       self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/nodemanager.py",
                          classname="Nodemanager",
-                         command = "post_rolling_restart",
+                         command = "post_upgrade_restart",
                          config_file="default.json",
                          hdp_stack_version = self.STACK_VERSION,
                          target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -701,7 +701,7 @@ class TestNodeManager(RMFTestCase):
 
   
   @patch.object(resource_management.libraries.functions, "get_hdp_version", new = MagicMock(return_value='2.3.0.0-1234'))
-  def test_pre_rolling_restart_23(self):
+  def test_pre_upgrade_restart_23(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -711,7 +711,7 @@ class TestNodeManager(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/nodemanager.py",
                        classname = "Nodemanager",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/YARN/test_resourcemanager.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_resourcemanager.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_resourcemanager.py
index a965c90..4639bd4 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_resourcemanager.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_resourcemanager.py
@@ -623,7 +623,7 @@ class TestResourceManager(RMFTestCase):
     )
     put_structured_out_mock.assert_called_with({"securityState": "UNSECURED"})
 
-  def test_pre_rolling_restart_23(self):
+  def test_pre_upgrade_restart_23(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -633,7 +633,7 @@ class TestResourceManager(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/resourcemanager.py",
                        classname = "Resourcemanager",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/YARN/test_yarn_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_yarn_client.py b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_yarn_client.py
index 413b2ad..78043f6 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/YARN/test_yarn_client.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/YARN/test_yarn_client.py
@@ -556,7 +556,7 @@ class TestYarnClient(RMFTestCase):
     # for now, it's enough that hdp-select is confirmed
 
   @patch.object(functions, "get_hdp_version", new = MagicMock(return_value='2.3.0.0-1234'))
-  def test_pre_rolling_restart_23(self):
+  def test_pre_upgrade_restart_23(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -566,7 +566,7 @@ class TestYarnClient(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/yarn_client.py",
                        classname = "YarnClient",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_client.py b/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_client.py
index 7a624bd..7a6d225 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_client.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_client.py
@@ -154,7 +154,7 @@ class TestZookeeperClient(RMFTestCase):
     self.assertNoMoreResources()
 
 
-  def test_pre_rolling_restart(self):
+  def test_pre_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -162,7 +162,7 @@ class TestZookeeperClient(RMFTestCase):
     json_content['commandParams']['version'] = version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/zookeeper_client.py",
                        classname = "ZookeeperClient",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
@@ -171,7 +171,7 @@ class TestZookeeperClient(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart_23(self, call_mock):
+  def test_pre_upgrade_restart_23(self, call_mock):
     call_mock.side_effects = [(0, None), (0, None)]
 
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
@@ -183,7 +183,7 @@ class TestZookeeperClient(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/zookeeper_client.py",
                        classname = "ZookeeperClient",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_server.py b/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_server.py
index a6d610f..8949eaa 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_server.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/ZOOKEEPER/test_zookeeper_server.py
@@ -340,7 +340,7 @@ class TestZookeeperServer(RMFTestCase):
     put_structured_out_mock.assert_called_with({"securityState": "UNSECURED"})
 
 
-  def test_pre_rolling_restart(self):
+  def test_pre_upgrade_restart(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -348,7 +348,7 @@ class TestZookeeperServer(RMFTestCase):
     json_content['commandParams']['version'] = version
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/zookeeper_server.py",
                        classname = "ZookeeperServer",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
@@ -357,7 +357,7 @@ class TestZookeeperServer(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("resource_management.core.shell.call")
-  def test_pre_rolling_restart_23(self, call_mock):
+  def test_pre_upgrade_restart_23(self, call_mock):
     call_mock.side_effects = [(0, None), (0, None)]
 
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
@@ -369,7 +369,7 @@ class TestZookeeperServer(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/zookeeper_server.py",
                        classname = "ZookeeperServer",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,
@@ -391,7 +391,7 @@ class TestZookeeperServer(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch.object(resource_management.libraries.functions, "get_unique_id_and_date")
-  def test_post_rolling_restart(self, get_unique_id_and_date_mock):
+  def test_post_upgrade_restart(self, get_unique_id_and_date_mock):
     unique_value = "unique1"
     get_unique_id_and_date_mock.return_value = unique_value
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
@@ -403,7 +403,7 @@ class TestZookeeperServer(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/zookeeper_server.py",
                        classname = "ZookeeperServer",
-                       command = "post_rolling_restart",
+                       command = "post_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/test/python/stacks/2.1/YARN/test_apptimelineserver.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/YARN/test_apptimelineserver.py b/ambari-server/src/test/python/stacks/2.1/YARN/test_apptimelineserver.py
index 6f3ea6d..0e467d8 100644
--- a/ambari-server/src/test/python/stacks/2.1/YARN/test_apptimelineserver.py
+++ b/ambari-server/src/test/python/stacks/2.1/YARN/test_apptimelineserver.py
@@ -375,7 +375,7 @@ class TestAppTimelineServer(RMFTestCase):
     put_structured_out_mock.assert_called_with({"securityState": "UNSECURED"})
 
   @patch.object(resource_management.libraries.functions, "get_hdp_version", new = MagicMock(return_value='2.3.0.0-1234'))
-  def test_pre_rolling_restart_23(self):
+  def test_pre_upgrade_restart_23(self):
     config_file = self.get_src_folder()+"/test/python/stacks/2.0.6/configs/default.json"
     with open(config_file, "r") as f:
       json_content = json.load(f)
@@ -385,7 +385,7 @@ class TestAppTimelineServer(RMFTestCase):
     mocks_dict = {}
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/application_timeline_server.py",
                        classname = "ApplicationTimelineServer",
-                       command = "pre_rolling_restart",
+                       command = "pre_upgrade_restart",
                        config_dict = json_content,
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES,


[2/2] ambari git commit: AMBARI-12701. Stop-and-Start Upgrade: Handle Core Services (alejandro)

Posted by al...@apache.org.
AMBARI-12701. Stop-and-Start Upgrade: Handle Core Services (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7afe5a4e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7afe5a4e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7afe5a4e

Branch: refs/heads/trunk
Commit: 7afe5a4ec9200c41922fb1c0809f34a0d8d700bd
Parents: 34a0353
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Wed Oct 21 17:48:24 2015 -0700
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Wed Oct 21 19:22:52 2015 -0700

----------------------------------------------------------------------
 .../libraries/script/script.py                  |  68 +++++---
 .../PreUpgradeCheckResourceProvider.java        |  12 +-
 .../internal/UpgradeResourceProvider.java       |  50 +++++-
 .../ambari/server/stack/MasterHostResolver.java |  23 +++
 .../ambari/server/state/UpgradeHelper.java      |  76 +++++++--
 .../state/stack/upgrade/ClusterGrouping.java    |   5 +-
 .../state/stack/upgrade/ColocatedGrouping.java  |   8 +-
 .../server/state/stack/upgrade/Grouping.java    |  15 +-
 .../stack/upgrade/ServiceCheckGrouping.java     |   2 +-
 .../state/stack/upgrade/StageWrapper.java       |  40 ++++-
 .../stack/upgrade/StageWrapperBuilder.java      |   5 +-
 .../server/state/stack/upgrade/TaskWrapper.java |  27 ++-
 .../state/stack/upgrade/TaskWrapperBuilder.java |  10 +-
 .../HDFS/2.1.0.2.0/package/scripts/datanode.py  |  40 +++--
 .../package/scripts/datanode_upgrade.py         |  27 +--
 .../2.1.0.2.0/package/scripts/hdfs_client.py    |   6 +-
 .../2.1.0.2.0/package/scripts/hdfs_namenode.py  | 124 +++++++++-----
 .../2.1.0.2.0/package/scripts/journalnode.py    |  15 +-
 .../package/scripts/journalnode_upgrade.py      |   2 +-
 .../HDFS/2.1.0.2.0/package/scripts/namenode.py  | 115 ++++++++++---
 .../package/scripts/namenode_ha_state.py        |  27 +++
 .../package/scripts/namenode_upgrade.py         | 166 +++++++++++++++++--
 .../2.1.0.2.0/package/scripts/nfsgateway.py     |   6 +-
 .../2.1.0.2.0/package/scripts/params_linux.py   |   8 +-
 .../package/scripts/setup_ranger_hdfs.py        |   4 +-
 .../HDFS/2.1.0.2.0/package/scripts/snamenode.py |   8 +-
 .../HDFS/2.1.0.2.0/package/scripts/utils.py     |  30 ++++
 .../2.1.0.2.0/package/scripts/zkfc_slave.py     |   4 +-
 .../scripts/application_timeline_server.py      |   8 +-
 .../2.1.0.2.0/package/scripts/historyserver.py  |   8 +-
 .../package/scripts/mapreduce2_client.py        |   2 +-
 .../2.1.0.2.0/package/scripts/nodemanager.py    |  12 +-
 .../2.1.0.2.0/package/scripts/params_linux.py   |   2 +-
 .../package/scripts/resourcemanager.py          |   8 +-
 .../2.1.0.2.0/package/scripts/yarn_client.py    |   2 +-
 .../3.4.5.2.0/package/scripts/zookeeper.py      |   6 +-
 .../package/scripts/zookeeper_client.py         |   8 +-
 .../package/scripts/zookeeper_server.py         |  25 +--
 .../package/scripts/zookeeper_service.py        |   4 +-
 .../HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml |  31 +++-
 .../stacks/HDP/2.2/upgrades/config-upgrade.xml  |  11 ++
 .../HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml |  13 ++
 .../stack/upgrade/StageWrapperBuilderTest.java  |   3 +-
 .../python/stacks/2.0.6/HDFS/test_datanode.py   |  22 +--
 .../stacks/2.0.6/HDFS/test_hdfs_client.py       |   8 +-
 .../stacks/2.0.6/HDFS/test_journalnode.py       |  18 +-
 .../python/stacks/2.0.6/HDFS/test_namenode.py   |  24 +--
 .../python/stacks/2.0.6/HDFS/test_nfsgateway.py |   4 +-
 .../stacks/2.0.6/YARN/test_historyserver.py     |   4 +-
 .../stacks/2.0.6/YARN/test_mapreduce2_client.py |   4 +-
 .../stacks/2.0.6/YARN/test_nodemanager.py       |  16 +-
 .../stacks/2.0.6/YARN/test_resourcemanager.py   |   4 +-
 .../stacks/2.0.6/YARN/test_yarn_client.py       |   4 +-
 .../2.0.6/ZOOKEEPER/test_zookeeper_client.py    |   8 +-
 .../2.0.6/ZOOKEEPER/test_zookeeper_server.py    |  12 +-
 .../stacks/2.1/YARN/test_apptimelineserver.py   |   4 +-
 56 files changed, 894 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-common/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index e647c11..e3bae5d 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -26,6 +26,7 @@ import os
 import sys
 import logging
 import platform
+import inspect
 import tarfile
 from ambari_commons import OSCheck, OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
@@ -427,18 +428,19 @@ class Script(object):
     sys.exit(1)
 
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     """
     To be overridden by subclasses
     """
     self.fail_with_error("start method isn't implemented")
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     """
     To be overridden by subclasses
     """
     self.fail_with_error("stop method isn't implemented")
 
+  # TODO, remove after all services have switched to pre_upgrade_restart
   def pre_rolling_restart(self, env):
     """
     To be overridden by subclasses
@@ -463,45 +465,67 @@ class Script(object):
       command_params = config["commandParams"] if "commandParams" in config else None
       if command_params is not None:
         restart_type = command_params["restart_type"] if "restart_type" in command_params else ""
-        if restart_type:
-          restart_type = restart_type.encode('ascii', 'ignore')
 
-    rolling_restart = restart_type.lower().startswith("rolling")
+    upgrade_type = None
+    if restart_type.lower() == "rolling_upgrade":
+      upgrade_type = "rolling"
+    elif restart_type.lower() == "nonrolling_upgrade":
+      upgrade_type = "nonrolling"
+
+    is_stack_upgrade = upgrade_type is not None
 
     if componentCategory and componentCategory.strip().lower() == 'CLIENT'.lower():
-      if rolling_restart:
-        self.pre_rolling_restart(env)
+      if is_stack_upgrade:
+        # Remain backward compatible with the rest of the services that haven't switched to using
+        # the pre_upgrade_restart method. Once done. remove the else-block.
+        if "pre_upgrade_restart" in dir(self):
+          self.pre_upgrade_restart(env, upgrade_type=upgrade_type)
+        else:
+          self.pre_rolling_restart(env)
 
       self.install(env)
     else:
-      # To remain backward compatible with older stacks, only pass rolling_restart if True.
-      if rolling_restart:
-        self.stop(env, rolling_restart=rolling_restart)
+      # To remain backward compatible with older stacks, only pass upgrade_type if available.
+      # TODO, remove checking the argspec for "upgrade_type" once all of the services support that optional param.
+      if is_stack_upgrade and "upgrade_type" in inspect.getargspec(self.stop).args:
+        self.stop(env, upgrade_type=upgrade_type)
       else:
-        self.stop(env)
-
-      if rolling_restart:
-        self.pre_rolling_restart(env)
-
-      # To remain backward compatible with older stacks, only pass rolling_restart if True.
-      if rolling_restart:
-        self.start(env, rolling_restart=rolling_restart)
+        self.stop(env, rolling_restart=(restart_type == "rolling_upgrade"))
+
+      if is_stack_upgrade:
+        # Remain backward compatible with the rest of the services that haven't switched to using
+        # the pre_upgrade_restart method. Once done. remove the else-block.
+        if "pre_upgrade_restart" in dir(self):
+          self.pre_upgrade_restart(env, upgrade_type=upgrade_type)
+        else:
+          self.pre_rolling_restart(env)
+
+      # To remain backward compatible with older stacks, only pass upgrade_type if available.
+      # TODO, remove checking the argspec for "upgrade_type" once all of the services support that optional param.
+      if is_stack_upgrade and "upgrade_type" in inspect.getargspec(self.start).args:
+        self.start(env, upgrade_type=upgrade_type)
       else:
-        self.start(env)
+        self.start(env, rolling_restart=(restart_type == "rolling_upgrade"))
 
-      if rolling_restart:
-        self.post_rolling_restart(env)
+      if is_stack_upgrade:
+        # Remain backward compatible with the rest of the services that haven't switched to using
+        # the post_upgrade_restart method. Once done. remove the else-block.
+        if "post_upgrade_restart" in dir(self):
+          self.post_upgrade_restart(env, upgrade_type=upgrade_type)
+        else:
+          self.post_rolling_restart(env)
 
     if self.should_expose_component_version("restart"):
       self.save_component_version_to_structured_out()
 
+  # TODO, remove after all services have switched to post_upgrade_restart
   def post_rolling_restart(self, env):
     """
     To be overridden by subclasses
     """
     pass
 
-  def configure(self, env, rolling_restart=False):
+  def configure(self, env, upgrade_type=None):
     """
     To be overridden by subclasses
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
index a139446..e98f730 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
@@ -134,8 +134,16 @@ public class PreUpgradeCheckResourceProvider extends ReadOnlyResourceProvider {
 
     for (Map<String, Object> propertyMap: propertyMaps) {
       final String clusterName = propertyMap.get(UPGRADE_CHECK_CLUSTER_NAME_PROPERTY_ID).toString();
-      final UpgradeType upgradeType = propertyMap.containsKey(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID) ?
-          UpgradeType.valueOf(propertyMap.get(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID).toString()) : UpgradeType.ROLLING;
+
+      UpgradeType upgradeType = UpgradeType.ROLLING;
+      if (propertyMap.containsKey(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID)) {
+        try {
+          upgradeType = UpgradeType.valueOf(propertyMap.get(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID).toString());
+        } catch(Exception e){
+          throw new SystemException(String.format("Property %s has an incorrect value of %s.", UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID, propertyMap.get(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID)));
+        }
+      }
+
       final Cluster cluster;
 
       try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 82ce49f..78c36f8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -492,8 +492,14 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     String preferredUpgradePackName = (String) requestMap.get(UPGRADE_PACK);
 
     // Default to ROLLING upgrade, but attempt to read from properties.
-    final UpgradeType upgradeType = requestMap.containsKey(UPGRADE_TYPE) ?
-        UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString()) : UpgradeType.ROLLING;
+    UpgradeType upgradeType = UpgradeType.ROLLING;
+    if (requestMap.containsKey(UPGRADE_TYPE)) {
+      try {
+        upgradeType = UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString());
+      } catch(Exception e){
+        throw new AmbariException(String.format("Property %s has an incorrect value of %s.", UPGRADE_TYPE, requestMap.get(UPGRADE_TYPE)));
+      }
+    }
 
     if (null == clusterName) {
       throw new AmbariException(String.format("%s is required", UPGRADE_CLUSTER_NAME));
@@ -526,8 +532,15 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     boolean skipPrereqChecks = Boolean.parseBoolean((String) requestMap.get(UPGRADE_SKIP_PREREQUISITE_CHECKS));
     boolean failOnCheckWarnings = Boolean.parseBoolean((String) requestMap.get(UPGRADE_FAIL_ON_CHECK_WARNINGS));
     String preferredUpgradePack = requestMap.containsKey(UPGRADE_PACK) ? (String) requestMap.get(UPGRADE_PACK) : null;
-    UpgradeType upgradeType = requestMap.containsKey(UPGRADE_TYPE) ?
-        UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString()) : UpgradeType.ROLLING;
+
+    UpgradeType upgradeType = UpgradeType.ROLLING;
+    if (requestMap.containsKey(UPGRADE_TYPE)) {
+      try {
+        upgradeType = UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString());
+      } catch(Exception e){
+        throw new AmbariException(String.format("Property %s has an incorrect value of %s.", UPGRADE_TYPE, requestMap.get(UPGRADE_TYPE)));
+      }
+    }
 
     // Validate there isn't an direction == upgrade/downgrade already in progress.
     List<UpgradeEntity> upgrades = s_upgradeDAO.findUpgrades(cluster.getClusterId());
@@ -1061,6 +1074,23 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     }
   }
 
+  /**
+   * Modify the commandParams by applying additional parameters from the stage.
+   * @param wrapper Stage Wrapper that may contain additional parameters.
+   * @param commandParams Parameters to modify.
+   */
+  private void applyAdditionalParameters(StageWrapper wrapper, Map<String, String> commandParams) {
+    if (wrapper.getParams() != null) {
+      Iterator it = wrapper.getParams().entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<String, String> pair = (Map.Entry) it.next();
+        if (!commandParams.containsKey(pair.getKey())) {
+          commandParams.put(pair.getKey(), pair.getValue());
+        }
+      }
+    }
+  }
+
   private void makeActionStage(UpgradeContext context, RequestStageContainer request,
       UpgradeItemEntity entity, StageWrapper wrapper, boolean skippable, boolean allowRetry)
           throws AmbariException {
@@ -1084,6 +1114,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     params.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     params.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Apply additional parameters to the command that come from the stage.
+    applyAdditionalParameters(wrapper, params);
+
     // Because custom task may end up calling a script/function inside a
     // service, it is necessary to set the
     // service_package_folder and hooks_folder params.
@@ -1192,6 +1225,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     commandParams.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     commandParams.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Apply additional parameters to the command that come from the stage.
+    applyAdditionalParameters(wrapper, commandParams);
+
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         function, filters, commandParams);
     actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
@@ -1244,6 +1280,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     commandParams.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     commandParams.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Apply additional parameters to the command that come from the stage.
+    applyAdditionalParameters(wrapper, commandParams);
+
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         "SERVICE_CHECK", filters, commandParams);
 
@@ -1304,6 +1343,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     commandParams.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     commandParams.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Notice that this does not apply any params because the input does not specify a stage.
+    // All of the other actions do use additional params.
+
     String itemDetail = entity.getText();
     String stageText = StringUtils.abbreviate(entity.getText(), 255);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java b/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
index 55fb12b..22447d7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
@@ -222,6 +222,29 @@ public class MasterHostResolver {
     }
   }
 
+  /**
+   * Determine if HDFS is present and it has NameNode High Availability.
+   * @return true if has NameNode HA, otherwise, false.
+   */
+  public boolean isNameNodeHA() throws AmbariException {
+    Map<String, org.apache.ambari.server.state.Service> services = m_cluster.getServices();
+    if (services != null && services.containsKey("HDFS")) {
+
+      Set<String> secondaryNameNodeHosts = m_cluster.getHosts("HDFS", "SECONDARY_NAMENODE");
+      Set<String> nameNodeHosts = m_cluster.getHosts("HDFS", "NAMENODE");
+
+      if (secondaryNameNodeHosts.size() == 1 && nameNodeHosts.size() == 1) {
+        return false;
+      }
+      if (nameNodeHosts.size() > 1) {
+        return true;
+      }
+
+      throw new AmbariException("Unable to determine if cluster has NameNode HA.");
+    }
+    return false;
+  }
+
 
   /**
    * Get mapping of the HDFS Namenodes from the state ("active" or "standby") to the hostname.

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
index b81d3fd..fd92d21 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.state;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -325,12 +326,15 @@ public class UpgradeHelper {
         }
 
         for (String component : service.components) {
+          // Rolling Upgrade has exactly one task for a Component.
           if (upgradePack.getType() == UpgradeType.ROLLING && !allTasks.get(service.serviceName).containsKey(component)) {
             continue;
           }
+
+          // NonRolling Upgrade has several tasks for the same component, since it must first call Stop, perform several
+          // other tasks, and then Start on that Component.
           
           HostsType hostsType = mhr.getMasterAndHosts(service.serviceName, component);
-          // TODO AMBARI-12698, how does this impact SECONDARY NAMENODE if there's no NameNode HA?
           if (null == hostsType) {
             continue;
           }
@@ -368,26 +372,66 @@ public class UpgradeHelper {
 
           setDisplayNames(context, service.serviceName, component);
 
-          // Special case for NAMENODE
+          // Special case for NAMENODE when there are multiple
           if (service.serviceName.equalsIgnoreCase("HDFS") && component.equalsIgnoreCase("NAMENODE")) {
-            // !!! revisit if needed
-            if (!hostsType.hosts.isEmpty() && hostsType.master != null && hostsType.secondary != null) {
-              // The order is important, first do the standby, then the active namenode.
-              LinkedHashSet<String> order = new LinkedHashSet<>();
-
-              order.add(hostsType.secondary);
-              order.add(hostsType.master);
 
-              // Override the hosts with the ordered collection
-              hostsType.hosts = order;
+            // Rolling Upgrade requires first upgrading the Standby, then the Active NameNode.
+            // Whereas NonRolling needs to do the following:
+            //   NameNode HA:  Pick one to the be active, and the other the standby.
+            //   Non-NameNode HA: Upgrade first the SECONDARY, then the primary NAMENODE
+            switch (upgradePack.getType()) {
+              case ROLLING:
+                if (!hostsType.hosts.isEmpty() && hostsType.master != null && hostsType.secondary != null) {
+                  // The order is important, first do the standby, then the active namenode.
+                  LinkedHashSet<String> order = new LinkedHashSet<String>();
+
+                  order.add(hostsType.secondary);
+                  order.add(hostsType.master);
+
+                  // Override the hosts with the ordered collection
+                  hostsType.hosts = order;
+
+                  builder.add(context, hostsType, service.serviceName,
+                      svc.isClientOnlyService(), pc, null);
+                }
+                break;
+              case NON_ROLLING:
+                boolean isNameNodeHA = mhr.isNameNodeHA();
+                if (isNameNodeHA && hostsType.master != null && hostsType.secondary != null) {
+                  // This could be any order, but the NameNodes have to know what role they are going to take.
+                  // So need to make 2 stages, and add different parameters to each one.
+
+                  HostsType ht1 = new HostsType();
+                  LinkedHashSet<String> h1Hosts = new LinkedHashSet<String>();
+                  h1Hosts.add(hostsType.master);
+                  ht1.hosts = h1Hosts;
+                  Map<String, String> h1Params = new HashMap<String, String>();
+                  h1Params.put("desired_namenode_role", "active");
+
+                  HostsType ht2 = new HostsType();
+                  LinkedHashSet<String> h2Hosts = new LinkedHashSet<String>();
+                  h2Hosts.add(hostsType.secondary);
+                  ht2.hosts = h2Hosts;
+                  Map<String, String> h2Params = new HashMap<String, String>();
+                  h2Params.put("desired_namenode_role", "standby");
+
+
+                  builder.add(context, ht1, service.serviceName,
+                      svc.isClientOnlyService(), pc, h1Params);
+
+                  builder.add(context, ht2, service.serviceName,
+                      svc.isClientOnlyService(), pc, h2Params);
+                } else {
+                  // If no NameNode HA, then don't need to change hostsType.hosts since there should be exactly one.
+                  builder.add(context, hostsType, service.serviceName,
+                      svc.isClientOnlyService(), pc, null);
+                }
+
+                break;
             }
-
-            builder.add(context, hostsType, service.serviceName,
-                svc.isClientOnlyService(), pc);
-
           } else {
             builder.add(context, hostsType, service.serviceName,
-                svc.isClientOnlyService(), pc);
+                svc.isClientOnlyService(), pc, null);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
index ba44408..0e9d2c8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
@@ -108,7 +108,7 @@ public class ClusterGrouping extends Grouping {
 
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-        boolean clientOnly, ProcessingComponent pc) {
+        boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
       // !!! no-op in this case
     }
 
@@ -261,7 +261,8 @@ public class ClusterGrouping extends Grouping {
       }
 
       return new StageWrapper(
-          StageWrapper.Type.RU_TASKS, execution.title,
+          StageWrapper.Type.RU_TASKS,
+          execution.title,
           new TaskWrapper(service, component, hostNames, et));
     }
     return null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
index 2aef43c..11e9267 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
@@ -81,7 +81,7 @@ public class ColocatedGrouping extends Grouping {
 
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-        boolean clientOnly, ProcessingComponent pc) {
+        boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
 
       boolean forUpgrade = ctx.getDirection().isUpgrade();
 
@@ -110,7 +110,7 @@ public class ColocatedGrouping extends Grouping {
           proxy.clientOnly = clientOnly;
           proxy.message = getStageText("Preparing",
               ctx.getComponentDisplay(service, pc.name), Collections.singleton(host));
-          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks, params));
           proxy.service = service;
           proxy.component = pc.name;
           targetList.add(proxy);
@@ -122,7 +122,7 @@ public class ColocatedGrouping extends Grouping {
           if (RestartTask.class.isInstance(t)) {
             proxy = new TaskProxy();
             proxy.clientOnly = clientOnly;
-            proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), t));
+            proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), params, t));
             proxy.restart = true;
             proxy.service = service;
             proxy.component = pc.name;
@@ -139,7 +139,7 @@ public class ColocatedGrouping extends Grouping {
           proxy.clientOnly = clientOnly;
           proxy.component = pc.name;
           proxy.service = service;
-          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks, params));
           proxy.message = getStageText("Completing",
               ctx.getComponentDisplay(service, pc.name), Collections.singleton(host));
           targetList.add(proxy);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
index d6db9b1..cd3ee68 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import javax.xml.bind.annotation.XmlAttribute;
@@ -84,24 +85,26 @@ public class Grouping {
      * @param hostsType the order collection of hosts, which may have a master and secondary
      * @param service the service name
      * @param pc the ProcessingComponent derived from the upgrade pack.
+     * @param params additional parameters
      */
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-       boolean clientOnly, ProcessingComponent pc) {
+       boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
 
       boolean forUpgrade = ctx.getDirection().isUpgrade();
 
       // Construct the pre tasks during Upgrade/Downgrade direction.
       List<TaskBucket> buckets = buckets(resolveTasks(forUpgrade, true, pc));
       for (TaskBucket bucket : buckets) {
-        List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks);
+        List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks, params);
         Set<String> preTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(preTasks);
         if (!preTasksEffectiveHosts.isEmpty()) {
           StageWrapper stage = new StageWrapper(
               bucket.type,
               getStageText("Preparing", ctx.getComponentDisplay(service, pc.name), preTasksEffectiveHosts),
+              params,
               preTasks
-              );
+          );
           m_stages.add(stage);
         }
       }
@@ -114,7 +117,8 @@ public class Grouping {
           StageWrapper stage = new StageWrapper(
               t.getStageWrapperType(),
               getStageText(t.getActionVerb(), ctx.getComponentDisplay(service, pc.name), Collections.singleton(hostName)),
-              new TaskWrapper(service, pc.name, Collections.singleton(hostName), t));
+              params,
+              new TaskWrapper(service, pc.name, Collections.singleton(hostName), params, t));
           m_stages.add(stage);
         }
       }
@@ -122,12 +126,13 @@ public class Grouping {
       // Construct the post tasks during Upgrade/Downgrade direction.
       buckets = buckets(resolveTasks(forUpgrade, false, pc));
       for (TaskBucket bucket : buckets) {
-        List<TaskWrapper> postTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks);
+        List<TaskWrapper> postTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks, params);
         Set<String> postTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(postTasks);
         if (!postTasksEffectiveHosts.isEmpty()) {
           StageWrapper stage = new StageWrapper(
               bucket.type,
               getStageText("Completing", ctx.getComponentDisplay(service, pc.name), postTasksEffectiveHosts),
+              params,
               postTasks
               );
           m_stages.add(stage);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
index fec9978..0033185 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
@@ -103,7 +103,7 @@ public class ServiceCheckGrouping extends Grouping {
      */
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-        boolean clientOnly, ProcessingComponent pc) {
+        boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
       // !!! nothing to do here
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
index 92df3b5..2ea3671 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
@@ -19,8 +19,10 @@ package org.apache.ambari.server.state.stack.upgrade;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.gson.Gson;
@@ -33,16 +35,41 @@ public class StageWrapper {
   private static Gson gson = new Gson();
   private String text;
   private Type type;
-
+  private Map<String, String> params;
   private List<TaskWrapper> tasks;
 
+  /**
+   * Wrapper for a stage that encapsulates its text and tasks.
+   * @param type Type of stage
+   * @param text Text to display
+   * @param tasks List of tasks to add to the stage
+   */
   public StageWrapper(Type type, String text, TaskWrapper... tasks) {
-    this(type, text, Arrays.asList(tasks));
+    this(type, text, null, Arrays.asList(tasks));
+  }
+
+  /**
+   * Wrapper for a stage that encapsulates its text, params, and tasks.
+   * @param type Type of stage
+   * @param text Text to display
+   * @param params Command parameters
+   * @param tasks List of tasks to add to the stage
+   */
+  public StageWrapper(Type type, String text, Map<String, String> params, TaskWrapper... tasks) {
+    this(type, text, params, Arrays.asList(tasks));
   }
 
-  public StageWrapper(Type type, String text, List<TaskWrapper> tasks) {
+  /**
+   * Wrapper for a stage that encapsulates its text, params, and tasks.
+   * @param type Type of stage
+   * @param text Text to display
+   * @param params Command parameters
+   * @param tasks List of tasks to add to the stage
+   */
+  public StageWrapper(Type type, String text, Map<String, String> params, List<TaskWrapper> tasks) {
     this.type = type;
     this.text = text;
+    this.params = (params == null ? Collections.<String, String>emptyMap() : params);
     this.tasks = tasks;
   }
 
@@ -78,6 +105,13 @@ public class StageWrapper {
   }
 
   /**
+   * @return the additional command parameters
+   */
+  public Map<String, String> getParams() {
+    return params;
+  }
+
+  /**
    * @return the wrapped tasks for this stage
    */
   public List<TaskWrapper> getTasks() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
index d4ee9a8..6ef0980 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.ambari.server.serveraction.upgrades.AutoSkipFailedSummaryAction;
@@ -66,9 +67,11 @@ public abstract class StageWrapperBuilder {
    *          whether the service is client only, no service checks
    * @param pc
    *          the ProcessingComponent derived from the upgrade pack
+   * @param params
+   *          additional parameters
    */
   public abstract void add(UpgradeContext upgradeContext, HostsType hostsType, String service,
-      boolean clientOnly, ProcessingComponent pc);
+      boolean clientOnly, ProcessingComponent pc, Map<String, String> params);
 
   /**
    * Builds the stage wrappers, including any pre- and post-procesing that needs

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
index f7cc930..69b3f8b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
@@ -18,7 +18,9 @@
 package org.apache.ambari.server.state.stack.upgrade;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -29,6 +31,7 @@ public class TaskWrapper {
   private String service;
   private String component;
   private Set<String> hosts; // all the hosts that all the tasks must run
+  private Map<String, String> params;
   private List<Task> tasks; // all the tasks defined for the hostcomponent
 
   /**
@@ -38,8 +41,20 @@ public class TaskWrapper {
    * @param tasks an array of tasks as a convenience
    */
   public TaskWrapper(String s, String c, Set<String> hosts, Task... tasks) {
-    this(s, c, hosts, Arrays.asList(tasks));
+    this(s, c, hosts, null, Arrays.asList(tasks));
   }
+  
+  /**
+   * @param s the service name for the tasks
+   * @param c the component name for the tasks
+   * @param hosts the set of hosts that the tasks are for
+   * @param params additional command parameters
+   * @param tasks an array of tasks as a convenience
+   */
+  public TaskWrapper(String s, String c, Set<String> hosts, Map<String, String> params, Task... tasks) {
+    this(s, c, hosts, params, Arrays.asList(tasks));
+  }
+
 
   /**
    * @param s the service name for the tasks
@@ -47,15 +62,23 @@ public class TaskWrapper {
    * @param hosts the set of hosts for the
    * @param tasks the list of tasks
    */
-  public TaskWrapper(String s, String c, Set<String> hosts, List<Task> tasks) {
+  public TaskWrapper(String s, String c, Set<String> hosts, Map<String, String> params, List<Task> tasks) {
     service = s;
     component = c;
 
     this.hosts = hosts;
+    this.params = (params == null) ? new HashMap<String, String>() : params;
     this.tasks = tasks;
   }
 
   /**
+   * @return the additional command parameters.
+   */
+  public Map<String, String> getParams() {
+    return params;
+  }
+
+  /**
    * @return the tasks associated with this wrapper
    */
   public List<Task> getTasks() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
index a5813e3..057c310 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.ambari.server.stack.HostsType;
@@ -43,15 +44,16 @@ public class TaskWrapperBuilder {
    * @param component the component name for the tasks
    * @param hostsType the collection of sets along with their status
    * @param tasks collection of tasks
+   * @param params additional parameters
    */
-  public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks) {
+  public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks, Map<String, String> params) {
     List<TaskWrapper> collection = new ArrayList<TaskWrapper>();
     for (Task t : tasks) {
       if (t.getType().equals(Task.Type.EXECUTE)) {
         ExecuteTask et = (ExecuteTask) t;
         if (et.hosts == ExecuteHostType.MASTER) {
           if (hostsType.master != null) {
-            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.master), t));
+            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.master), params, t));
             continue;
           } else {
             LOG.error(MessageFormat.format("Found an Execute task for {0} and {1} meant to run on a master but could not find any masters to run on. Skipping this task.", service, component));
@@ -61,7 +63,7 @@ public class TaskWrapperBuilder {
         // Pick a random host.
         if (et.hosts == ExecuteHostType.ANY) {
           if (hostsType.hosts != null && !hostsType.hosts.isEmpty()) {
-            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.hosts.iterator().next()), t));
+            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.hosts.iterator().next()), params, t));
             continue;
           } else {
             LOG.error(MessageFormat.format("Found an Execute task for {0} and {1} meant to run on a any host but could not find host to run on. Skipping this task.", service, component));
@@ -70,7 +72,7 @@ public class TaskWrapperBuilder {
         }
       }
 
-      collection.add(new TaskWrapper(service, component, hostsType.hosts, t));
+      collection.add(new TaskWrapper(service, component, hostsType.hosts, params, t));
     }
 
     return collection;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
index fa68435..1d242e1 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
@@ -27,8 +27,23 @@ from resource_management.libraries.functions.security_commons import build_expec
 from hdfs import hdfs
 from ambari_commons.os_family_impl import OsFamilyImpl
 from ambari_commons import OSConst
+from utils import get_hdfs_binary
 
 class DataNode(Script):
+
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-datanode"}
+
+  def get_hdfs_binary(self):
+    """
+    Get the name or path to the hdfs binary depending on the stack and version.
+    """
+    import params
+    stack_to_comp = self.get_stack_to_component()
+    if params.stack_name in stack_to_comp:
+      return get_hdfs_binary(stack_to_comp[params.stack_name])
+    return "hdfs"
+
   def install(self, env):
     import params
     self.install_packages(env, params.exclude_packages)
@@ -40,19 +55,20 @@ class DataNode(Script):
     hdfs("datanode")
     datanode(action="configure")
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
     datanode(action="start")
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     # pre-upgrade steps shutdown the datanode, so there's no need to call
-    # action=stop
-    if rolling_restart:
-      stopped = datanode_upgrade.pre_upgrade_shutdown()
+
+    hdfs_binary = self.get_hdfs_binary()
+    if upgrade_type == "rolling":
+      stopped = datanode_upgrade.pre_rolling_upgrade_shutdown(hdfs_binary)
       if not stopped:
         datanode(action="stop")
     else:
@@ -67,23 +83,21 @@ class DataNode(Script):
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class DataNodeDefault(DataNode):
 
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-datanode"}
-
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing DataNode Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing DataNode Stack Upgrade pre-restart")
     import params
     env.set_params(params)
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-datanode", params.version)
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing DataNode Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing DataNode Stack Upgrade post-restart")
     import params
     env.set_params(params)
+    hdfs_binary = self.get_hdfs_binary()
     # ensure the DataNode has started and rejoined the cluster
-    datanode_upgrade.post_upgrade_check()
+    datanode_upgrade.post_upgrade_check(hdfs_binary)
 
   def security_status(self, env):
     import status_params

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
index 2e5ac19..6138f8c 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
@@ -26,12 +26,13 @@ from resource_management.libraries.functions import format
 from resource_management.libraries.functions.decorator import retry
 
 
-def pre_upgrade_shutdown():
+def pre_rolling_upgrade_shutdown(hdfs_binary):
   """
   Runs the "shutdownDatanode {ipc_address} upgrade" command to shutdown the
   DataNode in preparation for an upgrade. This will then periodically check
   "getDatanodeInfo" to ensure the DataNode has shutdown correctly.
   This function will obtain the Kerberos ticket if security is enabled.
+  :param hdfs_binary: name/path of the HDFS binary to use
   :return: Return True if ran ok (even with errors), and False if need to stop the datanode forcefully.
   """
   import params
@@ -40,38 +41,39 @@ def pre_upgrade_shutdown():
   if params.security_enabled:
     Execute(params.dn_kinit_cmd, user = params.hdfs_user)
 
-  command = format('hdfs dfsadmin -shutdownDatanode {dfs_dn_ipc_address} upgrade')
+  command = format('{hdfs_binary} dfsadmin -shutdownDatanode {dfs_dn_ipc_address} upgrade')
 
   code, output = shell.call(command, user=params.hdfs_user)
   if code == 0:
     # verify that the datanode is down
-    _check_datanode_shutdown()
+    _check_datanode_shutdown(hdfs_binary)
   else:
-    # Due to bug HDFS-7533, DataNode may not always shutdown during rolling upgrade, and it is necessary to kill it.
+    # Due to bug HDFS-7533, DataNode may not always shutdown during stack upgrade, and it is necessary to kill it.
     if output is not None and re.search("Shutdown already in progress", output):
       Logger.error("Due to a known issue in DataNode, the command {0} did not work, so will need to shutdown the datanode forcefully.".format(command))
       return False
   return True
 
 
-def post_upgrade_check():
+def post_upgrade_check(hdfs_binary):
   """
   Verifies that the DataNode has rejoined the cluster. This function will
   obtain the Kerberos ticket if security is enabled.
+  :param hdfs_binary: name/path of the HDFS binary to use
   :return:
   """
   import params
 
   Logger.info("Checking that the DataNode has rejoined the cluster after upgrade...")
   if params.security_enabled:
-    Execute(params.dn_kinit_cmd,user = params.hdfs_user)
+    Execute(params.dn_kinit_cmd, user=params.hdfs_user)
 
   # verify that the datanode has started and rejoined the HDFS cluster
-  _check_datanode_startup()
+  _check_datanode_startup(hdfs_binary)
 
 
 @retry(times=24, sleep_time=5, err_class=Fail)
-def _check_datanode_shutdown():
+def _check_datanode_shutdown(hdfs_binary):
   """
   Checks that a DataNode is down by running "hdfs dfsamin getDatanodeInfo"
   several times, pausing in between runs. Once the DataNode stops responding
@@ -84,13 +86,14 @@ def _check_datanode_shutdown():
   https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the
   times for ipc.client.connect.retry.interval. In the meantime, override them
   here, but only for RU.
+  :param hdfs_binary: name/path of the HDFS binary to use
   :return:
   """
   import params
 
   # override stock retry timeouts since after 30 seconds, the datanode is
   # marked as dead and can affect HBase during RU
-  command = format('hdfs dfsadmin -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
+  command = format('{hdfs_binary} dfsadmin -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
 
   try:
     Execute(command, user=params.hdfs_user, tries=1)
@@ -103,19 +106,19 @@ def _check_datanode_shutdown():
 
 
 @retry(times=12, sleep_time=10, err_class=Fail)
-def _check_datanode_startup():
+def _check_datanode_startup(hdfs_binary):
   """
   Checks that a DataNode is reported as being alive via the
   "hdfs dfsadmin -report -live" command. Once the DataNode is found to be
   alive this method will return, otherwise it will raise a Fail(...) and retry
   automatically.
+  :param hdfs_binary: name/path of the HDFS binary to use
   :return:
   """
   import params
 
   try:
-    # 'su - hdfs -c "hdfs dfsadmin -report -live"'
-    command = 'hdfs dfsadmin -report -live'
+    command = format('{hdfs_binary} dfsadmin -report -live')
     return_code, hdfs_output = shell.call(command, user=params.hdfs_user)
   except:
     raise Fail('Unable to determine if the DataNode has started after upgrade.')

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
index dd0dca4..16218b6 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
@@ -40,11 +40,11 @@ class HdfsClient(Script):
     env.set_params(params)
     hdfs()
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
@@ -57,7 +57,7 @@ class HdfsClientDefault(HdfsClient):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-client"}
 
-  def pre_rolling_restart(self, env):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
index b11d7ea..98b8afd 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
@@ -36,18 +36,27 @@ from resource_management.core.shell import as_user
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 
-from utils import service, safe_zkfc_op
+from utils import service, safe_zkfc_op, is_previous_fs_image
 from setup_ranger_hdfs import setup_ranger_hdfs
+from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
+
 
 @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-def namenode(action=None, do_format=True, rolling_restart=False, env=None):
+def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, env=None):
+  if action is None:
+    raise Fail('"action" parameter is required for function namenode().')
+
+  if action in ["start", "stop"] and hdfs_binary is None:
+    raise Fail('"hdfs_binary" parameter is required for function namenode().')
+
   if action == "configure":
     import params
     #we need this directory to be present before any action(HA manual steps for
     #additional namenode)
     create_name_dirs(params.dfs_name_dir)
   elif action == "start":
-    setup_ranger_hdfs(rolling_upgrade = rolling_restart)
+    Logger.info("Called service {0} with upgrade_type: {1}".format(action, str(upgrade_type)))
+    setup_ranger_hdfs(upgrade_type=upgrade_type)
     import params
     if do_format:
       format_namenode()
@@ -70,13 +79,33 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
         if not success:
           raise Fail("Could not bootstrap standby namenode")
 
-    options = "-rollingUpgrade started" if rolling_restart else ""
-
-    if rolling_restart:
+    if upgrade_type == "rolling" and params.dfs_ha_enabled:
       # Most likely, ZKFC is up since RU will initiate the failover command. However, if that failed, it would have tried
       # to kill ZKFC manually, so we need to start it if not already running.
       safe_zkfc_op(action, env)
 
+    options = ""
+    if upgrade_type == "rolling":
+      options = "-rollingUpgrade started"
+    elif upgrade_type == "nonrolling":
+      is_previous_image_dir = is_previous_fs_image()
+      Logger.info(format("Previous file system image dir present is {is_previous_image_dir}"))
+
+      if params.dfs_ha_enabled:
+        if params.desired_namenode_role is None:
+          raise Fail("Did not receive parameter \"desired_namenode_role\" to indicate the role that this NameNode should have.")
+
+        if params.desired_namenode_role == "active":
+          # The "-upgrade" command can only be used exactly once. If used more than once during a retry, it will cause problems.
+          options = "" if is_previous_image_dir else "-upgrade"
+
+        if params.desired_namenode_role == "standby":
+          options = "-bootstrapStandby -force"
+      else:
+        # Both Primary and Secondary NameNode can use the same command.
+        options = "" if is_previous_image_dir else "-upgrade"
+    Logger.info(format("Option for start command: {options}"))
+
     service(
       action="start",
       name="namenode",
@@ -90,53 +119,66 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
       Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
               user = params.hdfs_user)
 
-    is_namenode_safe_mode_off = format("hdfs dfsadmin -fs {namenode_address} -safemode get | grep 'Safe mode is OFF'")
+    is_namenode_safe_mode_off = format("{hdfs_binary} dfsadmin -fs {namenode_address} -safemode get | grep 'Safe mode is OFF'")
     if params.dfs_ha_enabled:
-      is_active_namenode_cmd = as_user(format("hdfs --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+      is_active_namenode_cmd = as_user(format("{hdfs_binary} --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
     else:
       is_active_namenode_cmd = None
+    
+    # During NonRolling Upgrade, both NameNodes are initially down,
+    # so no point in checking if this is the active or standby.
+    if upgrade_type == "nonrolling":
+      is_active_namenode_cmd = None
 
-    # During normal operations, if HA is enabled and it is in standby, then no need to check safemode staus.
-    # During Rolling Upgrade, both namenodes must eventually leave safemode, and Ambari can wait for this.
+    # ___Scenario___________|_Expected safemode state__|_Wait for safemode OFF____|
+    # no-HA                 | ON -> OFF                | Yes                      |
+    # HA and active         | ON -> OFF                | Yes                      |
+    # HA and standby        | no change                | no check                 |
+    # RU with HA on active  | ON -> OFF                | Yes                      |
+    # RU with HA on standby | ON -> OFF                | Yes                      |
+    # EU with HA on active  | no change                | no check                 |
+    # EU with HA on standby | no change                | no check                 |
+    # EU non-HA             | no change                | no check                 |
 
-    # ___Scenario_________|_Expected safemode state___|_Wait for safemode OFF____|
-    # 1 (HA and active)   | ON -> OFF                 | Yes                      |
-    # 2 (HA and standby)  | no change (yes during RU) | no check (yes during RU) |
-    # 3 (no-HA)           | ON -> OFF                 | Yes                      |
     check_for_safemode_off = False
     msg = ""
     if params.dfs_ha_enabled:
-      code, out = shell.call(is_active_namenode_cmd, logoutput=True) # If active NN, code will be 0
-      if code == 0: # active
-        check_for_safemode_off = True
-        msg = "Must wait to leave safemode since High Availability is enabled and this is the Active NameNode."
-      elif rolling_restart:
+      if upgrade_type is not None:
         check_for_safemode_off = True
-        msg = "Must wait to leave safemode since High Availability is enabled during a Rolling Upgrade"
+        msg = "Must wait to leave safemode since High Availability is enabled during a Stack Upgrade"
+      else:
+        # During normal operations, the NameNode is expected to be up.
+        code, out = shell.call(is_active_namenode_cmd, logoutput=True) # If active NN, code will be 0
+        if code == 0: # active
+          check_for_safemode_off = True
+          msg = "Must wait to leave safemode since High Availability is enabled and this is the Active NameNode."
+        else:
+          msg = "Will remain in the current safemode state."
     else:
       msg = "Must wait to leave safemode since High Availability is not enabled."
       check_for_safemode_off = True
 
-    if not msg:
-      msg = "Will remain in the current safemode state."
     Logger.info(msg)
 
+    # During a NonRolling (aka Express Upgrade), stay in safemode since the DataNodes are down.
+    stay_in_safe_mode = False
+    if upgrade_type == "nonrolling":
+      stay_in_safe_mode = True
+
     if check_for_safemode_off:
-      # First check if Namenode is not in 'safemode OFF' (equivalent to safemode ON). If safemode is OFF, no change.
-      # If safemode is ON, first wait for NameNode to leave safemode on its own (if that doesn't happen within 30 seconds, then
-      # force NameNode to leave safemode).
-      Logger.info("Checking the NameNode safemode status since may need to transition from ON to OFF.")
-
-      try:
-        # Wait up to 30 mins
-        Execute(is_namenode_safe_mode_off,
-                tries=180,
-                try_sleep=10,
-                user=params.hdfs_user,
-                logoutput=True
-        )
-      except Fail:
-        Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
+      Logger.info("Stay in safe mode: {0}".format(stay_in_safe_mode))
+      if not stay_in_safe_mode:
+        Logger.info("Wait to leafe safemode since must transition from ON to OFF.")
+        try:
+          # Wait up to 30 mins
+          Execute(is_namenode_safe_mode_off,
+                  tries=180,
+                  try_sleep=10,
+                  user=params.hdfs_user,
+                  logoutput=True
+          )
+        except Fail:
+          Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
 
     # Always run this on non-HA, or active NameNode during HA.
     create_hdfs_directories(is_active_namenode_cmd)
@@ -154,7 +196,13 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
     decommission()
 
 @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
-def namenode(action=None, do_format=True, rolling_restart=False, env=None):
+def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, env=None):
+  if action is None:
+    raise Fail('"action" parameter is required for function namenode().')
+
+  if action in ["start", "stop"] and hdfs_binary is None:
+    raise Fail('"hdfs_binary" parameter is required for function namenode().')
+
   if action == "configure":
     pass
   elif action == "start":

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
index 46c7272..2ef1b69 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
@@ -45,8 +45,8 @@ class JournalNodeDefault(JournalNode):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-hdfs-journalnode"}
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 
@@ -54,7 +54,7 @@ class JournalNodeDefault(JournalNode):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-journalnode", params.version)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)
@@ -65,13 +65,16 @@ class JournalNodeDefault(JournalNode):
       create_log_dir=True
     )
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    if upgrade_type == "nonrolling":
+      return
+
+    Logger.info("Executing Stack Upgrade post-restart")
     import params
     env.set_params(params)
     journalnode_upgrade.post_upgrade_check()
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
index e2ebbcb..850c32d 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
@@ -31,7 +31,7 @@ from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
 
 def post_upgrade_check():
   """
-  Ensure all journal nodes are up and quorum is established
+  Ensure all journal nodes are up and quorum is established during Rolling Upgrade.
   :return:
   """
   import params

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 93bbc0f..bb60ec3 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -42,11 +42,13 @@ from resource_management.core.logger import Logger
 from ambari_commons.os_family_impl import OsFamilyImpl
 from ambari_commons import OSConst
 
+
 import namenode_upgrade
 from hdfs_namenode import namenode
 from hdfs import hdfs
 import hdfs_rebalance
-from utils import initiate_safe_zkfc_failover
+from utils import initiate_safe_zkfc_failover, get_hdfs_binary
+
 
 
 # hashlib is supplied as of Python 2.5 as the replacement interface for md5
@@ -62,6 +64,19 @@ except ImportError:
 
 class NameNode(Script):
 
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-namenode"}
+
+  def get_hdfs_binary(self):
+    """
+    Get the name or path to the hdfs binary depending on the stack and version.
+    """
+    import params
+    stack_to_comp = self.get_stack_to_component()
+    if params.stack_name in stack_to_comp:
+      return get_hdfs_binary(stack_to_comp[params.stack_name])
+    return "hdfs"
+
   def install(self, env):
     import params
     self.install_packages(env, params.exclude_packages)
@@ -73,40 +88,41 @@ class NameNode(Script):
     import params
     env.set_params(params)
     hdfs("namenode")
-    namenode(action="configure", env=env)
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="configure", hdfs_binary=hdfs_binary, env=env)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
-    namenode(action="start", rolling_restart=rolling_restart, env=env)
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="start", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
-    if rolling_restart and params.dfs_ha_enabled:
+    hdfs_binary = self.get_hdfs_binary()
+    if upgrade_type == "rolling" and params.dfs_ha_enabled:
       if params.dfs_ha_automatic_failover_enabled:
         initiate_safe_zkfc_failover()
       else:
         raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
-    namenode(action="stop", rolling_restart=rolling_restart, env=env)
+    namenode(action="stop", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)
 
   def status(self, env):
     import status_params
     env.set_params(status_params)
-    namenode(action="status", rolling_restart=False, env=env)
+    namenode(action="status", env=env)
 
   def decommission(self, env):
     import params
     env.set_params(params)
-    namenode(action="decommission")
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="decommission", hdfs_binary=hdfs_binary)
 
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class NameNodeDefault(NameNode):
 
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-namenode"}
-
   def restore_snapshot(self, env):
     """
     Restore the snapshot during a Downgrade.
@@ -115,21 +131,73 @@ class NameNodeDefault(NameNode):
     pass
 
   def prepare_non_rolling_upgrade(self, env):
-    print "TODO AMBARI-12698"
-    pass
+    """
+    If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and
+    make sure that there is no "/previous" directory.
+
+    Create a list of all the DataNodes in the cluster.
+    hdfs dfsadmin -report > dfs-old-report-1.log
+
+    hdfs dfsadmin -safemode enter
+    hdfs dfsadmin -saveNamespace
+
+    Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory.
+
+    Store the layoutVersion for the NameNode located at ${dfs.namenode.name.dir}/current/VERSION, into a backup directory
+
+    Finalize any prior HDFS upgrade,
+    hdfs dfsadmin -finalizeUpgrade
+    """
+    import params
+    Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.")
+
+    if params.security_enabled:
+      Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+              user=params.hdfs_user)
+
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.prepare_upgrade_check_for_previous_dir()
+    namenode_upgrade.prepare_upgrade_enter_safe_mode(hdfs_binary)
+    namenode_upgrade.prepare_upgrade_save_namespace(hdfs_binary)
+    namenode_upgrade.prepare_upgrade_backup_namenode_dir()
+    namenode_upgrade.prepare_upgrade_finalize_previous_upgrades(hdfs_binary)
 
   def prepare_rolling_upgrade(self, env):
-    namenode_upgrade.prepare_rolling_upgrade()
+    hfds_binary = self.get_hdfs_binary()
+    namenode_upgrade.prepare_rolling_upgrade(hfds_binary)
+
+  def wait_for_safemode_off(self, env):
+    """
+    During NonRolling (aka Express Upgrade), after starting NameNode, which is still in safemode, and then starting
+    all of the DataNodes, we need for NameNode to receive all of the block reports and leave safemode.
+    """
+    import params
+
+    Logger.info("Wait to leafe safemode since must transition from ON to OFF.")
+    try:
+      hdfs_binary = self.get_hdfs_binary()
+      # Note, this fails if namenode_address isn't prefixed with "params."
+      is_namenode_safe_mode_off = format("{hdfs_binary} dfsadmin -fs {params.namenode_address} -safemode get | grep 'Safe mode is OFF'")
+      # Wait up to 30 mins
+      Execute(is_namenode_safe_mode_off,
+              tries=180,
+              try_sleep=10,
+              user=params.hdfs_user,
+              logoutput=True
+      )
+    except Fail:
+      Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
 
   def finalize_non_rolling_upgrade(self, env):
-    print "TODO AMBARI-12698"
-    pass
+    hfds_binary = self.get_hdfs_binary()
+    namenode_upgrade.finalize_upgrade("nonrolling", hfds_binary)
 
   def finalize_rolling_upgrade(self, env):
-    namenode_upgrade.finalize_rolling_upgrade()
+    hfds_binary = self.get_hdfs_binary()
+    namenode_upgrade.finalize_upgrade("rolling", hfds_binary)
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 
@@ -137,12 +205,13 @@ class NameNodeDefault(NameNode):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-namenode", params.version)
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade post-restart")
     import params
     env.set_params(params)
 
-    Execute("hdfs dfsadmin -report -live",
+    hdfs_binary = self.get_hdfs_binary()
+    Execute(format("{hdfs_binary} dfsadmin -report -live"),
             user=params.hdfs_user
     )
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
index e8c142c..d6b6225 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
@@ -180,6 +180,33 @@ class NamenodeHAState:
       return self.get_address_for_host(hostname)
     return None
 
+  def is_active(self, host_name):
+    """
+    :param host_name: Host name
+    :return: Return True if this is the active NameNode, otherwise, False.
+    """
+    return self._is_in_state(host_name, NAMENODE_STATE.ACTIVE)
+
+  def is_standby(self, host_name):
+    """
+    :param host_name: Host name
+    :return: Return True if this is the standby NameNode, otherwise, False.
+    """
+    return self._is_in_state(host_name, NAMENODE_STATE.STANDBY)
+
+  def _is_in_state(self, host_name, state):
+    """
+    :param host_name: Host name
+    :param state: State to check
+    :return: Return True if this NameNode is in the specified state, otherwise, False.
+    """
+    mapping = self.get_namenode_state_to_hostnames()
+    if state in mapping:
+      hosts_in_state = mapping[state]
+      if hosts_in_state is not None and len(hosts_in_state) == 1 and next(iter(hosts_in_state)).lower() == host_name.lower():
+        return True
+    return False
+
   def is_healthy(self):
     """
     :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist.

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
index fb39878..c8c057d 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
@@ -17,26 +17,148 @@ limitations under the License.
 
 """
 import re
+import os
 
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute
-from resource_management.libraries.functions.format import format
-from resource_management.libraries.functions.default import default
 from resource_management.core import shell
-from resource_management.libraries.functions import Direction, SafeMode
+from resource_management.core.shell import as_user
 from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_unique_id_and_date
+from resource_management.libraries.functions import Direction, SafeMode
+
+from namenode_ha_state import NamenodeHAState
 
 
 safemode_to_instruction = {SafeMode.ON: "enter",
                            SafeMode.OFF: "leave"}
 
-def reach_safemode_state(user, safemode_state, in_ha):
+
+def prepare_upgrade_check_for_previous_dir():
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up some data.
+  Check that there is no "previous" folder inside the NameNode Name Dir.
+  """
+  import params
+
+  if params.dfs_ha_enabled:
+    namenode_ha = NamenodeHAState()
+    if namenode_ha.is_active(params.hostname):
+      Logger.info("NameNode High Availability is enabled and this is the Active NameNode.")
+
+      problematic_previous_namenode_dirs = set()
+      nn_name_dirs = params.dfs_name_dir.split(',')
+      for nn_dir in nn_name_dirs:
+        if os.path.isdir(nn_dir):
+          # Check for a previous folder, which is not allowed.
+          previous_dir = os.path.join(nn_dir, "previous")
+          if os.path.isdir(previous_dir):
+            problematic_previous_namenode_dirs.add(previous_dir)
+
+      if len(problematic_previous_namenode_dirs) > 0:
+        message = 'WARNING. The following NameNode Name Dir(s) have a "previous" folder from an older version.\n' \
+                  'Please back it up first, and then delete it, OR Finalize (E.g., "hdfs dfsadmin -finalizeUpgrade").\n' \
+                  'NameNode Name Dir(s): {0}\n' \
+                  '***** Then, retry this step. *****'.format(", ".join(problematic_previous_namenode_dirs))
+        Logger.error(message)
+        raise Fail(message)
+
+def prepare_upgrade_enter_safe_mode(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires first entering Safemode.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  safe_mode_enter_cmd = format("{hdfs_binary} dfsadmin -safemode enter")
+  safe_mode_enter_and_check_for_on = format("{safe_mode_enter_cmd} | grep 'Safe mode is ON'")
+  try:
+    # Safe to call if already in Safe Mode
+    Logger.info("Enter SafeMode if not already in it.")
+    as_user(safe_mode_enter_and_check_for_on, params.hdfs_user, env={'PATH': params.hadoop_bin_dir})
+  except Exception, e:
+    message = format("Could not enter safemode. As the HDFS user, call this command: {safe_mode_enter_cmd}")
+    Logger.error(message)
+    raise Fail(message)
+
+def prepare_upgrade_save_namespace(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires saving the namespace.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  save_namespace_cmd = format("{hdfs_binary} dfsadmin -saveNamespace")
+  try:
+    Logger.info("Checkpoint the current namespace.")
+    as_user(save_namespace_cmd, params.hdfs_user, env={'PATH': params.hadoop_bin_dir})
+  except Exception, e:
+    message = format("Could save the NameSpace. As the HDFS user, call this command: {save_namespace_cmd}")
+    Logger.error(message)
+    raise Fail(message)
+
+def prepare_upgrade_backup_namenode_dir():
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up the NameNode Name Dirs.
+  """
+  import params
+
+  i = 0
+  failed_paths = []
+  nn_name_dirs = params.dfs_name_dir.split(',')
+  backup_destination_root_dir = "/tmp/upgrades/{0}".format(params.stack_version_unformatted)
+  if len(nn_name_dirs) > 0:
+    Logger.info("Backup the NameNode name directory's CURRENT folder.")
+  for nn_dir in nn_name_dirs:
+    i += 1
+    namenode_current_image = os.path.join(nn_dir, "current")
+    unique = get_unique_id_and_date() + "_" + str(i)
+    # Note that /tmp may not be writeable.
+    backup_current_folder = "{0}/namenode_{1}/".format(backup_destination_root_dir, unique)
+
+    if os.path.isdir(namenode_current_image) and not os.path.isdir(backup_current_folder):
+      try:
+        os.makedirs(backup_current_folder)
+        Execute(('cp', '-ar', namenode_current_image, backup_current_folder),
+                sudo=True
+        )
+      except Exception, e:
+        failed_paths.append(namenode_current_image)
+  if len(failed_paths) > 0:
+    Logger.error("Could not backup the NameNode Name Dir(s) to {0}, make sure that the destination path is "
+                 "writeable and copy the directories on your own. Directories: {1}".format(backup_destination_root_dir,
+                                                                                           ", ".join(failed_paths)))
+
+def prepare_upgrade_finalize_previous_upgrades(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires Finalizing any upgrades that are in progress.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  finalize_command = format("{hdfs_binary} dfsadmin -rollingUpgrade finalize")
+  try:
+    Logger.info("Attempt to Finalize if there are any in-progress upgrades. "
+                "This will return 255 if no upgrades are in progress.")
+    code, out = shell.checked_call(finalize_command, logoutput=True, user=params.hdfs_user)
+    if out:
+      expected_substring = "there is no rolling upgrade in progress"
+      if expected_substring not in out.lower():
+        Logger.warning('Finalize command did not contain substring: %s' % expected_substring)
+    else:
+      Logger.warning("Finalize command did not return any output.")
+  except Exception, e:
+    Logger.warning("Ensure no upgrades are in progress.")
+
+def reach_safemode_state(user, safemode_state, in_ha, hdfs_binary):
   """
   Enter or leave safemode for the Namenode.
-  @param user: user to perform action as
-  @param safemode_state: Desired state of ON or OFF
-  @param in_ha: bool indicating if Namenode High Availability is enabled
-  @:return Returns a tuple of (transition success, original state). If no change is needed, the indicator of
+  :param user: user to perform action as
+  :param safemode_state: Desired state of ON or OFF
+  :param in_ha: bool indicating if Namenode High Availability is enabled
+  :param hdfs_binary: name/path of the HDFS binary to use
+  :return: Returns a tuple of (transition success, original state). If no change is needed, the indicator of
   success will be True
   """
   Logger.info("Prepare to transition into safemode state %s" % safemode_state)
@@ -44,7 +166,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
   original_state = SafeMode.UNKNOWN
 
   hostname = params.hostname
-  safemode_check = format("hdfs dfsadmin -safemode get")
+  safemode_check = format("{hdfs_binary} dfsadmin -safemode get")
 
   grep_pattern = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}")
   safemode_check_with_grep = format("hdfs dfsadmin -safemode get | grep '{grep_pattern}'")
@@ -61,7 +183,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
         return (True, original_state)
       else:
         # Make a transition
-        command = "hdfs dfsadmin -safemode %s" % (safemode_to_instruction[safemode_state])
+        command = "{0} dfsadmin -safemode {1}".format(hdfs_binary, safemode_to_instruction[safemode_state])
         Execute(command,
                 user=user,
                 logoutput=True,
@@ -74,7 +196,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
   return (False, original_state)
 
 
-def prepare_rolling_upgrade():
+def prepare_rolling_upgrade(hdfs_binary):
   """
   Perform either an upgrade or a downgrade.
 
@@ -83,6 +205,7 @@ def prepare_rolling_upgrade():
   1. Leave safemode if the safemode status is not OFF
   2. Execute a rolling upgrade "prepare"
   3. Execute a rolling upgrade "query"
+  :param hdfs_binary: name/path of the HDFS binary to use
   """
   import params
 
@@ -96,12 +219,12 @@ def prepare_rolling_upgrade():
 
 
   if params.upgrade_direction == Direction.UPGRADE:
-    safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True)
+    safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True, hdfs_binary)
     if not safemode_transition_successful:
       raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SafeMode.OFF))
 
-    prepare = "hdfs dfsadmin -rollingUpgrade prepare"
-    query = "hdfs dfsadmin -rollingUpgrade query"
+    prepare = format("{hdfs_binary} dfsadmin -rollingUpgrade prepare")
+    query = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
     Execute(prepare,
             user=params.hdfs_user,
             logoutput=True)
@@ -111,9 +234,11 @@ def prepare_rolling_upgrade():
   elif params.upgrade_direction == Direction.DOWNGRADE:
     pass
 
-def finalize_rolling_upgrade():
+def finalize_upgrade(upgrade_type, hdfs_binary):
   """
   Finalize the Namenode upgrade, at which point it cannot be downgraded.
+  :param upgrade_type rolling or nonrolling
+  :param hdfs_binary: name/path of the HDFS binary to use
   """
   Logger.info("Executing Rolling Upgrade finalize")
   import params
@@ -122,8 +247,15 @@ def finalize_rolling_upgrade():
     kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") 
     Execute(kinit_command, user=params.hdfs_user, logoutput=True)
 
-  finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize"
-  query_cmd = "hdfs dfsadmin -rollingUpgrade query"
+  finalize_cmd = ""
+  query_cmd = ""
+  if upgrade_type == "rolling":
+    finalize_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade finalize")
+    query_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
+
+  elif upgrade_type == "nonrolling":
+    finalize_cmd = format("{hdfs_binary} dfsadmin -finalizeUpgrade")
+    query_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
 
   Execute(query_cmd,
         user=params.hdfs_user,

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
index be6f0d5..df5569e 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
@@ -41,7 +41,7 @@ class NFSGateway(Script):
 
     self.install_packages(env, params.exclude_packages)
 
-  def pre_rolling_restart(self, env):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
@@ -49,14 +49,14 @@ class NFSGateway(Script):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-nfs3", params.version)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
     self.configure(env)
     nfsgateway(action="start")
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)