Posted to commits@ambari.apache.org by al...@apache.org on 2015/10/22 04:24:30 UTC

[2/2] ambari git commit: AMBARI-12701. Stop-and-Start Upgrade: Handle Core Services (alejandro)

AMBARI-12701. Stop-and-Start Upgrade: Handle Core Services (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7afe5a4e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7afe5a4e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7afe5a4e

Branch: refs/heads/trunk
Commit: 7afe5a4ec9200c41922fb1c0809f34a0d8d700bd
Parents: 34a0353
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Wed Oct 21 17:48:24 2015 -0700
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Wed Oct 21 19:22:52 2015 -0700

----------------------------------------------------------------------
 .../libraries/script/script.py                  |  68 +++++---
 .../PreUpgradeCheckResourceProvider.java        |  12 +-
 .../internal/UpgradeResourceProvider.java       |  50 +++++-
 .../ambari/server/stack/MasterHostResolver.java |  23 +++
 .../ambari/server/state/UpgradeHelper.java      |  76 +++++++--
 .../state/stack/upgrade/ClusterGrouping.java    |   5 +-
 .../state/stack/upgrade/ColocatedGrouping.java  |   8 +-
 .../server/state/stack/upgrade/Grouping.java    |  15 +-
 .../stack/upgrade/ServiceCheckGrouping.java     |   2 +-
 .../state/stack/upgrade/StageWrapper.java       |  40 ++++-
 .../stack/upgrade/StageWrapperBuilder.java      |   5 +-
 .../server/state/stack/upgrade/TaskWrapper.java |  27 ++-
 .../state/stack/upgrade/TaskWrapperBuilder.java |  10 +-
 .../HDFS/2.1.0.2.0/package/scripts/datanode.py  |  40 +++--
 .../package/scripts/datanode_upgrade.py         |  27 +--
 .../2.1.0.2.0/package/scripts/hdfs_client.py    |   6 +-
 .../2.1.0.2.0/package/scripts/hdfs_namenode.py  | 124 +++++++++-----
 .../2.1.0.2.0/package/scripts/journalnode.py    |  15 +-
 .../package/scripts/journalnode_upgrade.py      |   2 +-
 .../HDFS/2.1.0.2.0/package/scripts/namenode.py  | 115 ++++++++++---
 .../package/scripts/namenode_ha_state.py        |  27 +++
 .../package/scripts/namenode_upgrade.py         | 166 +++++++++++++++++--
 .../2.1.0.2.0/package/scripts/nfsgateway.py     |   6 +-
 .../2.1.0.2.0/package/scripts/params_linux.py   |   8 +-
 .../package/scripts/setup_ranger_hdfs.py        |   4 +-
 .../HDFS/2.1.0.2.0/package/scripts/snamenode.py |   8 +-
 .../HDFS/2.1.0.2.0/package/scripts/utils.py     |  30 ++++
 .../2.1.0.2.0/package/scripts/zkfc_slave.py     |   4 +-
 .../scripts/application_timeline_server.py      |   8 +-
 .../2.1.0.2.0/package/scripts/historyserver.py  |   8 +-
 .../package/scripts/mapreduce2_client.py        |   2 +-
 .../2.1.0.2.0/package/scripts/nodemanager.py    |  12 +-
 .../2.1.0.2.0/package/scripts/params_linux.py   |   2 +-
 .../package/scripts/resourcemanager.py          |   8 +-
 .../2.1.0.2.0/package/scripts/yarn_client.py    |   2 +-
 .../3.4.5.2.0/package/scripts/zookeeper.py      |   6 +-
 .../package/scripts/zookeeper_client.py         |   8 +-
 .../package/scripts/zookeeper_server.py         |  25 +--
 .../package/scripts/zookeeper_service.py        |   4 +-
 .../HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml |  31 +++-
 .../stacks/HDP/2.2/upgrades/config-upgrade.xml  |  11 ++
 .../HDP/2.2/upgrades/nonrolling-upgrade-2.2.xml |  13 ++
 .../stack/upgrade/StageWrapperBuilderTest.java  |   3 +-
 .../python/stacks/2.0.6/HDFS/test_datanode.py   |  22 +--
 .../stacks/2.0.6/HDFS/test_hdfs_client.py       |   8 +-
 .../stacks/2.0.6/HDFS/test_journalnode.py       |  18 +-
 .../python/stacks/2.0.6/HDFS/test_namenode.py   |  24 +--
 .../python/stacks/2.0.6/HDFS/test_nfsgateway.py |   4 +-
 .../stacks/2.0.6/YARN/test_historyserver.py     |   4 +-
 .../stacks/2.0.6/YARN/test_mapreduce2_client.py |   4 +-
 .../stacks/2.0.6/YARN/test_nodemanager.py       |  16 +-
 .../stacks/2.0.6/YARN/test_resourcemanager.py   |   4 +-
 .../stacks/2.0.6/YARN/test_yarn_client.py       |   4 +-
 .../2.0.6/ZOOKEEPER/test_zookeeper_client.py    |   8 +-
 .../2.0.6/ZOOKEEPER/test_zookeeper_server.py    |  12 +-
 .../stacks/2.1/YARN/test_apptimelineserver.py   |   4 +-
 56 files changed, 894 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-common/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index e647c11..e3bae5d 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -26,6 +26,7 @@ import os
 import sys
 import logging
 import platform
+import inspect
 import tarfile
 from ambari_commons import OSCheck, OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
@@ -427,18 +428,19 @@ class Script(object):
     sys.exit(1)
 
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     """
     To be overridden by subclasses
     """
     self.fail_with_error("start method isn't implemented")
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     """
     To be overridden by subclasses
     """
     self.fail_with_error("stop method isn't implemented")
 
+  # TODO, remove after all services have switched to pre_upgrade_restart
   def pre_rolling_restart(self, env):
     """
     To be overridden by subclasses
@@ -463,45 +465,67 @@ class Script(object):
       command_params = config["commandParams"] if "commandParams" in config else None
       if command_params is not None:
         restart_type = command_params["restart_type"] if "restart_type" in command_params else ""
-        if restart_type:
-          restart_type = restart_type.encode('ascii', 'ignore')
 
-    rolling_restart = restart_type.lower().startswith("rolling")
+    upgrade_type = None
+    if restart_type.lower() == "rolling_upgrade":
+      upgrade_type = "rolling"
+    elif restart_type.lower() == "nonrolling_upgrade":
+      upgrade_type = "nonrolling"
+
+    is_stack_upgrade = upgrade_type is not None
 
     if componentCategory and componentCategory.strip().lower() == 'CLIENT'.lower():
-      if rolling_restart:
-        self.pre_rolling_restart(env)
+      if is_stack_upgrade:
+        # Remain backward compatible with the rest of the services that haven't switched to using
+        # the pre_upgrade_restart method. Once done, remove the else-block.
+        if "pre_upgrade_restart" in dir(self):
+          self.pre_upgrade_restart(env, upgrade_type=upgrade_type)
+        else:
+          self.pre_rolling_restart(env)
 
       self.install(env)
     else:
-      # To remain backward compatible with older stacks, only pass rolling_restart if True.
-      if rolling_restart:
-        self.stop(env, rolling_restart=rolling_restart)
+      # To remain backward compatible with older stacks, only pass upgrade_type if available.
+      # TODO, remove checking the argspec for "upgrade_type" once all of the services support that optional param.
+      if is_stack_upgrade and "upgrade_type" in inspect.getargspec(self.stop).args:
+        self.stop(env, upgrade_type=upgrade_type)
       else:
-        self.stop(env)
-
-      if rolling_restart:
-        self.pre_rolling_restart(env)
-
-      # To remain backward compatible with older stacks, only pass rolling_restart if True.
-      if rolling_restart:
-        self.start(env, rolling_restart=rolling_restart)
+        self.stop(env, rolling_restart=(restart_type == "rolling_upgrade"))
+
+      if is_stack_upgrade:
+        # Remain backward compatible with the rest of the services that haven't switched to using
+        # the pre_upgrade_restart method. Once done, remove the else-block.
+        if "pre_upgrade_restart" in dir(self):
+          self.pre_upgrade_restart(env, upgrade_type=upgrade_type)
+        else:
+          self.pre_rolling_restart(env)
+
+      # To remain backward compatible with older stacks, only pass upgrade_type if available.
+      # TODO, remove checking the argspec for "upgrade_type" once all of the services support that optional param.
+      if is_stack_upgrade and "upgrade_type" in inspect.getargspec(self.start).args:
+        self.start(env, upgrade_type=upgrade_type)
       else:
-        self.start(env)
+        self.start(env, rolling_restart=(restart_type == "rolling_upgrade"))
 
-      if rolling_restart:
-        self.post_rolling_restart(env)
+      if is_stack_upgrade:
+        # Remain backward compatible with the rest of the services that haven't switched to using
+        # the post_upgrade_restart method. Once done, remove the else-block.
+        if "post_upgrade_restart" in dir(self):
+          self.post_upgrade_restart(env, upgrade_type=upgrade_type)
+        else:
+          self.post_rolling_restart(env)
 
     if self.should_expose_component_version("restart"):
       self.save_component_version_to_structured_out()
 
+  # TODO, remove after all services have switched to post_upgrade_restart
   def post_rolling_restart(self, env):
     """
     To be overridden by subclasses
     """
     pass
 
-  def configure(self, env, rolling_restart=False):
+  def configure(self, env, upgrade_type=None):
     """
     To be overridden by subclasses
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
index a139446..e98f730 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/PreUpgradeCheckResourceProvider.java
@@ -134,8 +134,16 @@ public class PreUpgradeCheckResourceProvider extends ReadOnlyResourceProvider {
 
     for (Map<String, Object> propertyMap: propertyMaps) {
       final String clusterName = propertyMap.get(UPGRADE_CHECK_CLUSTER_NAME_PROPERTY_ID).toString();
-      final UpgradeType upgradeType = propertyMap.containsKey(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID) ?
-          UpgradeType.valueOf(propertyMap.get(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID).toString()) : UpgradeType.ROLLING;
+
+      UpgradeType upgradeType = UpgradeType.ROLLING;
+      if (propertyMap.containsKey(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID)) {
+        try {
+          upgradeType = UpgradeType.valueOf(propertyMap.get(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID).toString());
+        } catch(Exception e){
+          throw new SystemException(String.format("Property %s has an incorrect value of %s.", UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID, propertyMap.get(UPGRADE_CHECK_UPGRADE_TYPE_PROPERTY_ID)));
+        }
+      }
+
       final Cluster cluster;
 
       try {
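
The same defensive parse is repeated twice in UpgradeResourceProvider below. Its behavior, sketched in Python (the property name here is illustrative, not the real Ambari property ID):

VALID_UPGRADE_TYPES = ("ROLLING", "NON_ROLLING")

def parse_upgrade_type(property_map, property_id="upgrade_type"):
    # Default to ROLLING; reject unknown values with a readable message
    # instead of surfacing a raw enum-conversion error.
    if property_id not in property_map:
        return "ROLLING"
    value = str(property_map[property_id])
    if value not in VALID_UPGRADE_TYPES:
        raise ValueError("Property %s has an incorrect value of %s." % (property_id, value))
    return value

print(parse_upgrade_type({}))                              # ROLLING
print(parse_upgrade_type({"upgrade_type": "NON_ROLLING"})) # NON_ROLLING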

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 82ce49f..78c36f8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -492,8 +492,14 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     String preferredUpgradePackName = (String) requestMap.get(UPGRADE_PACK);
 
     // Default to ROLLING upgrade, but attempt to read from properties.
-    final UpgradeType upgradeType = requestMap.containsKey(UPGRADE_TYPE) ?
-        UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString()) : UpgradeType.ROLLING;
+    UpgradeType upgradeType = UpgradeType.ROLLING;
+    if (requestMap.containsKey(UPGRADE_TYPE)) {
+      try {
+        upgradeType = UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString());
+      } catch(Exception e){
+        throw new AmbariException(String.format("Property %s has an incorrect value of %s.", UPGRADE_TYPE, requestMap.get(UPGRADE_TYPE)));
+      }
+    }
 
     if (null == clusterName) {
       throw new AmbariException(String.format("%s is required", UPGRADE_CLUSTER_NAME));
@@ -526,8 +532,15 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     boolean skipPrereqChecks = Boolean.parseBoolean((String) requestMap.get(UPGRADE_SKIP_PREREQUISITE_CHECKS));
     boolean failOnCheckWarnings = Boolean.parseBoolean((String) requestMap.get(UPGRADE_FAIL_ON_CHECK_WARNINGS));
     String preferredUpgradePack = requestMap.containsKey(UPGRADE_PACK) ? (String) requestMap.get(UPGRADE_PACK) : null;
-    UpgradeType upgradeType = requestMap.containsKey(UPGRADE_TYPE) ?
-        UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString()) : UpgradeType.ROLLING;
+
+    UpgradeType upgradeType = UpgradeType.ROLLING;
+    if (requestMap.containsKey(UPGRADE_TYPE)) {
+      try {
+        upgradeType = UpgradeType.valueOf(requestMap.get(UPGRADE_TYPE).toString());
+      } catch(Exception e){
+        throw new AmbariException(String.format("Property %s has an incorrect value of %s.", UPGRADE_TYPE, requestMap.get(UPGRADE_TYPE)));
+      }
+    }
 
    // Validate there isn't a direction == upgrade/downgrade already in progress.
     List<UpgradeEntity> upgrades = s_upgradeDAO.findUpgrades(cluster.getClusterId());
@@ -1061,6 +1074,23 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     }
   }
 
+  /**
+   * Modify the commandParams by applying additional parameters from the stage.
+   * @param wrapper Stage Wrapper that may contain additional parameters.
+   * @param commandParams Parameters to modify.
+   */
+  private void applyAdditionalParameters(StageWrapper wrapper, Map<String, String> commandParams) {
+    if (wrapper.getParams() != null) {
+      Iterator it = wrapper.getParams().entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<String, String> pair = (Map.Entry) it.next();
+        if (!commandParams.containsKey(pair.getKey())) {
+          commandParams.put(pair.getKey(), pair.getValue());
+        }
+      }
+    }
+  }
+
   private void makeActionStage(UpgradeContext context, RequestStageContainer request,
       UpgradeItemEntity entity, StageWrapper wrapper, boolean skippable, boolean allowRetry)
           throws AmbariException {
@@ -1084,6 +1114,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     params.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     params.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Apply additional parameters to the command that come from the stage.
+    applyAdditionalParameters(wrapper, params);
+
     // Because custom task may end up calling a script/function inside a
     // service, it is necessary to set the
     // service_package_folder and hooks_folder params.
@@ -1192,6 +1225,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     commandParams.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     commandParams.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Apply additional parameters to the command that come from the stage.
+    applyAdditionalParameters(wrapper, commandParams);
+
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         function, filters, commandParams);
     actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
@@ -1244,6 +1280,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     commandParams.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     commandParams.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Apply additional parameters to the command that come from the stage.
+    applyAdditionalParameters(wrapper, commandParams);
+
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         "SERVICE_CHECK", filters, commandParams);
 
@@ -1304,6 +1343,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     commandParams.put(COMMAND_PARAM_TARGET_STACK, context.getTargetStackId().getStackId());
     commandParams.put(COMMAND_DOWNGRADE_FROM_VERSION, context.getDowngradeFromVersion());
 
+    // Notice that this does not apply any params because the input does not specify a stage.
+    // All of the other actions do use additional params.
+
     String itemDetail = entity.getText();
     String stageText = StringUtils.abbreviate(entity.getText(), 255);
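
applyAdditionalParameters() folds stage-level params into the command params without clobbering keys the command already set. In Python terms (keys are illustrative; desired_namenode_role is the parameter this commit introduces):

def apply_additional_parameters(stage_params, command_params):
    # A key already present in command_params wins, mirroring the
    # containsKey guard in the Java above.
    if stage_params is not None:
        for key, value in stage_params.items():
            command_params.setdefault(key, value)

command_params = {"version": "2.3.0.0"}
apply_additional_parameters({"desired_namenode_role": "active", "version": "ignored"},
                            command_params)
print(sorted(command_params.items()))
# [('desired_namenode_role', 'active'), ('version', '2.3.0.0')]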
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java b/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
index 55fb12b..22447d7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
@@ -222,6 +222,29 @@ public class MasterHostResolver {
     }
   }
 
+  /**
+   * Determine if HDFS is present and it has NameNode High Availability.
+   * @return true if the cluster has NameNode HA; otherwise, false.
+   */
+  public boolean isNameNodeHA() throws AmbariException {
+    Map<String, org.apache.ambari.server.state.Service> services = m_cluster.getServices();
+    if (services != null && services.containsKey("HDFS")) {
+
+      Set<String> secondaryNameNodeHosts = m_cluster.getHosts("HDFS", "SECONDARY_NAMENODE");
+      Set<String> nameNodeHosts = m_cluster.getHosts("HDFS", "NAMENODE");
+
+      if (secondaryNameNodeHosts.size() == 1 && nameNodeHosts.size() == 1) {
+        return false;
+      }
+      if (nameNodeHosts.size() > 1) {
+        return true;
+      }
+
+      throw new AmbariException("Unable to determine if cluster has NameNode HA.");
+    }
+    return false;
+  }
+
 
   /**
    * Get mapping of the HDFS Namenodes from the state ("active" or "standby") to the hostname.
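
The decision table behind isNameNodeHA(), as a small sketch:

def is_namenode_ha(namenode_hosts, secondary_namenode_hosts):
    # 1 NAMENODE + 1 SECONDARY_NAMENODE -> classic non-HA topology
    # 2 or more NAMENODEs               -> NameNode HA
    # anything else                     -> ambiguous, so fail fast
    if len(secondary_namenode_hosts) == 1 and len(namenode_hosts) == 1:
        return False
    if len(namenode_hosts) > 1:
        return True
    raise ValueError("Unable to determine if cluster has NameNode HA.")

print(is_namenode_ha({"nn1", "nn2"}, set()))  # True
print(is_namenode_ha({"nn1"}, {"snn1"}))      # False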

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
index b81d3fd..fd92d21 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.state;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -325,12 +326,15 @@ public class UpgradeHelper {
         }
 
         for (String component : service.components) {
+          // Rolling Upgrade has exactly one task for a Component.
           if (upgradePack.getType() == UpgradeType.ROLLING && !allTasks.get(service.serviceName).containsKey(component)) {
             continue;
           }
+
+          // NonRolling Upgrade has several tasks for the same component, since it must first call Stop, perform several
+          // other tasks, and then Start on that Component.
           
           HostsType hostsType = mhr.getMasterAndHosts(service.serviceName, component);
-          // TODO AMBARI-12698, how does this impact SECONDARY NAMENODE if there's no NameNode HA?
           if (null == hostsType) {
             continue;
           }
@@ -368,26 +372,66 @@ public class UpgradeHelper {
 
           setDisplayNames(context, service.serviceName, component);
 
-          // Special case for NAMENODE
+          // Special case for NAMENODE when there are multiple
           if (service.serviceName.equalsIgnoreCase("HDFS") && component.equalsIgnoreCase("NAMENODE")) {
-            // !!! revisit if needed
-            if (!hostsType.hosts.isEmpty() && hostsType.master != null && hostsType.secondary != null) {
-              // The order is important, first do the standby, then the active namenode.
-              LinkedHashSet<String> order = new LinkedHashSet<>();
-
-              order.add(hostsType.secondary);
-              order.add(hostsType.master);
 
-              // Override the hosts with the ordered collection
-              hostsType.hosts = order;
+            // Rolling Upgrade requires first upgrading the Standby, then the Active NameNode.
+            // Whereas NonRolling needs to do the following:
+            //   NameNode HA:  Pick one to be the active, and the other the standby.
+            //   Non-NameNode HA: Upgrade first the SECONDARY, then the primary NAMENODE
+            switch (upgradePack.getType()) {
+              case ROLLING:
+                if (!hostsType.hosts.isEmpty() && hostsType.master != null && hostsType.secondary != null) {
+                  // The order is important, first do the standby, then the active namenode.
+                  LinkedHashSet<String> order = new LinkedHashSet<String>();
+
+                  order.add(hostsType.secondary);
+                  order.add(hostsType.master);
+
+                  // Override the hosts with the ordered collection
+                  hostsType.hosts = order;
+
+                  builder.add(context, hostsType, service.serviceName,
+                      svc.isClientOnlyService(), pc, null);
+                }
+                break;
+              case NON_ROLLING:
+                boolean isNameNodeHA = mhr.isNameNodeHA();
+                if (isNameNodeHA && hostsType.master != null && hostsType.secondary != null) {
+                  // This could be any order, but the NameNodes have to know what role they are going to take.
+                  // So we need to make two stages and add different parameters to each one.
+
+                  HostsType ht1 = new HostsType();
+                  LinkedHashSet<String> h1Hosts = new LinkedHashSet<String>();
+                  h1Hosts.add(hostsType.master);
+                  ht1.hosts = h1Hosts;
+                  Map<String, String> h1Params = new HashMap<String, String>();
+                  h1Params.put("desired_namenode_role", "active");
+
+                  HostsType ht2 = new HostsType();
+                  LinkedHashSet<String> h2Hosts = new LinkedHashSet<String>();
+                  h2Hosts.add(hostsType.secondary);
+                  ht2.hosts = h2Hosts;
+                  Map<String, String> h2Params = new HashMap<String, String>();
+                  h2Params.put("desired_namenode_role", "standby");
+
+
+                  builder.add(context, ht1, service.serviceName,
+                      svc.isClientOnlyService(), pc, h1Params);
+
+                  builder.add(context, ht2, service.serviceName,
+                      svc.isClientOnlyService(), pc, h2Params);
+                } else {
+                  // If no NameNode HA, then don't need to change hostsType.hosts since there should be exactly one.
+                  builder.add(context, hostsType, service.serviceName,
+                      svc.isClientOnlyService(), pc, null);
+                }
+
+                break;
             }
-
-            builder.add(context, hostsType, service.serviceName,
-                svc.isClientOnlyService(), pc);
-
           } else {
             builder.add(context, hostsType, service.serviceName,
-                svc.isClientOnlyService(), pc);
+                svc.isClientOnlyService(), pc, null);
           }
         }
       }
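
The NON_ROLLING branch above reduces to one stage per NameNode, each tagged with the role it must assume. Illustrated with invented hostnames:

def namenode_stages_nonrolling(active_host, standby_host):
    # One (hosts, params) pair per stage; desired_namenode_role is the
    # extra command parameter each NameNode reads when it starts.
    return [
        ([active_host],  {"desired_namenode_role": "active"}),
        ([standby_host], {"desired_namenode_role": "standby"}),
    ]

for hosts, params in namenode_stages_nonrolling("nn1.example.com", "nn2.example.com"):
    print(hosts, params)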

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
index ba44408..0e9d2c8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
@@ -108,7 +108,7 @@ public class ClusterGrouping extends Grouping {
 
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-        boolean clientOnly, ProcessingComponent pc) {
+        boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
       // !!! no-op in this case
     }
 
@@ -261,7 +261,8 @@ public class ClusterGrouping extends Grouping {
       }
 
       return new StageWrapper(
-          StageWrapper.Type.RU_TASKS, execution.title,
+          StageWrapper.Type.RU_TASKS,
+          execution.title,
           new TaskWrapper(service, component, hostNames, et));
     }
     return null;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
index 2aef43c..11e9267 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
@@ -81,7 +81,7 @@ public class ColocatedGrouping extends Grouping {
 
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-        boolean clientOnly, ProcessingComponent pc) {
+        boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
 
       boolean forUpgrade = ctx.getDirection().isUpgrade();
 
@@ -110,7 +110,7 @@ public class ColocatedGrouping extends Grouping {
           proxy.clientOnly = clientOnly;
           proxy.message = getStageText("Preparing",
               ctx.getComponentDisplay(service, pc.name), Collections.singleton(host));
-          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks, params));
           proxy.service = service;
           proxy.component = pc.name;
           targetList.add(proxy);
@@ -122,7 +122,7 @@ public class ColocatedGrouping extends Grouping {
           if (RestartTask.class.isInstance(t)) {
             proxy = new TaskProxy();
             proxy.clientOnly = clientOnly;
-            proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), t));
+            proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), params, t));
             proxy.restart = true;
             proxy.service = service;
             proxy.component = pc.name;
@@ -139,7 +139,7 @@ public class ColocatedGrouping extends Grouping {
           proxy.clientOnly = clientOnly;
           proxy.component = pc.name;
           proxy.service = service;
-          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, tasks, params));
           proxy.message = getStageText("Completing",
               ctx.getComponentDisplay(service, pc.name), Collections.singleton(host));
           targetList.add(proxy);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
index d6db9b1..cd3ee68 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import javax.xml.bind.annotation.XmlAttribute;
@@ -84,24 +85,26 @@ public class Grouping {
      * @param hostsType the order collection of hosts, which may have a master and secondary
      * @param service the service name
      * @param pc the ProcessingComponent derived from the upgrade pack.
+     * @param params additional parameters
      */
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-       boolean clientOnly, ProcessingComponent pc) {
+       boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
 
       boolean forUpgrade = ctx.getDirection().isUpgrade();
 
       // Construct the pre tasks during Upgrade/Downgrade direction.
       List<TaskBucket> buckets = buckets(resolveTasks(forUpgrade, true, pc));
       for (TaskBucket bucket : buckets) {
-        List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks);
+        List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks, params);
         Set<String> preTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(preTasks);
         if (!preTasksEffectiveHosts.isEmpty()) {
           StageWrapper stage = new StageWrapper(
               bucket.type,
               getStageText("Preparing", ctx.getComponentDisplay(service, pc.name), preTasksEffectiveHosts),
+              params,
               preTasks
-              );
+          );
           m_stages.add(stage);
         }
       }
@@ -114,7 +117,8 @@ public class Grouping {
           StageWrapper stage = new StageWrapper(
               t.getStageWrapperType(),
               getStageText(t.getActionVerb(), ctx.getComponentDisplay(service, pc.name), Collections.singleton(hostName)),
-              new TaskWrapper(service, pc.name, Collections.singleton(hostName), t));
+              params,
+              new TaskWrapper(service, pc.name, Collections.singleton(hostName), params, t));
           m_stages.add(stage);
         }
       }
@@ -122,12 +126,13 @@ public class Grouping {
       // Construct the post tasks during Upgrade/Downgrade direction.
       buckets = buckets(resolveTasks(forUpgrade, false, pc));
       for (TaskBucket bucket : buckets) {
-        List<TaskWrapper> postTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks);
+        List<TaskWrapper> postTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks, params);
         Set<String> postTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(postTasks);
         if (!postTasksEffectiveHosts.isEmpty()) {
           StageWrapper stage = new StageWrapper(
               bucket.type,
               getStageText("Completing", ctx.getComponentDisplay(service, pc.name), postTasksEffectiveHosts),
+              params,
               postTasks
               );
           m_stages.add(stage);
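
Grouping.add() above lays out stages as prepare, per-host restart, complete, with the same params attached to every stage. Schematically (an illustration, not the builder's API):

def layout_stages(pre_tasks, hosts, post_tasks, params):
    # "Preparing" stages first, then one restart stage per host, then
    # "Completing" stages; params ride along so they reach the commands.
    stages = []
    if pre_tasks:
        stages.append(("Preparing", pre_tasks, params))
    for host in hosts:
        stages.append(("Restarting " + host, ["restart"], params))
    if post_tasks:
        stages.append(("Completing", post_tasks, params))
    return stages

print(len(layout_stages(["prepare"], ["h1", "h2"], ["finalize"], {})))  # 4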

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
index fec9978..0033185 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java
@@ -103,7 +103,7 @@ public class ServiceCheckGrouping extends Grouping {
      */
     @Override
     public void add(UpgradeContext ctx, HostsType hostsType, String service,
-        boolean clientOnly, ProcessingComponent pc) {
+        boolean clientOnly, ProcessingComponent pc, Map<String, String> params) {
       // !!! nothing to do here
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
index 92df3b5..2ea3671 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
@@ -19,8 +19,10 @@ package org.apache.ambari.server.state.stack.upgrade;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.gson.Gson;
@@ -33,16 +35,41 @@ public class StageWrapper {
   private static Gson gson = new Gson();
   private String text;
   private Type type;
-
+  private Map<String, String> params;
   private List<TaskWrapper> tasks;
 
+  /**
+   * Wrapper for a stage that encapsulates its text and tasks.
+   * @param type Type of stage
+   * @param text Text to display
+   * @param tasks List of tasks to add to the stage
+   */
   public StageWrapper(Type type, String text, TaskWrapper... tasks) {
-    this(type, text, Arrays.asList(tasks));
+    this(type, text, null, Arrays.asList(tasks));
+  }
+
+  /**
+   * Wrapper for a stage that encapsulates its text, params, and tasks.
+   * @param type Type of stage
+   * @param text Text to display
+   * @param params Command parameters
+   * @param tasks List of tasks to add to the stage
+   */
+  public StageWrapper(Type type, String text, Map<String, String> params, TaskWrapper... tasks) {
+    this(type, text, params, Arrays.asList(tasks));
   }
 
-  public StageWrapper(Type type, String text, List<TaskWrapper> tasks) {
+  /**
+   * Wrapper for a stage that encapsulates its text, params, and tasks.
+   * @param type Type of stage
+   * @param text Text to display
+   * @param params Command parameters
+   * @param tasks List of tasks to add to the stage
+   */
+  public StageWrapper(Type type, String text, Map<String, String> params, List<TaskWrapper> tasks) {
     this.type = type;
     this.text = text;
+    this.params = (params == null ? Collections.<String, String>emptyMap() : params);
     this.tasks = tasks;
   }
 
@@ -78,6 +105,13 @@ public class StageWrapper {
   }
 
   /**
+   * @return the additional command parameters
+   */
+  public Map<String, String> getParams() {
+    return params;
+  }
+
+  /**
    * @return the wrapped tasks for this stage
    */
   public List<TaskWrapper> getTasks() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
index d4ee9a8..6ef0980 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.ambari.server.serveraction.upgrades.AutoSkipFailedSummaryAction;
@@ -66,9 +67,11 @@ public abstract class StageWrapperBuilder {
    *          whether the service is client only, no service checks
    * @param pc
    *          the ProcessingComponent derived from the upgrade pack
+   * @param params
+   *          additional parameters
    */
   public abstract void add(UpgradeContext upgradeContext, HostsType hostsType, String service,
-      boolean clientOnly, ProcessingComponent pc);
+      boolean clientOnly, ProcessingComponent pc, Map<String, String> params);
 
   /**
   * Builds the stage wrappers, including any pre- and post-processing that needs

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
index f7cc930..69b3f8b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
@@ -18,7 +18,9 @@
 package org.apache.ambari.server.state.stack.upgrade;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -29,6 +31,7 @@ public class TaskWrapper {
   private String service;
   private String component;
   private Set<String> hosts; // all the hosts that all the tasks must run
+  private Map<String, String> params;
   private List<Task> tasks; // all the tasks defined for the hostcomponent
 
   /**
@@ -38,8 +41,20 @@ public class TaskWrapper {
    * @param tasks an array of tasks as a convenience
    */
   public TaskWrapper(String s, String c, Set<String> hosts, Task... tasks) {
-    this(s, c, hosts, Arrays.asList(tasks));
+    this(s, c, hosts, null, Arrays.asList(tasks));
   }
+  
+  /**
+   * @param s the service name for the tasks
+   * @param c the component name for the tasks
+   * @param hosts the set of hosts that the tasks are for
+   * @param params additional command parameters
+   * @param tasks an array of tasks as a convenience
+   */
+  public TaskWrapper(String s, String c, Set<String> hosts, Map<String, String> params, Task... tasks) {
+    this(s, c, hosts, params, Arrays.asList(tasks));
+  }
+
 
   /**
    * @param s the service name for the tasks
@@ -47,15 +62,23 @@ public class TaskWrapper {
   * @param hosts the set of hosts for the tasks
    * @param tasks the list of tasks
    */
-  public TaskWrapper(String s, String c, Set<String> hosts, List<Task> tasks) {
+  public TaskWrapper(String s, String c, Set<String> hosts, Map<String, String> params, List<Task> tasks) {
     service = s;
     component = c;
 
     this.hosts = hosts;
+    this.params = (params == null) ? new HashMap<String, String>() : params;
     this.tasks = tasks;
   }
 
   /**
+   * @return the additional command parameters.
+   */
+  public Map<String, String> getParams() {
+    return params;
+  }
+
+  /**
    * @return the tasks associated with this wrapper
    */
   public List<Task> getTasks() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
index a5813e3..057c310 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.ambari.server.stack.HostsType;
@@ -43,15 +44,16 @@ public class TaskWrapperBuilder {
    * @param component the component name for the tasks
    * @param hostsType the collection of sets along with their status
    * @param tasks collection of tasks
+   * @param params additional parameters
    */
-  public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks) {
+  public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks, Map<String, String> params) {
     List<TaskWrapper> collection = new ArrayList<TaskWrapper>();
     for (Task t : tasks) {
       if (t.getType().equals(Task.Type.EXECUTE)) {
         ExecuteTask et = (ExecuteTask) t;
         if (et.hosts == ExecuteHostType.MASTER) {
           if (hostsType.master != null) {
-            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.master), t));
+            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.master), params, t));
             continue;
           } else {
             LOG.error(MessageFormat.format("Found an Execute task for {0} and {1} meant to run on a master but could not find any masters to run on. Skipping this task.", service, component));
@@ -61,7 +63,7 @@ public class TaskWrapperBuilder {
         // Pick a random host.
         if (et.hosts == ExecuteHostType.ANY) {
           if (hostsType.hosts != null && !hostsType.hosts.isEmpty()) {
-            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.hosts.iterator().next()), t));
+            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.hosts.iterator().next()), params, t));
             continue;
           } else {
             LOG.error(MessageFormat.format("Found an Execute task for {0} and {1} meant to run on a any host but could not find host to run on. Skipping this task.", service, component));
@@ -70,7 +72,7 @@ public class TaskWrapperBuilder {
         }
       }
 
-      collection.add(new TaskWrapper(service, component, hostsType.hosts, t));
+      collection.add(new TaskWrapper(service, component, hostsType.hosts, params, t));
     }
 
     return collection;
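
getTaskList() resolves where each EXECUTE task runs; the three placement rules, sketched:

def hosts_for_execute_task(scope, master, hosts):
    # scope mirrors ExecuteHostType: "MASTER" -> the master host only,
    # "ANY" -> one arbitrary host, anything else -> every host.
    if scope == "MASTER":
        return {master} if master else set()
    if scope == "ANY":
        return {next(iter(hosts))} if hosts else set()
    return set(hosts)

print(hosts_for_execute_task("MASTER", "nn1", {"nn1", "dn1"}))  # {'nn1'}
print(hosts_for_execute_task(None, None, {"dn1", "dn2"}))       # both hosts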

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
index fa68435..1d242e1 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
@@ -27,8 +27,23 @@ from resource_management.libraries.functions.security_commons import build_expec
 from hdfs import hdfs
 from ambari_commons.os_family_impl import OsFamilyImpl
 from ambari_commons import OSConst
+from utils import get_hdfs_binary
 
 class DataNode(Script):
+
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-datanode"}
+
+  def get_hdfs_binary(self):
+    """
+    Get the name or path to the hdfs binary depending on the stack and version.
+    """
+    import params
+    stack_to_comp = self.get_stack_to_component()
+    if params.stack_name in stack_to_comp:
+      return get_hdfs_binary(stack_to_comp[params.stack_name])
+    return "hdfs"
+
   def install(self, env):
     import params
     self.install_packages(env, params.exclude_packages)
@@ -40,19 +55,20 @@ class DataNode(Script):
     hdfs("datanode")
     datanode(action="configure")
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
     datanode(action="start")
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     # pre-upgrade steps shutdown the datanode, so there's no need to call
-    # action=stop
-    if rolling_restart:
-      stopped = datanode_upgrade.pre_upgrade_shutdown()
+
+    hdfs_binary = self.get_hdfs_binary()
+    if upgrade_type == "rolling":
+      stopped = datanode_upgrade.pre_rolling_upgrade_shutdown(hdfs_binary)
       if not stopped:
         datanode(action="stop")
     else:
@@ -67,23 +83,21 @@ class DataNode(Script):
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class DataNodeDefault(DataNode):
 
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-datanode"}
-
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing DataNode Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing DataNode Stack Upgrade pre-restart")
     import params
     env.set_params(params)
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-datanode", params.version)
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing DataNode Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing DataNode Stack Upgrade post-restart")
     import params
     env.set_params(params)
+    hdfs_binary = self.get_hdfs_binary()
     # ensure the DataNode has started and rejoined the cluster
-    datanode_upgrade.post_upgrade_check()
+    datanode_upgrade.post_upgrade_check(hdfs_binary)
 
   def security_status(self, env):
     import status_params
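
get_hdfs_binary() in utils.py is not shown in this excerpt; a hypothetical stand-in showing the shape of the lookup, assuming the conventional /usr/hdp/current layout:

def get_hdfs_binary(distro_component_name):
    # Hypothetical stand-in, not the utils.py implementation: return a
    # versioned binary for a known component, else the bare "hdfs" on PATH.
    if distro_component_name:
        return "/usr/hdp/current/%s/bin/hdfs" % distro_component_name
    return "hdfs"

print(get_hdfs_binary("hadoop-hdfs-datanode"))
# /usr/hdp/current/hadoop-hdfs-datanode/bin/hdfs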

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
index 2e5ac19..6138f8c 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode_upgrade.py
@@ -26,12 +26,13 @@ from resource_management.libraries.functions import format
 from resource_management.libraries.functions.decorator import retry
 
 
-def pre_upgrade_shutdown():
+def pre_rolling_upgrade_shutdown(hdfs_binary):
   """
   Runs the "shutdownDatanode {ipc_address} upgrade" command to shutdown the
   DataNode in preparation for an upgrade. This will then periodically check
   "getDatanodeInfo" to ensure the DataNode has shutdown correctly.
   This function will obtain the Kerberos ticket if security is enabled.
+  :param hdfs_binary: name/path of the HDFS binary to use
  :return: Return True if ran ok (even with errors), and False if the datanode needs to be stopped forcefully.
   """
   import params
@@ -40,38 +41,39 @@ def pre_upgrade_shutdown():
   if params.security_enabled:
     Execute(params.dn_kinit_cmd, user = params.hdfs_user)
 
-  command = format('hdfs dfsadmin -shutdownDatanode {dfs_dn_ipc_address} upgrade')
+  command = format('{hdfs_binary} dfsadmin -shutdownDatanode {dfs_dn_ipc_address} upgrade')
 
   code, output = shell.call(command, user=params.hdfs_user)
   if code == 0:
     # verify that the datanode is down
-    _check_datanode_shutdown()
+    _check_datanode_shutdown(hdfs_binary)
   else:
-    # Due to bug HDFS-7533, DataNode may not always shutdown during rolling upgrade, and it is necessary to kill it.
+    # Due to bug HDFS-7533, DataNode may not always shut down during a stack upgrade, and it is necessary to kill it.
     if output is not None and re.search("Shutdown already in progress", output):
       Logger.error("Due to a known issue in DataNode, the command {0} did not work, so will need to shutdown the datanode forcefully.".format(command))
       return False
   return True
 
 
-def post_upgrade_check():
+def post_upgrade_check(hdfs_binary):
   """
   Verifies that the DataNode has rejoined the cluster. This function will
   obtain the Kerberos ticket if security is enabled.
+  :param hdfs_binary: name/path of the HDFS binary to use
   :return:
   """
   import params
 
   Logger.info("Checking that the DataNode has rejoined the cluster after upgrade...")
   if params.security_enabled:
-    Execute(params.dn_kinit_cmd,user = params.hdfs_user)
+    Execute(params.dn_kinit_cmd, user=params.hdfs_user)
 
   # verify that the datanode has started and rejoined the HDFS cluster
-  _check_datanode_startup()
+  _check_datanode_startup(hdfs_binary)
 
 
 @retry(times=24, sleep_time=5, err_class=Fail)
-def _check_datanode_shutdown():
+def _check_datanode_shutdown(hdfs_binary):
   """
  Checks that a DataNode is down by running "hdfs dfsadmin getDatanodeInfo"
   several times, pausing in between runs. Once the DataNode stops responding
@@ -84,13 +86,14 @@ def _check_datanode_shutdown():
   https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the
   times for ipc.client.connect.retry.interval. In the meantime, override them
   here, but only for RU.
+  :param hdfs_binary: name/path of the HDFS binary to use
   :return:
   """
   import params
 
   # override stock retry timeouts since after 30 seconds, the datanode is
   # marked as dead and can affect HBase during RU
-  command = format('hdfs dfsadmin -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
+  command = format('{hdfs_binary} dfsadmin -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
 
   try:
     Execute(command, user=params.hdfs_user, tries=1)
@@ -103,19 +106,19 @@ def _check_datanode_shutdown():
 
 
 @retry(times=12, sleep_time=10, err_class=Fail)
-def _check_datanode_startup():
+def _check_datanode_startup(hdfs_binary):
   """
   Checks that a DataNode is reported as being alive via the
   "hdfs dfsadmin -report -live" command. Once the DataNode is found to be
   alive this method will return, otherwise it will raise a Fail(...) and retry
   automatically.
+  :param hdfs_binary: name/path of the HDFS binary to use
   :return:
   """
   import params
 
   try:
-    # 'su - hdfs -c "hdfs dfsadmin -report -live"'
-    command = 'hdfs dfsadmin -report -live'
+    command = format('{hdfs_binary} dfsadmin -report -live')
     return_code, hdfs_output = shell.call(command, user=params.hdfs_user)
   except:
     raise Fail('Unable to determine if the DataNode has started after upgrade.')
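
The @retry decorator used throughout this module comes from resource_management.libraries.functions.decorator; a generic sketch of those semantics (the library implementation may differ in detail):

import functools
import time

def retry(times, sleep_time, err_class):
    # Up to `times` attempts, sleeping `sleep_time` seconds between
    # failures; re-raise err_class only after the final attempt fails.
    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            for attempt in range(times):
                try:
                    return function(*args, **kwargs)
                except err_class:
                    if attempt == times - 1:
                        raise
                    time.sleep(sleep_time)
        return wrapper
    return decorator

@retry(times=3, sleep_time=0, err_class=ValueError)
def flaky():
    raise ValueError("still failing")  # calling flaky() raises only after 3 attempts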

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
index dd0dca4..16218b6 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
@@ -40,11 +40,11 @@ class HdfsClient(Script):
     env.set_params(params)
     hdfs()
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
@@ -57,7 +57,7 @@ class HdfsClientDefault(HdfsClient):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-client"}
 
-  def pre_rolling_restart(self, env):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
index b11d7ea..98b8afd 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
@@ -36,18 +36,27 @@ from resource_management.core.shell import as_user
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 
-from utils import service, safe_zkfc_op
+from utils import service, safe_zkfc_op, is_previous_fs_image
 from setup_ranger_hdfs import setup_ranger_hdfs
+from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
+
 
 @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-def namenode(action=None, do_format=True, rolling_restart=False, env=None):
+def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, env=None):
+  if action is None:
+    raise Fail('"action" parameter is required for function namenode().')
+
+  if action in ["start", "stop"] and hdfs_binary is None:
+    raise Fail('"hdfs_binary" parameter is required for function namenode().')
+
   if action == "configure":
     import params
     #we need this directory to be present before any action(HA manual steps for
     #additional namenode)
     create_name_dirs(params.dfs_name_dir)
   elif action == "start":
-    setup_ranger_hdfs(rolling_upgrade = rolling_restart)
+    Logger.info("Called service {0} with upgrade_type: {1}".format(action, str(upgrade_type)))
+    setup_ranger_hdfs(upgrade_type=upgrade_type)
     import params
     if do_format:
       format_namenode()
@@ -70,13 +79,33 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
         if not success:
           raise Fail("Could not bootstrap standby namenode")
 
-    options = "-rollingUpgrade started" if rolling_restart else ""
-
-    if rolling_restart:
+    if upgrade_type == "rolling" and params.dfs_ha_enabled:
       # Most likely, ZKFC is up since RU will initiate the failover command. However, if that failed, it would have tried
       # to kill ZKFC manually, so we need to start it if not already running.
       safe_zkfc_op(action, env)
 
+    options = ""
+    if upgrade_type == "rolling":
+      options = "-rollingUpgrade started"
+    elif upgrade_type == "nonrolling":
+      is_previous_image_dir = is_previous_fs_image()
+      Logger.info(format("Previous file system image dir present is {is_previous_image_dir}"))
+
+      if params.dfs_ha_enabled:
+        if params.desired_namenode_role is None:
+          raise Fail("Did not receive parameter \"desired_namenode_role\" to indicate the role that this NameNode should have.")
+
+        if params.desired_namenode_role == "active":
+          # The "-upgrade" command can only be used exactly once. If used more than once during a retry, it will cause problems.
+          options = "" if is_previous_image_dir else "-upgrade"
+
+        if params.desired_namenode_role == "standby":
+          options = "-bootstrapStandby -force"
+      else:
+        # Both Primary and Secondary NameNode can use the same command.
+        options = "" if is_previous_image_dir else "-upgrade"
+    Logger.info(format("Option for start command: {options}"))
+
     service(
       action="start",
       name="namenode",
@@ -90,53 +119,66 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
       Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
               user = params.hdfs_user)
 
-    is_namenode_safe_mode_off = format("hdfs dfsadmin -fs {namenode_address} -safemode get | grep 'Safe mode is OFF'")
+    is_namenode_safe_mode_off = format("{hdfs_binary} dfsadmin -fs {namenode_address} -safemode get | grep 'Safe mode is OFF'")
     if params.dfs_ha_enabled:
-      is_active_namenode_cmd = as_user(format("hdfs --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+      is_active_namenode_cmd = as_user(format("{hdfs_binary} --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
     else:
       is_active_namenode_cmd = None
+    
+    # During NonRolling Upgrade, both NameNodes are initially down,
+    # so no point in checking if this is the active or standby.
+    if upgrade_type == "nonrolling":
+      is_active_namenode_cmd = None
 
-    # During normal operations, if HA is enabled and it is in standby, then no need to check safemode staus.
-    # During Rolling Upgrade, both namenodes must eventually leave safemode, and Ambari can wait for this.
+    # ___Scenario___________|_Expected safemode state__|_Wait for safemode OFF____|
+    # no-HA                 | ON -> OFF                | Yes                      |
+    # HA and active         | ON -> OFF                | Yes                      |
+    # HA and standby        | no change                | no check                 |
+    # RU with HA on active  | ON -> OFF                | Yes                      |
+    # RU with HA on standby | ON -> OFF                | Yes                      |
+    # EU with HA on active  | no change                | no check                 |
+    # EU with HA on standby | no change                | no check                 |
+    # EU non-HA             | no change                | no check                 |
 
-    # ___Scenario_________|_Expected safemode state___|_Wait for safemode OFF____|
-    # 1 (HA and active)   | ON -> OFF                 | Yes                      |
-    # 2 (HA and standby)  | no change (yes during RU) | no check (yes during RU) |
-    # 3 (no-HA)           | ON -> OFF                 | Yes                      |
     check_for_safemode_off = False
     msg = ""
     if params.dfs_ha_enabled:
-      code, out = shell.call(is_active_namenode_cmd, logoutput=True) # If active NN, code will be 0
-      if code == 0: # active
-        check_for_safemode_off = True
-        msg = "Must wait to leave safemode since High Availability is enabled and this is the Active NameNode."
-      elif rolling_restart:
+      if upgrade_type is not None:
         check_for_safemode_off = True
-        msg = "Must wait to leave safemode since High Availability is enabled during a Rolling Upgrade"
+        msg = "Must wait to leave safemode since High Availability is enabled during a Stack Upgrade"
+      else:
+        # During normal operations, the NameNode is expected to be up.
+        code, out = shell.call(is_active_namenode_cmd, logoutput=True) # If active NN, code will be 0
+        if code == 0: # active
+          check_for_safemode_off = True
+          msg = "Must wait to leave safemode since High Availability is enabled and this is the Active NameNode."
+        else:
+          msg = "Will remain in the current safemode state."
     else:
       msg = "Must wait to leave safemode since High Availability is not enabled."
       check_for_safemode_off = True
 
-    if not msg:
-      msg = "Will remain in the current safemode state."
     Logger.info(msg)
 
+    # During a NonRolling (aka Express Upgrade), stay in safemode since the DataNodes are down.
+    stay_in_safe_mode = False
+    if upgrade_type == "nonrolling":
+      stay_in_safe_mode = True
+
     if check_for_safemode_off:
-      # First check if Namenode is not in 'safemode OFF' (equivalent to safemode ON). If safemode is OFF, no change.
-      # If safemode is ON, first wait for NameNode to leave safemode on its own (if that doesn't happen within 30 seconds, then
-      # force NameNode to leave safemode).
-      Logger.info("Checking the NameNode safemode status since may need to transition from ON to OFF.")
-
-      try:
-        # Wait up to 30 mins
-        Execute(is_namenode_safe_mode_off,
-                tries=180,
-                try_sleep=10,
-                user=params.hdfs_user,
-                logoutput=True
-        )
-      except Fail:
-        Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
+      Logger.info("Stay in safe mode: {0}".format(stay_in_safe_mode))
+      if not stay_in_safe_mode:
+        Logger.info("Wait to leafe safemode since must transition from ON to OFF.")
+        try:
+          # Wait up to 30 mins
+          Execute(is_namenode_safe_mode_off,
+                  tries=180,
+                  try_sleep=10,
+                  user=params.hdfs_user,
+                  logoutput=True
+          )
+        except Fail:
+          Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
 
     # Always run this on non-HA, or active NameNode during HA.
     create_hdfs_directories(is_active_namenode_cmd)
@@ -154,7 +196,13 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
     decommission()
 
 @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
-def namenode(action=None, do_format=True, rolling_restart=False, env=None):
+def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, env=None):
+  if action is None:
+    raise Fail('"action" parameter is required for function namenode().')
+
+  if action in ["start", "stop"] and hdfs_binary is None:
+    raise Fail('"hdfs_binary" parameter is required for function namenode().')
+
   if action == "configure":
     pass
   elif action == "start":

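The safemode table above reduces to a small decision function. The following sketch mirrors the script's state (ha_enabled, whether this host is the active NameNode, and upgrade_type in None/"rolling"/"nonrolling"); it is an illustration, not the actual Ambari code:

    def safemode_plan(ha_enabled, is_active, upgrade_type):
        """Return (check_for_safemode_off, stay_in_safe_mode)."""
        # Express (nonrolling) upgrades keep the NameNode in safemode because
        # the DataNodes are still down.
        stay_in_safe_mode = (upgrade_type == "nonrolling")
        if ha_enabled:
            # During any stack upgrade the active/standby probe is skipped;
            # in normal operations only the active NameNode must leave safemode.
            check_for_safemode_off = upgrade_type is not None or is_active
        else:
            check_for_safemode_off = True
        return check_for_safemode_off, stay_in_safe_mode

    assert safemode_plan(True, False, "rolling") == (True, False)    # RU, HA standby
    assert safemode_plan(True, True, "nonrolling") == (True, True)   # EU, HA active
    assert safemode_plan(True, False, None) == (False, False)        # normal ops, standby
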
http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
index 46c7272..2ef1b69 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
@@ -45,8 +45,8 @@ class JournalNodeDefault(JournalNode):
   def get_stack_to_component(self):
     return {"HDP": "hadoop-hdfs-journalnode"}
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 
@@ -54,7 +54,7 @@ class JournalNodeDefault(JournalNode):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-journalnode", params.version)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)
@@ -65,13 +65,16 @@ class JournalNodeDefault(JournalNode):
       create_log_dir=True
     )
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    if upgrade_type == "nonrolling":
+      return
+
+    Logger.info("Executing Stack Upgrade post-restart")
     import params
     env.set_params(params)
     journalnode_upgrade.post_upgrade_check()
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
 
     env.set_params(params)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
index e2ebbcb..850c32d 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py
@@ -31,7 +31,7 @@ from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
 
 def post_upgrade_check():
   """
-  Ensure all journal nodes are up and quorum is established
+  Ensure all journal nodes are up and quorum is established during Rolling Upgrade.
   :return:
   """
   import params

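Note that JournalNodeDefault.post_upgrade_restart() above returns early for "nonrolling" upgrades: the quorum probe assumes a running cluster, which an Express Upgrade cannot guarantee at that point. Below is the majority arithmetic behind such a quorum check, as a hedged stand-in only (the real post_upgrade_check() inspects JournalNode status via the NameNode, which this diff does not show):

    def has_quorum(journalnode_hosts, live_hosts):
        # A JournalNode ensemble needs a strict majority of its nodes up.
        required = len(journalnode_hosts) // 2 + 1
        live = sum(1 for host in journalnode_hosts if host in live_hosts)
        return live >= required

    assert has_quorum(["jn1", "jn2", "jn3"], set(["jn1", "jn2"]))
    assert not has_quorum(["jn1", "jn2", "jn3"], set(["jn1"]))
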
http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
index 93bbc0f..bb60ec3 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
@@ -42,11 +42,13 @@ from resource_management.core.logger import Logger
 from ambari_commons.os_family_impl import OsFamilyImpl
 from ambari_commons import OSConst
 
+
 import namenode_upgrade
 from hdfs_namenode import namenode
 from hdfs import hdfs
 import hdfs_rebalance
-from utils import initiate_safe_zkfc_failover
+from utils import initiate_safe_zkfc_failover, get_hdfs_binary
+
 
 
 # hashlib is supplied as of Python 2.5 as the replacement interface for md5
@@ -62,6 +64,19 @@ except ImportError:
 
 class NameNode(Script):
 
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-namenode"}
+
+  def get_hdfs_binary(self):
+    """
+    Get the name or path to the hdfs binary depending on the stack and version.
+    """
+    import params
+    stack_to_comp = self.get_stack_to_component()
+    if params.stack_name in stack_to_comp:
+      return get_hdfs_binary(stack_to_comp[params.stack_name])
+    return "hdfs"
+
   def install(self, env):
     import params
     self.install_packages(env, params.exclude_packages)
@@ -73,40 +88,41 @@ class NameNode(Script):
     import params
     env.set_params(params)
     hdfs("namenode")
-    namenode(action="configure", env=env)
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="configure", hdfs_binary=hdfs_binary, env=env)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
     self.configure(env)
-    namenode(action="start", rolling_restart=rolling_restart, env=env)
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="start", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)
-    if rolling_restart and params.dfs_ha_enabled:
+    hdfs_binary = self.get_hdfs_binary()
+    if upgrade_type == "rolling" and params.dfs_ha_enabled:
       if params.dfs_ha_automatic_failover_enabled:
         initiate_safe_zkfc_failover()
       else:
         raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
-    namenode(action="stop", rolling_restart=rolling_restart, env=env)
+    namenode(action="stop", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)
 
   def status(self, env):
     import status_params
     env.set_params(status_params)
-    namenode(action="status", rolling_restart=False, env=env)
+    namenode(action="status", env=env)
 
   def decommission(self, env):
     import params
     env.set_params(params)
-    namenode(action="decommission")
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="decommission", hdfs_binary=hdfs_binary)
 
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class NameNodeDefault(NameNode):
 
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-namenode"}
-
   def restore_snapshot(self, env):
     """
     Restore the snapshot during a Downgrade.
@@ -115,21 +131,73 @@ class NameNodeDefault(NameNode):
     pass
 
   def prepare_non_rolling_upgrade(self, env):
-    print "TODO AMBARI-12698"
-    pass
+    """
+    If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and
+    make sure that there is no "/previous" directory.
+
+    Create a list of all the DataNodes in the cluster.
+    hdfs dfsadmin -report > dfs-old-report-1.log
+
+    hdfs dfsadmin -safemode enter
+    hdfs dfsadmin -saveNamespace
+
+    Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory.
+
+    Store the NameNode's layoutVersion, found in ${dfs.namenode.name.dir}/current/VERSION, in a backup directory.
+
+    Finalize any prior HDFS upgrade,
+    hdfs dfsadmin -rollingUpgrade finalize
+    """
+    import params
+    Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.")
+
+    if params.security_enabled:
+      Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+              user=params.hdfs_user)
+
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.prepare_upgrade_check_for_previous_dir()
+    namenode_upgrade.prepare_upgrade_enter_safe_mode(hdfs_binary)
+    namenode_upgrade.prepare_upgrade_save_namespace(hdfs_binary)
+    namenode_upgrade.prepare_upgrade_backup_namenode_dir()
+    namenode_upgrade.prepare_upgrade_finalize_previous_upgrades(hdfs_binary)
 
   def prepare_rolling_upgrade(self, env):
-    namenode_upgrade.prepare_rolling_upgrade()
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.prepare_rolling_upgrade(hdfs_binary)
+
+  def wait_for_safemode_off(self, env):
+    """
+    During NonRolling (aka Express Upgrade), after starting NameNode, which is still in safemode, and then starting
+    all of the DataNodes, we need the NameNode to receive all of the block reports and leave safemode.
+    """
+    import params
+
+    Logger.info("Wait to leafe safemode since must transition from ON to OFF.")
+    try:
+      hdfs_binary = self.get_hdfs_binary()
+      # Note, this fails if namenode_address isn't prefixed with "params."
+      is_namenode_safe_mode_off = format("{hdfs_binary} dfsadmin -fs {params.namenode_address} -safemode get | grep 'Safe mode is OFF'")
+      # Wait up to 30 mins
+      Execute(is_namenode_safe_mode_off,
+              tries=180,
+              try_sleep=10,
+              user=params.hdfs_user,
+              logoutput=True
+      )
+    except Fail:
+      Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
 
   def finalize_non_rolling_upgrade(self, env):
-    print "TODO AMBARI-12698"
-    pass
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.finalize_upgrade("nonrolling", hdfs_binary)
 
   def finalize_rolling_upgrade(self, env):
-    namenode_upgrade.finalize_rolling_upgrade()
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.finalize_upgrade("rolling", hdfs_binary)
 
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
     import params
     env.set_params(params)
 
@@ -137,12 +205,13 @@ class NameNodeDefault(NameNode):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-namenode", params.version)
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade post-restart")
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade post-restart")
     import params
     env.set_params(params)
 
-    Execute("hdfs dfsadmin -report -live",
+    hdfs_binary = self.get_hdfs_binary()
+    Execute(format("{hdfs_binary} dfsadmin -report -live"),
             user=params.hdfs_user
     )
 

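get_hdfs_binary() above keys off get_stack_to_component() so that upgraded HDP installs invoke the versioned hdfs binary rather than whatever happens to be first on the PATH. An illustrative sketch of the resolution (utils.get_hdfs_binary() is imported above but its body is not part of this diff; the /usr/hdp/current layout is an assumption based on HDP packaging conventions):

    def resolve_hdfs_binary(stack_name, component, versioned_binaries_supported):
        # component is e.g. "hadoop-hdfs-namenode", per get_stack_to_component().
        if stack_name == "HDP" and versioned_binaries_supported:
            return "/usr/hdp/current/{0}/bin/hdfs".format(component)
        return "hdfs"  # fall back to the binary on the PATH

    print(resolve_hdfs_binary("HDP", "hadoop-hdfs-namenode", True))
    # /usr/hdp/current/hadoop-hdfs-namenode/bin/hdfs
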
http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
index e8c142c..d6b6225 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_ha_state.py
@@ -180,6 +180,33 @@ class NamenodeHAState:
       return self.get_address_for_host(hostname)
     return None
 
+  def is_active(self, host_name):
+    """
+    :param host_name: Host name
+    :return: Return True if this is the active NameNode, otherwise, False.
+    """
+    return self._is_in_state(host_name, NAMENODE_STATE.ACTIVE)
+
+  def is_standby(self, host_name):
+    """
+    :param host_name: Host name
+    :return: Return True if this is the standby NameNode, otherwise, False.
+    """
+    return self._is_in_state(host_name, NAMENODE_STATE.STANDBY)
+
+  def _is_in_state(self, host_name, state):
+    """
+    :param host_name: Host name
+    :param state: State to check
+    :return: Return True if this NameNode is in the specified state, otherwise, False.
+    """
+    mapping = self.get_namenode_state_to_hostnames()
+    if state in mapping:
+      hosts_in_state = mapping[state]
+      if hosts_in_state is not None and len(hosts_in_state) == 1 and next(iter(hosts_in_state)).lower() == host_name.lower():
+        return True
+    return False
+
   def is_healthy(self):
     """
     :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist.

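The matching rule in _is_in_state() is deliberately strict: a host counts as active or standby only when it is the single host mapped to that state, compared case-insensitively. A stand-alone sketch of the same rule (plain strings stand in for the NAMENODE_STATE constants):

    def is_in_state(state_to_hostnames, host_name, state):
        hosts = state_to_hostnames.get(state)
        return bool(hosts) and len(hosts) == 1 \
               and next(iter(hosts)).lower() == host_name.lower()

    mapping = {"active": set(["nn1.example.com"]), "standby": set(["nn2.example.com"])}
    assert is_in_state(mapping, "NN1.example.com", "active")      # case-insensitive
    assert not is_in_state(mapping, "nn1.example.com", "standby")
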
http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
index fb39878..c8c057d 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py
@@ -17,26 +17,148 @@ limitations under the License.
 
 """
 import re
+import os
 
 from resource_management.core.logger import Logger
 from resource_management.core.resources.system import Execute
-from resource_management.libraries.functions.format import format
-from resource_management.libraries.functions.default import default
 from resource_management.core import shell
-from resource_management.libraries.functions import Direction, SafeMode
+from resource_management.core.shell import as_user
 from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_unique_id_and_date
+from resource_management.libraries.functions import Direction, SafeMode
+
+from namenode_ha_state import NamenodeHAState
 
 
 safemode_to_instruction = {SafeMode.ON: "enter",
                            SafeMode.OFF: "leave"}
 
-def reach_safemode_state(user, safemode_state, in_ha):
+
+def prepare_upgrade_check_for_previous_dir():
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up some data.
+  Check that there is no "previous" folder inside the NameNode Name Dir.
+  """
+  import params
+
+  if params.dfs_ha_enabled:
+    namenode_ha = NamenodeHAState()
+    if namenode_ha.is_active(params.hostname):
+      Logger.info("NameNode High Availability is enabled and this is the Active NameNode.")
+
+      problematic_previous_namenode_dirs = set()
+      nn_name_dirs = params.dfs_name_dir.split(',')
+      for nn_dir in nn_name_dirs:
+        if os.path.isdir(nn_dir):
+          # Check for a previous folder, which is not allowed.
+          previous_dir = os.path.join(nn_dir, "previous")
+          if os.path.isdir(previous_dir):
+            problematic_previous_namenode_dirs.add(previous_dir)
+
+      if len(problematic_previous_namenode_dirs) > 0:
+        message = 'WARNING. The following NameNode Name Dir(s) have a "previous" folder from an older version.\n' \
+                  'Please back it up first, and then delete it, OR Finalize (E.g., "hdfs dfsadmin -finalizeUpgrade").\n' \
+                  'NameNode Name Dir(s): {0}\n' \
+                  '***** Then, retry this step. *****'.format(", ".join(problematic_previous_namenode_dirs))
+        Logger.error(message)
+        raise Fail(message)
+
+def prepare_upgrade_enter_safe_mode(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires first entering Safemode.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  safe_mode_enter_cmd = format("{hdfs_binary} dfsadmin -safemode enter")
+  safe_mode_enter_and_check_for_on = format("{safe_mode_enter_cmd} | grep 'Safe mode is ON'")
+  try:
+    # Safe to call if already in Safe Mode
+    Logger.info("Enter SafeMode if not already in it.")
+    as_user(safe_mode_enter_and_check_for_on, params.hdfs_user, env={'PATH': params.hadoop_bin_dir})
+  except Exception, e:
+    message = format("Could not enter safemode. As the HDFS user, call this command: {safe_mode_enter_cmd}")
+    Logger.error(message)
+    raise Fail(message)
+
+def prepare_upgrade_save_namespace(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires saving the namespace.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  save_namespace_cmd = format("{hdfs_binary} dfsadmin -saveNamespace")
+  try:
+    Logger.info("Checkpoint the current namespace.")
+    as_user(save_namespace_cmd, params.hdfs_user, env={'PATH': params.hadoop_bin_dir})
+  except Exception, e:
+    message = format("Could save the NameSpace. As the HDFS user, call this command: {save_namespace_cmd}")
+    Logger.error(message)
+    raise Fail(message)
+
+def prepare_upgrade_backup_namenode_dir():
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up the NameNode Name Dirs.
+  """
+  import params
+
+  i = 0
+  failed_paths = []
+  nn_name_dirs = params.dfs_name_dir.split(',')
+  backup_destination_root_dir = "/tmp/upgrades/{0}".format(params.stack_version_unformatted)
+  if len(nn_name_dirs) > 0:
+    Logger.info("Backup the NameNode name directory's CURRENT folder.")
+  for nn_dir in nn_name_dirs:
+    i += 1
+    namenode_current_image = os.path.join(nn_dir, "current")
+    unique = get_unique_id_and_date() + "_" + str(i)
+    # Note that /tmp may not be writeable.
+    backup_current_folder = "{0}/namenode_{1}/".format(backup_destination_root_dir, unique)
+
+    if os.path.isdir(namenode_current_image) and not os.path.isdir(backup_current_folder):
+      try:
+        os.makedirs(backup_current_folder)
+        Execute(('cp', '-ar', namenode_current_image, backup_current_folder),
+                sudo=True
+        )
+      except Exception, e:
+        failed_paths.append(namenode_current_image)
+  if len(failed_paths) > 0:
+    Logger.error("Could not backup the NameNode Name Dir(s) to {0}, make sure that the destination path is "
+                 "writeable and copy the directories on your own. Directories: {1}".format(backup_destination_root_dir,
+                                                                                           ", ".join(failed_paths)))
+
+def prepare_upgrade_finalize_previous_upgrades(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires Finalizing any upgrades that are in progress.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  finalize_command = format("{hdfs_binary} dfsadmin -rollingUpgrade finalize")
+  try:
+    Logger.info("Attempt to Finalize if there are any in-progress upgrades. "
+                "This will return 255 if no upgrades are in progress.")
+    code, out = shell.checked_call(finalize_command, logoutput=True, user=params.hdfs_user)
+    if out:
+      expected_substring = "there is no rolling upgrade in progress"
+      if expected_substring not in out.lower():
+        Logger.warning('Finalize command output did not contain substring: %s' % expected_substring)
+    else:
+      Logger.warning("Finalize command did not return any output.")
+  except Exception, e:
+    Logger.warning("Ensure no upgrades are in progress.")
+
+def reach_safemode_state(user, safemode_state, in_ha, hdfs_binary):
   """
   Enter or leave safemode for the Namenode.
-  @param user: user to perform action as
-  @param safemode_state: Desired state of ON or OFF
-  @param in_ha: bool indicating if Namenode High Availability is enabled
-  @:return Returns a tuple of (transition success, original state). If no change is needed, the indicator of
+  :param user: user to perform action as
+  :param safemode_state: Desired state of ON or OFF
+  :param in_ha: bool indicating if Namenode High Availability is enabled
+  :param hdfs_binary: name/path of the HDFS binary to use
+  :return: Returns a tuple of (transition success, original state). If no change is needed, the indicator of
   success will be True
   """
   Logger.info("Prepare to transition into safemode state %s" % safemode_state)
@@ -44,7 +166,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
   original_state = SafeMode.UNKNOWN
 
   hostname = params.hostname
-  safemode_check = format("hdfs dfsadmin -safemode get")
+  safemode_check = format("{hdfs_binary} dfsadmin -safemode get")
 
   grep_pattern = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}")
   safemode_check_with_grep = format("hdfs dfsadmin -safemode get | grep '{grep_pattern}'")
@@ -61,7 +183,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
         return (True, original_state)
       else:
         # Make a transition
-        command = "hdfs dfsadmin -safemode %s" % (safemode_to_instruction[safemode_state])
+        command = "{0} dfsadmin -safemode {1}".format(hdfs_binary, safemode_to_instruction[safemode_state])
         Execute(command,
                 user=user,
                 logoutput=True,
@@ -74,7 +196,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
   return (False, original_state)
 
 
-def prepare_rolling_upgrade():
+def prepare_rolling_upgrade(hdfs_binary):
   """
   Perform either an upgrade or a downgrade.
 
@@ -83,6 +205,7 @@ def prepare_rolling_upgrade():
   1. Leave safemode if the safemode status is not OFF
   2. Execute a rolling upgrade "prepare"
   3. Execute a rolling upgrade "query"
+  :param hdfs_binary: name/path of the HDFS binary to use
   """
   import params
 
@@ -96,12 +219,12 @@ def prepare_rolling_upgrade():
 
 
   if params.upgrade_direction == Direction.UPGRADE:
-    safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True)
+    safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True, hdfs_binary)
     if not safemode_transition_successful:
       raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SafeMode.OFF))
 
-    prepare = "hdfs dfsadmin -rollingUpgrade prepare"
-    query = "hdfs dfsadmin -rollingUpgrade query"
+    prepare = format("{hdfs_binary} dfsadmin -rollingUpgrade prepare")
+    query = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
     Execute(prepare,
             user=params.hdfs_user,
             logoutput=True)
@@ -111,9 +234,11 @@ def prepare_rolling_upgrade():
   elif params.upgrade_direction == Direction.DOWNGRADE:
     pass
 
-def finalize_rolling_upgrade():
+def finalize_upgrade(upgrade_type, hdfs_binary):
   """
   Finalize the Namenode upgrade, at which point it cannot be downgraded.
+  :param upgrade_type: "rolling" or "nonrolling"
+  :param hdfs_binary: name/path of the HDFS binary to use
   """
   Logger.info("Executing Rolling Upgrade finalize")
   import params
@@ -122,8 +247,15 @@ def finalize_rolling_upgrade():
     kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") 
     Execute(kinit_command, user=params.hdfs_user, logoutput=True)
 
-  finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize"
-  query_cmd = "hdfs dfsadmin -rollingUpgrade query"
+  finalize_cmd = ""
+  query_cmd = ""
+  if upgrade_type == "rolling":
+    finalize_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade finalize")
+    query_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
+
+  elif upgrade_type == "nonrolling":
+    finalize_cmd = format("{hdfs_binary} dfsadmin -finalizeUpgrade")
+    query_cmd = format("{hdfs_binary} dfsadmin -rollingUpgrade query")
 
   Execute(query_cmd,
         user=params.hdfs_user,

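finalize_upgrade() above picks between "-rollingUpgrade finalize" and "-finalizeUpgrade" based on the upgrade type, running the same "-rollingUpgrade query" either way. For illustration, the selection as a pure function (this sketch raises on an unknown type, whereas the script would leave the command strings empty):

    def finalize_commands(upgrade_type, hdfs_binary):
        query_cmd = "{0} dfsadmin -rollingUpgrade query".format(hdfs_binary)
        if upgrade_type == "rolling":
            return "{0} dfsadmin -rollingUpgrade finalize".format(hdfs_binary), query_cmd
        if upgrade_type == "nonrolling":
            return "{0} dfsadmin -finalizeUpgrade".format(hdfs_binary), query_cmd
        raise ValueError("Unknown upgrade type: %s" % upgrade_type)

    finalize_cmd, query_cmd = finalize_commands("nonrolling", "hdfs")
    assert finalize_cmd == "hdfs dfsadmin -finalizeUpgrade"
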
http://git-wip-us.apache.org/repos/asf/ambari/blob/7afe5a4e/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
index be6f0d5..df5569e 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/nfsgateway.py
@@ -41,7 +41,7 @@ class NFSGateway(Script):
 
     self.install_packages(env, params.exclude_packages)
 
-  def pre_rolling_restart(self, env):
+  def pre_upgrade_restart(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
@@ -49,14 +49,14 @@ class NFSGateway(Script):
       conf_select.select(params.stack_name, "hadoop", params.version)
       hdp_select.select("hadoop-hdfs-nfs3", params.version)
 
-  def start(self, env, rolling_restart=False):
+  def start(self, env, upgrade_type=None):
     import params
     env.set_params(params)
 
     self.configure(env)
     nfsgateway(action="start")
 
-  def stop(self, env, rolling_restart=False):
+  def stop(self, env, upgrade_type=None):
     import params
     env.set_params(params)