You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2016/04/01 02:23:51 UTC

ambari git commit: AMBARI-15637. If RU/EU is paused, services are restarted on the older version. EU is more complex since stopping services should use the original version. (alejandro)

Repository: ambari
Updated Branches:
  refs/heads/trunk b0d6a5781 -> 6fe7f8327


AMBARI-15637. If RU/EU is paused, services are restarted on the older version. EU is more complex since stopping services should use the original version. (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6fe7f832
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6fe7f832
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6fe7f832

Branch: refs/heads/trunk
Commit: 6fe7f83277a269dc6d9634b186ff3fc05fca8505
Parents: b0d6a57
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Thu Mar 24 16:33:12 2016 -0700
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Thu Mar 31 17:23:08 2016 -0700

----------------------------------------------------------------------
 .../upgrades/upgrade_test_skip_failures.xml     |   1 +
 .../AmbariCustomCommandExecutionHelper.java     |  25 ++--
 .../AmbariManagementControllerImpl.java         |   9 +-
 .../ClusterStackVersionResourceProvider.java    |   2 +
 .../internal/UpgradeResourceProvider.java       |  34 ++++-
 .../ambari/server/orm/dao/UpgradeDAO.java       |  18 ++-
 .../server/orm/entities/UpgradeEntity.java      |   4 +-
 .../org/apache/ambari/server/state/Cluster.java |   9 +-
 .../server/state/cluster/ClusterImpl.java       | 135 ++++++++++++++++++-
 .../internal/UpgradeResourceProviderTest.java   |  14 +-
 .../ambari/server/orm/dao/UpgradeDAOTest.java   |  35 +++--
 .../upgrades/upgrade_test_skip_failures.xml     |   1 +
 12 files changed, 256 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
----------------------------------------------------------------------
diff --git a/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml b/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
index 597270e..b2c4b93 100644
--- a/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
+++ b/ambari-funtest/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
@@ -19,6 +19,7 @@
   <target>2.2.*</target>
   <skip-failures>true</skip-failures>
   <skip-service-check-failures>true</skip-service-check-failures>
+  <type>ROLLING</type>
   <prerequisite-checks>
     <check>org.apache.ambari.server.checks.HiveMultipleMetastoreCheck</check>
     <check>org.apache.ambari.server.checks.MapReduce2JobHistoryStatePreservingCheck</check>

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index ee7fe7b..f3197cb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -246,6 +246,15 @@ public class AmbariCustomCommandExecutionHelper {
     return sb.toString();
   }
 
+  /**
+   * Called during the start/stop/restart of services, plus custom commands during Stack Upgrade.
+   * @param actionExecutionContext Execution Context
+   * @param resourceFilter Resource Filter
+   * @param stage Command stage
+   * @param additionalCommandParams Additional command params to add the the stage
+   * @param commandDetail String for the command detail
+   * @throws AmbariException
+   */
   private void addCustomCommandAction(final ActionExecutionContext actionExecutionContext,
       final RequestResourceFilter resourceFilter, Stage stage,
       Map<String, String> additionalCommandParams, String commandDetail) throws AmbariException {
@@ -414,15 +423,12 @@ public class AmbariCustomCommandExecutionHelper {
       }
 
       commandParams.put(COMMAND_TIMEOUT, commandTimeout);
-
-      commandParams.put(SERVICE_PACKAGE_FOLDER,
-          serviceInfo.getServicePackageFolder());
-
+      commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
       commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
 
-      ClusterVersionEntity currentClusterVersion = cluster.getCurrentClusterVersion();
-      if (currentClusterVersion != null) {
-       commandParams.put(KeyNames.VERSION, currentClusterVersion.getRepositoryVersion().getVersion());
+      ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion();
+      if (effectiveClusterVersion != null) {
+       commandParams.put(KeyNames.VERSION, effectiveClusterVersion.getRepositoryVersion().getVersion());
       }
 
       execCmd.setCommandParams(commandParams);
@@ -635,9 +641,7 @@ public class AmbariCustomCommandExecutionHelper {
     }
 
     commandParams.put(COMMAND_TIMEOUT, commandTimeout);
-
-    commandParams.put(SERVICE_PACKAGE_FOLDER,
-        serviceInfo.getServicePackageFolder());
+    commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
     commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
 
     execCmd.setCommandParams(commandParams);
@@ -959,7 +963,6 @@ public class AmbariCustomCommandExecutionHelper {
       } else if (isValidCustomCommand(actionExecutionContext, resourceFilter)) {
 
         String commandDetail = getReadableCustomCommandDetail(actionExecutionContext, resourceFilter);
-
         Map<String, String> extraParams = new HashMap<String, String>();
         String componentName = (null == resourceFilter.getComponentName()) ? null :
             resourceFilter.getComponentName().toLowerCase();

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index e6dd2f7..d1f8232 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -2153,9 +2153,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
         }
         commandParams.put(MAX_DURATION_OF_RETRIES, Integer.toString(retryMaxTime));
         commandParams.put(COMMAND_RETRY_ENABLED, Boolean.toString(retryEnabled));
-        ClusterVersionEntity currentClusterVersion = cluster.getCurrentClusterVersion();
-        if (currentClusterVersion != null) {
-         commandParams.put(VERSION, currentClusterVersion.getRepositoryVersion().getVersion());
+
+        ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion();
+        if (effectiveClusterVersion != null) {
+         commandParams.put(VERSION, effectiveClusterVersion.getRepositoryVersion().getVersion());
         }
         if (script.getTimeout() > 0) {
           scriptCommandTimeout = String.valueOf(script.getTimeout());
@@ -3582,7 +3583,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     } else {
       actionExecutionHelper.validateAction(actionRequest);
     }
-
+    // TODO Alejandro, Called First. insert params.version. Called during Rebalance HDFS, ZOOKEEPER Restart, Zookeeper Service Check.
     long requestId = actionManager.getNextRequestId();
     RequestStageContainer requestStageContainer = new RequestStageContainer(
         requestId,

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
index 6f3c03c..bb50820 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
@@ -754,6 +754,8 @@ public class ClusterStackVersionResourceProvider extends AbstractControllerResou
         }
       } else {
         // !!! revisit for PU
+        // If forcing to become CURRENT, get the Cluster Version whose state is CURRENT and make sure that
+        // the Host Version records for the same Repo Version are also marked as CURRENT.
         ClusterVersionEntity current = cluster.getCurrentClusterVersion();
 
         if (!current.getRepositoryVersion().equals(rve)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 714495f..b3bf345 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -78,6 +78,7 @@ import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.orm.entities.UpgradeGroupEntity;
 import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
 import org.apache.ambari.server.security.authorization.AuthorizationException;
+import org.apache.ambari.server.serveraction.upgrades.UpdateDesiredStackAction;
 import org.apache.ambari.server.stack.MasterHostResolver;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
@@ -142,6 +143,18 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
   protected static final String UPGRADE_SKIP_PREREQUISITE_CHECKS = "Upgrade/skip_prerequisite_checks";
   protected static final String UPGRADE_FAIL_ON_CHECK_WARNINGS = "Upgrade/fail_on_check_warnings";
 
+
+  /**
+   * Names that appear in the Upgrade Packs that are used by
+   * {@link org.apache.ambari.server.state.cluster.ClusterImpl#isNonRollingUpgradePastUpgradingStack}
+   * to determine if an upgrade has already changed the version to use.
+   * For this reason, DO NOT CHANGE the name of these since they represent historic values.
+   */
+  public static final String CONST_UPGRADE_GROUP_NAME = "UPDATE_DESIRED_STACK_ID";
+  public static final String CONST_UPGRADE_ITEM_TEXT = "Update Target Stack";
+  public static final String CONST_CUSTOM_COMMAND_NAME = UpdateDesiredStackAction.class.getName();
+
+
   /**
    * Skip slave/client component failures if the tasks are skippable.
    */
@@ -809,6 +822,23 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
       throw new AmbariException("There are no groupings available");
     }
 
+    // Non Rolling Upgrades require a group with name "UPDATE_DESIRED_STACK_ID".
+    // This is needed as a marker to indicate which version to use when an upgrade is paused.
+    if (pack.getType() == UpgradeType.NON_ROLLING) {
+      boolean foundGroupWithNameUPDATE_DESIRED_STACK_ID = false;
+      for (UpgradeGroupHolder group : groups) {
+        if (group.name.equalsIgnoreCase(this.CONST_UPGRADE_GROUP_NAME)) {
+          foundGroupWithNameUPDATE_DESIRED_STACK_ID = true;
+          break;
+        }
+      }
+
+      if (foundGroupWithNameUPDATE_DESIRED_STACK_ID == false) {
+        throw new AmbariException(String.format("NonRolling Upgrade Pack %s requires a Group with name %s",
+            pack.getName(), this.CONST_UPGRADE_GROUP_NAME));
+      }
+    }
+
     List<UpgradeGroupEntity> groupEntities = new ArrayList<>();
     RequestStageContainer req = createRequest(direction, version);
 
@@ -1667,7 +1697,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
         // Remove relevant upgrade entity
         try {
           Cluster cluster = clusters.get().getClusterById(clusterId);
-          UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeForCluster(cluster.getClusterId());
+          UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeOrDowngradeForCluster(cluster.getClusterId());
           lastUpgradeItemForCluster.setSuspended(true);
           s_upgradeDAO.merge(lastUpgradeItemForCluster);
 
@@ -1690,7 +1720,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
 
       try {
         Cluster cluster = clusters.get().getClusterById(clusterId);
-        UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeForCluster(cluster.getClusterId());
+        UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeOrDowngradeForCluster(cluster.getClusterId());
         lastUpgradeItemForCluster.setSuspended(false);
         s_upgradeDAO.merge(lastUpgradeItemForCluster);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java
index 4a923be..2d0a4d7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java
@@ -181,7 +181,7 @@ public class UpgradeDAO {
   @RequiresSession
   public UpgradeEntity findLastUpgradeForCluster(long clusterId) {
     TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery(
-        "UpgradeEntity.findLatestForCluster", UpgradeEntity.class);
+        "UpgradeEntity.findLatestForClusterInDirection", UpgradeEntity.class);
     query.setMaxResults(1);
     query.setParameter("clusterId", clusterId);
     query.setParameter("direction", Direction.UPGRADE);
@@ -191,6 +191,22 @@ public class UpgradeDAO {
     return daoUtils.selectSingle(query);
   }
 
+  /**
+   * @param clusterId the cluster id
+   * @return the upgrade entity, or {@code null} if not found
+   */
+  @RequiresSession
+  public UpgradeEntity findLastUpgradeOrDowngradeForCluster(long clusterId) {
+    TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery(
+        "UpgradeEntity.findLatestForCluster", UpgradeEntity.class);
+    query.setMaxResults(1);
+    query.setParameter("clusterId", clusterId);
+
+    query.setHint(QueryHints.REFRESH, HintValues.TRUE);
+
+    return daoUtils.selectSingle(query);
+  }
+
   @Transactional
   public UpgradeEntity merge(UpgradeEntity upgradeEntity) {
     return entityManagerProvider.get().merge(upgradeEntity);

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
index fd866a1..db27ea5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
@@ -52,8 +52,10 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
       query = "SELECT u FROM UpgradeEntity u WHERE u.clusterId = :clusterId"),
   @NamedQuery(name = "UpgradeEntity.findUpgrade",
       query = "SELECT u FROM UpgradeEntity u WHERE u.upgradeId = :upgradeId"),
+  @NamedQuery(name = "UpgradeEntity.findLatestForClusterInDirection",
+      query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY r.startTime DESC"),
   @NamedQuery(name = "UpgradeEntity.findLatestForCluster",
-      query = "SELECT u FROM UpgradeEntity u WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY u.upgradeId DESC"),
+      query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime DESC"),
 })
 public class UpgradeEntity {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 128c392..38d05ab 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -132,11 +132,18 @@ public interface Cluster {
 
   /**
    * Get the ClusterVersionEntity object whose state is CURRENT.
-   * @return
+   * @return Cluster Version entity to whose state is CURRENT.
    */
   ClusterVersionEntity getCurrentClusterVersion();
 
   /**
+   * If no RU/EU is in progress, get the ClusterVersionEntity object whose state is CURRENT.
+   * If RU/EU is in progress, based on the direction and desired stack, determine which version to use.
+   * @return Cluster Version entity to use.
+   */
+  ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException;
+
+  /**
    * Get all of the ClusterVersionEntity objects for the cluster.
    * @return
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 878f83b..9e456eb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -48,6 +48,8 @@ import org.apache.ambari.server.ParentObjectNotFoundException;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.ServiceNotFoundException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariSessionManager;
@@ -56,6 +58,7 @@ import org.apache.ambari.server.controller.ConfigurationResponse;
 import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
 import org.apache.ambari.server.controller.ServiceConfigVersionResponse;
+import org.apache.ambari.server.controller.internal.UpgradeResourceProvider;
 import org.apache.ambari.server.events.AmbariEvent.AmbariEventType;
 import org.apache.ambari.server.events.ClusterEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
@@ -68,8 +71,10 @@ import org.apache.ambari.server.orm.dao.ClusterStateDAO;
 import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
 import org.apache.ambari.server.orm.dao.HostConfigMappingDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.HostVersionDAO;
 import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
 import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
 import org.apache.ambari.server.orm.dao.StackDAO;
 import org.apache.ambari.server.orm.dao.TopologyRequestDAO;
@@ -83,16 +88,20 @@ import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
 import org.apache.ambari.server.orm.entities.ConfigGroupEntity;
 import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
 import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.HostVersionEntity;
 import org.apache.ambari.server.orm.entities.PermissionEntity;
 import org.apache.ambari.server.orm.entities.PrivilegeEntity;
 import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
 import org.apache.ambari.server.orm.entities.ResourceEntity;
 import org.apache.ambari.server.orm.entities.ServiceConfigEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
 import org.apache.ambari.server.orm.entities.TopologyRequestEntity;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
+import org.apache.ambari.server.orm.entities.UpgradeGroupEntity;
+import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
 import org.apache.ambari.server.security.authorization.AuthorizationException;
 import org.apache.ambari.server.security.authorization.AuthorizationHelper;
 import org.apache.ambari.server.state.Cluster;
@@ -125,6 +134,7 @@ import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
+import org.apache.ambari.server.state.stack.upgrade.Direction;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary;
 import org.apache.ambari.server.topology.TopologyRequest;
 import org.apache.commons.lang.StringUtils;
@@ -212,6 +222,12 @@ public class ClusterImpl implements Cluster {
   private ClusterVersionDAO clusterVersionDAO;
 
   @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private RequestDAO requestDAO;
+
+  @Inject
   private HostDAO hostDAO;
 
   @Inject
@@ -302,6 +318,9 @@ public class ClusterImpl implements Cluster {
       StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
       loadServiceConfigTypes();
     }
+
+    // Load any active stack upgrades.
+    loadStackUpgrade();
   }
 
 
@@ -321,6 +340,24 @@ public class ClusterImpl implements Cluster {
   }
 
   /**
+   * When a cluster is first loaded, determine if it has a stack upgrade in progress.
+   */
+  private void loadStackUpgrade() {
+    clusterGlobalLock.writeLock().lock();
+
+    try {
+      UpgradeEntity activeUpgrade = this.getUpgradeInProgress();
+      if (activeUpgrade != null) {
+        this.setUpgradeEntity(activeUpgrade);
+      }
+    } catch (AmbariException e) {
+      LOG.error("Unable to load active stack upgrade. Error: " + e.getMessage());
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
+    }
+  }
+
+  /**
    * Construct config type to service name mapping
    * @throws AmbariException when stack or its part not found
    */
@@ -1140,12 +1177,106 @@ public class ClusterImpl implements Cluster {
     Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities();
     for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) {
       if (clusterVersionEntity.getState() == RepositoryVersionState.CURRENT) {
-//      TODO assuming there's only 1 current version, return 1st found, exception was expected in previous implementation
+        // TODO assuming there's only 1 current version, return 1st found, exception was expected in previous implementation
         return clusterVersionEntity;
       }
     }
     return null;
-//    return clusterVersionDAO.findByClusterAndStateCurrent(getClusterName());
+  }
+
+  /**
+   * Get any stack upgrade currently in progress.
+   * @return
+   */
+  private UpgradeEntity getUpgradeInProgress() {
+    UpgradeEntity mostRecentUpgrade = upgradeDAO.findLastUpgradeOrDowngradeForCluster(this.getClusterId());
+    if (mostRecentUpgrade != null) {
+      List<HostRoleStatus> UNFINISHED_STATUSES = new ArrayList();
+      UNFINISHED_STATUSES.add(HostRoleStatus.PENDING);
+      UNFINISHED_STATUSES.add(HostRoleStatus.ABORTED);
+
+      List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(mostRecentUpgrade.getRequestId(), UNFINISHED_STATUSES);
+      if (!commands.isEmpty()) {
+        return mostRecentUpgrade;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * If no RU/EU is in progress, get the ClusterVersionEntity object whose state is CURRENT.
+   * If RU/EU is in progress, based on the direction and desired stack, determine which version to use.
+   * Assuming upgrading from HDP 2.2.0.0-1 to 2.3.0.0-2, then
+   * RU Upgrade: 2.3.0.0-2 (desired stack id)
+   * RU Downgrade: 2.2.0.0-1 (desired stack id)
+   * EU Upgrade: while stopping services and before changing desired stack, use 2.2.0.0-1, after, use 2.3.0.0-2
+   * EU Downgrade: while stopping services and before changing desired stack, use 2.3.0.0-2, after, use 2.2.0.0-1
+   * @return
+   */
+  @Override
+  public ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException {
+    // This is not reliable. Need to find the last upgrade request.
+    UpgradeEntity upgradeInProgress = this.getUpgradeEntity();
+    if (upgradeInProgress == null) {
+      return this.getCurrentClusterVersion();
+    }
+
+    String effectiveVersion = null;
+    switch (upgradeInProgress.getUpgradeType()) {
+      case NON_ROLLING:
+        if (upgradeInProgress.getDirection() == Direction.UPGRADE) {
+          boolean pastChangingStack = this.isNonRollingUpgradePastUpgradingStack(upgradeInProgress);
+          effectiveVersion = pastChangingStack ? upgradeInProgress.getToVersion() : upgradeInProgress.getFromVersion();
+        } else {
+          // Should be the lower value during a Downgrade.
+          effectiveVersion = upgradeInProgress.getToVersion();
+        }
+        break;
+      case ROLLING:
+      default:
+        // Version will be higher on upgrade and lower on downgrade directions.
+        effectiveVersion = upgradeInProgress.getToVersion();
+        break;
+    }
+
+    if (effectiveVersion == null) {
+      throw new AmbariException("Unable to determine which version to use during Stack Upgrade, effectiveVersion is null.");
+    }
+
+    // Find the first cluster version whose repo matches the expected version.
+    Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities();
+    for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) {
+      if (clusterVersionEntity.getRepositoryVersion().getVersion().equals(effectiveVersion)) {
+        return clusterVersionEntity;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Given a NonRolling stack upgrade, determine if it has already crossed the point of using the newer version.
+   * @param upgrade Stack Upgrade
+   * @return Return true if should be using to_version, otherwise, false to mean the from_version.
+   */
+  private boolean isNonRollingUpgradePastUpgradingStack(UpgradeEntity upgrade) {
+    for (UpgradeGroupEntity group : upgrade.getUpgradeGroups()) {
+      if (group.getName().equalsIgnoreCase(UpgradeResourceProvider.CONST_UPGRADE_GROUP_NAME)) {
+        for (UpgradeItemEntity item : group.getItems()) {
+          List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(upgrade.getRequestId(), item.getStageId());
+          List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByPKs(taskIds);
+          for (HostRoleCommandEntity command : commands) {
+            if (command.getCustomCommandName() != null &&
+                command.getCustomCommandName().equalsIgnoreCase(UpgradeResourceProvider.CONST_CUSTOM_COMMAND_NAME) &&
+                command.getStatus() == HostRoleStatus.COMPLETED) {
+              return true;
+            }
+          }
+        }
+        return false;
+      }
+    }
+    return false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
index 64a8852..4a474bf 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -57,11 +58,13 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
 import org.apache.ambari.server.orm.dao.StackDAO;
 import org.apache.ambari.server.orm.dao.StageDAO;
 import org.apache.ambari.server.orm.dao.UpgradeDAO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
@@ -113,6 +116,7 @@ import com.google.inject.util.Modules;
 public class UpgradeResourceProviderTest {
 
   private UpgradeDAO upgradeDao = null;
+  private RequestDAO requestDao = null;
   private RepositoryVersionDAO repoVersionDao = null;
   private Injector injector;
   private Clusters clusters;
@@ -159,6 +163,7 @@ public class UpgradeResourceProviderTest {
 
     stackDAO = injector.getInstance(StackDAO.class);
     upgradeDao = injector.getInstance(UpgradeDAO.class);
+    requestDao = injector.getInstance(RequestDAO.class);
     repoVersionDao = injector.getInstance(RepositoryVersionDAO.class);
 
     AmbariEventPublisher publisher = createNiceMock(AmbariEventPublisher.class);
@@ -575,6 +580,13 @@ public class UpgradeResourceProviderTest {
 
     // a downgrade MUST have an upgrade to come from, so populate an upgrade in
     // the DB
+    RequestEntity requestEntity = new RequestEntity();
+    requestEntity.setRequestId(2L);
+    requestEntity.setClusterId(cluster.getClusterId());
+    requestEntity.setStatus(HostRoleStatus.PENDING);
+    requestEntity.setStages(new ArrayList<StageEntity>());
+    requestDao.create(requestEntity);
+
     UpgradeEntity upgradeEntity = new UpgradeEntity();
     upgradeEntity.setClusterId(cluster.getClusterId());
     upgradeEntity.setDirection(Direction.UPGRADE);
@@ -582,7 +594,7 @@ public class UpgradeResourceProviderTest {
     upgradeEntity.setToVersion("2.2.2.2");
     upgradeEntity.setUpgradePackage("upgrade_test");
     upgradeEntity.setUpgradeType(UpgradeType.ROLLING);
-    upgradeEntity.setRequestId(1L);
+    upgradeEntity.setRequestId(2L);
 
     upgradeDao.create(upgradeEntity);
     upgrades = upgradeDao.findUpgrades(cluster.getClusterId());

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java
index cf79b6f..cc49cbd 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java
@@ -26,10 +26,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.orm.entities.UpgradeGroupEntity;
 import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
@@ -55,6 +58,7 @@ public class UpgradeDAOTest {
   private Injector injector;
   private Long clusterId;
   private UpgradeDAO dao;
+  private RequestDAO requestDAO;
 
   private OrmTestHelper helper;
 
@@ -67,6 +71,7 @@ public class UpgradeDAOTest {
     injector.getInstance(GuiceJpaInitializer.class);
 
     dao = injector.getInstance(UpgradeDAO.class);
+    requestDAO = injector.getInstance(RequestDAO.class);
     helper = injector.getInstance(OrmTestHelper.class);
     clusterId = helper.createCluster();
 
@@ -140,11 +145,18 @@ public class UpgradeDAOTest {
   @Test
   public void testFindLastUpgradeForCluster() throws Exception {
     // create upgrade entities
+    RequestEntity requestEntity = new RequestEntity();
+    requestEntity.setRequestId(1L);
+    requestEntity.setClusterId(1L);
+    requestEntity.setStatus(HostRoleStatus.PENDING);
+    requestEntity.setStages(new ArrayList<StageEntity>());
+    requestDAO.create(requestEntity);
+
     UpgradeEntity entity1 = new UpgradeEntity();
     entity1.setId(11L);
-    entity1.setClusterId(Long.valueOf(1));
+    entity1.setClusterId(1L);
     entity1.setDirection(Direction.UPGRADE);
-    entity1.setRequestId(Long.valueOf(1));
+    entity1.setRequestId(1L);
     entity1.setFromVersion("2.2.0.0-1234");
     entity1.setToVersion("2.3.0.0-4567");
     entity1.setUpgradeType(UpgradeType.ROLLING);
@@ -153,9 +165,9 @@ public class UpgradeDAOTest {
     dao.create(entity1);
     UpgradeEntity entity2 = new UpgradeEntity();
     entity2.setId(22L);
-    entity2.setClusterId(Long.valueOf(1));
+    entity2.setClusterId(1L);
     entity2.setDirection(Direction.DOWNGRADE);
-    entity2.setRequestId(Long.valueOf(1));
+    entity2.setRequestId(1L);
     entity2.setFromVersion("2.3.0.0-4567");
     entity2.setToVersion("2.2.0.0-1234");
     entity2.setUpgradeType(UpgradeType.ROLLING);
@@ -164,9 +176,9 @@ public class UpgradeDAOTest {
     dao.create(entity2);
     UpgradeEntity entity3 = new UpgradeEntity();
     entity3.setId(33L);
-    entity3.setClusterId(Long.valueOf(1));
+    entity3.setClusterId(1L);
     entity3.setDirection(Direction.UPGRADE);
-    entity3.setRequestId(Long.valueOf(1));
+    entity3.setRequestId(1L);
     entity3.setFromVersion("2.2.0.0-1234");
     entity3.setToVersion("2.3.1.1-4567");
     entity3.setUpgradeType(UpgradeType.ROLLING);
@@ -185,11 +197,18 @@ public class UpgradeDAOTest {
    */
   @Test
   public void testUpdatableColumns() throws Exception {
+    RequestEntity requestEntity = new RequestEntity();
+    requestEntity.setRequestId(1L);
+    requestEntity.setClusterId(1L);
+    requestEntity.setStatus(HostRoleStatus.PENDING);
+    requestEntity.setStages(new ArrayList<StageEntity>());
+    requestDAO.create(requestEntity);
+
     UpgradeEntity upgradeEntity = new UpgradeEntity();
     upgradeEntity.setId(11L);
-    upgradeEntity.setClusterId(Long.valueOf(1));
+    upgradeEntity.setClusterId(1L);
     upgradeEntity.setDirection(Direction.UPGRADE);
-    upgradeEntity.setRequestId(Long.valueOf(1));
+    upgradeEntity.setRequestId(1L);
     upgradeEntity.setFromVersion("2.2.0.0-1234");
     upgradeEntity.setToVersion("2.3.0.0-4567");
     upgradeEntity.setUpgradeType(UpgradeType.ROLLING);

http://git-wip-us.apache.org/repos/asf/ambari/blob/6fe7f832/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml b/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
index 597270e..b2c4b93 100644
--- a/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
+++ b/ambari-server/src/test/resources/stacks/HDP/2.2.0/upgrades/upgrade_test_skip_failures.xml
@@ -19,6 +19,7 @@
   <target>2.2.*</target>
   <skip-failures>true</skip-failures>
   <skip-service-check-failures>true</skip-service-check-failures>
+  <type>ROLLING</type>
   <prerequisite-checks>
     <check>org.apache.ambari.server.checks.HiveMultipleMetastoreCheck</check>
     <check>org.apache.ambari.server.checks.MapReduce2JobHistoryStatePreservingCheck</check>