You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/09/10 23:41:56 UTC

ambari git commit: AMBARI-13001. Stop-and-Start Upgrade: Bootstrap HDP 2.1 repo, cluster_version, and host_versions as CURRENT after upgrading Ambari (alejandro)

Repository: ambari
Updated Branches:
  refs/heads/branch-dev-stop-all-upgrade a67ddd27d -> a3fb2d24b


AMBARI-13001. Stop-and-Start Upgrade: Bootstrap HDP 2.1 repo, cluster_version, and host_versions as CURRENT after upgrading Ambari (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a3fb2d24
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a3fb2d24
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a3fb2d24

Branch: refs/heads/branch-dev-stop-all-upgrade
Commit: a3fb2d24b3f167d28d5196518d404657792cad60
Parents: a67ddd2
Author: Alejandro Fernandez <af...@hortonworks.com>
Authored: Thu Sep 3 11:35:40 2015 -0700
Committer: Alejandro Fernandez <af...@hortonworks.com>
Committed: Thu Sep 10 14:06:02 2015 -0700

----------------------------------------------------------------------
 .../internal/UpgradeResourceProvider.java       |   7 +-
 .../server/orm/dao/ClusterVersionDAO.java       |  23 ++
 .../apache/ambari/server/orm/dao/CrudDAO.java   |  15 +
 .../ambari/server/orm/dao/HostVersionDAO.java   |  42 +--
 .../server/upgrade/UpgradeCatalog212.java       | 146 +++++++-
 .../HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml | 369 +++++++++++++++++++
 .../server/upgrade/UpgradeCatalog212Test.java   | 136 ++++++-
 7 files changed, 688 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a3fb2d24/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 2c9714e..c73f9d4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -457,10 +457,9 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
      */
     String preferredUpgradePackName = (String) requestMap.get(UPGRADE_PACK);
 
-    // The type will determine which Upgrade Pack to use.
-    // TODO AMBARI-12698, uncomment once the UI starts passing the upgrade type.
-    //final UpgradeType upgradeType = (UpgradeType) requestMap.get(UPGRADE_TYPE);
-    final UpgradeType upgradeType = UpgradeType.ROLLING;
+    // Default to ROLLING upgrade, but attempt to read from properties.
+    final UpgradeType upgradeType = requestMap.containsKey(UPGRADE_TYPE) ?
+        UpgradeType.valueOf((String) requestMap.get(UPGRADE_TYPE)) : UpgradeType.ROLLING;
 
     if (null == clusterName) {
       throw new AmbariException(String.format("%s is required", UPGRADE_CLUSTER_NAME));

http://git-wip-us.apache.org/repos/asf/ambari/blob/a3fb2d24/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ClusterVersionDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ClusterVersionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ClusterVersionDAO.java
index d3326b1..8d4c5ee 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ClusterVersionDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ClusterVersionDAO.java
@@ -23,8 +23,11 @@ import javax.persistence.NoResultException;
 import javax.persistence.NonUniqueResultException;
 import javax.persistence.TypedQuery;
 
+import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.orm.RequiresSession;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
 import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.StackId;
 
@@ -153,4 +156,24 @@ public class ClusterVersionDAO extends CrudDAO<ClusterVersionEntity, Long>{
 
     return daoUtils.selectList(query);
   }
+
+  /**
+   * Builds a {@link ClusterVersionEntity} from the given values, hands it to the single-argument {@code create},
+   * and returns it. Doing the construction inside the DAO lets callers obtain the entity through one mockable call.
+   * @param cluster Cluster the new version record belongs to
+   * @param repositoryVersion Repository Version the new record refers to
+   * @param state Initial State
+   * @param startTime Start Time
+   * @param endTime End Time
+   * @param userName Username, such as "admin"
+   * @return the newly constructed ClusterVersionEntity, after it was passed to {@code create}.
+   */
+  @Transactional
+  public ClusterVersionEntity create(ClusterEntity cluster, RepositoryVersionEntity repositoryVersion,
+                                     RepositoryVersionState state, long startTime, long endTime, String userName) {
+    ClusterVersionEntity clusterVersionEntity = new ClusterVersionEntity(cluster,
+        repositoryVersion, state, startTime, endTime, userName);
+    this.create(clusterVersionEntity);
+    return clusterVersionEntity;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a3fb2d24/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/CrudDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/CrudDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/CrudDAO.java
index 4382f59..ed0a931 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/CrudDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/CrudDAO.java
@@ -73,6 +73,21 @@ public class CrudDAO<E, K> {
   }
 
   /**
+   * Retrieves the maximum value of the given ID attribute across all entities of this DAO's entity class.
+   *
+   * @param idColName name of the entity attribute holding the ID; it is concatenated into the query as-is, so pass only trusted names.
+   * @return maximum ID, or 0 if none exist.
+   */
+  @RequiresSession
+  public Long findMaxId(String idColName) {
+    final TypedQuery<Long> query = entityManagerProvider.get().createQuery("SELECT MAX(entity." + idColName + ") FROM "
+        + entityClass.getSimpleName() + " entity", Long.class);
+    // The query result may be null if there are no rows; that case is mapped to 0 below.
+    Long result = daoUtils.selectOne(query);
+    return result == null ? 0 : result;
+  }
+
+  /**
    * Creates entity.
    *
    * @param entity entity to create

http://git-wip-us.apache.org/repos/asf/ambari/blob/a3fb2d24/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
index a2ff211..ad617af 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
@@ -43,21 +43,17 @@ import com.google.inject.persist.Transactional;
  * {@link org.apache.ambari.server.state.RepositoryVersionState#UPGRADING}.
  */
 @Singleton
-public class HostVersionDAO {
+public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
   @Inject
   Provider<EntityManager> entityManagerProvider;
   @Inject
   DaoUtils daoUtils;
 
   /**
-   * Get the object with the given id.
-   *
-   * @param id Primary key id
-   * @return Return the object with the given primary key
+   * Constructor.
    */
-  @RequiresSession
-  public HostVersionEntity findByPK(long id) {
-    return entityManagerProvider.get().find(HostVersionEntity.class, id);
+  public HostVersionDAO() {
+    super(HostVersionEntity.class);
   }
 
   /**
@@ -189,31 +185,6 @@ public class HostVersionDAO {
     return daoUtils.selectSingle(query);
   }
 
-  @RequiresSession
-  public List<HostVersionEntity> findAll() {
-    return daoUtils.selectAll(entityManagerProvider.get(), HostVersionEntity.class);
-  }
-
-  @Transactional
-  public void refresh(HostVersionEntity hostVersionEntity) {
-    entityManagerProvider.get().refresh(hostVersionEntity);
-  }
-
-  @Transactional
-  public void create(HostVersionEntity hostVersionEntity) {
-    entityManagerProvider.get().persist(hostVersionEntity);
-  }
-
-  @Transactional
-  public HostVersionEntity merge(HostVersionEntity hostVersionEntity) {
-    return entityManagerProvider.get().merge(hostVersionEntity);
-  }
-
-  @Transactional
-  public void remove(HostVersionEntity hostVersionEntity) {
-    entityManagerProvider.get().remove(merge(hostVersionEntity));
-  }
-
   @Transactional
   public void removeByHostName(String hostName) {
     Collection<HostVersionEntity> hostVersions = this.findByHost(hostName);
@@ -221,9 +192,4 @@ public class HostVersionDAO {
       this.remove(hostVersion);
     }
   }
-
-  @Transactional
-  public void removeByPK(long id) {
-    remove(findByPK(id));
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a3fb2d24/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
index 6919e64..ce78ebc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog212.java
@@ -22,14 +22,28 @@ import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
+import org.apache.ambari.server.orm.dao.HostVersionDAO;
 import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.StackDAO;
 import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostVersionEntity;
 import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.stack.upgrade.Direction;
+import org.apache.ambari.server.state.stack.upgrade.RepositoryVersionHelper;
 import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -39,11 +53,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.text.MessageFormat;
+import java.util.Collection;
 
 
 /**
  * Upgrade catalog for version 2.1.2.
  */
+// TODO AMBARI-12698, by the time this gets committed, it will be 2.1.3 instead of 2.1.2
 public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
 
   public static final String UPGRADE_PACKAGE_COL = "upgrade_package";
@@ -115,7 +131,10 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
    */
   @Override
   protected void executeDMLUpdates() throws AmbariException, SQLException {
+    // This one actually performs both DDL and DML, so it needs to be first.
     executeStackUpgradeDDLUpdates();
+
+    bootstrapRepoVersionForHDP21();
   }
 
   // ----- UpgradeCatalog212 --------------------------------------------
@@ -207,7 +226,7 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
               String upgradeType = rs.getString("upgrade_type");
 
               LOG.info(MessageFormat.format("Populating rows for the upgrade table record with " +
-                  "upgrade_id: {0}, cluster_id: {1}, from_version: {2}, to_version: {3}, direction: {4}",
+                  "upgrade_id: {0,number,#}, cluster_id: {1,number,#}, from_version: {2}, to_version: {3}, direction: {4}",
                   upgradeId, clusterId, fromVersion, toVersion, direction));
 
               // Set all upgrades that have been done so far to type "rolling"
@@ -285,7 +304,6 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
    * @param version Stack version
    * @return The value of the upgrade_package column, or null if not found.
    */
-
   private String calculateUpgradePackage(StackEntity stack, String version) {
     String upgradePackage = null;
     // Find the corresponding repo_version, and extract its upgrade_package
@@ -320,4 +338,128 @@ public class UpgradeCatalog212 extends AbstractUpgradeCatalog {
     }
     return upgradePackage;
   }
+
+  /**
+   * If a cluster is still on HDP 2.1, then no repo versions exist, so bootstrap an HDP 2.1 repo version using the
+   * hard-coded placeholder version, and mark it as CURRENT in the cluster_version table for the cluster, as well as
+   * in the host_version table for all of its hosts. Records that already exist are reused instead of being re-created.
+   */
+  @Transactional
+  public void bootstrapRepoVersionForHDP21() throws AmbariException, SQLException {
+    final String hardcodedInitialVersion = "2.1.0.0-0001";
+    AmbariManagementController amc = injector.getInstance(AmbariManagementController.class);
+    AmbariMetaInfo ambariMetaInfo = amc.getAmbariMetaInfo();
+    StackDAO stackDAO = injector.getInstance(StackDAO.class);
+    RepositoryVersionHelper repositoryVersionHelper = injector.getInstance(RepositoryVersionHelper.class);
+    RepositoryVersionDAO repositoryVersionDAO = injector.getInstance(RepositoryVersionDAO.class);
+    ClusterVersionDAO clusterVersionDAO = injector.getInstance(ClusterVersionDAO.class);
+    HostVersionDAO hostVersionDAO = injector.getInstance(HostVersionDAO.class);
+
+    Clusters clusters = amc.getClusters();
+    if (clusters == null) {
+      LOG.error("Unable to get Clusters entity.");
+      return;
+    }
+
+    for (Cluster cluster : clusters.getClusters().values()) {
+      ClusterEntity clusterEntity = clusterDAO.findByName(cluster.getClusterName());
+      final StackId stackId = cluster.getCurrentStackVersion();
+      LOG.info(MessageFormat.format("Analyzing cluster {0}, currently at stack {1} and version {2}",
+          cluster.getClusterName(), stackId.getStackName(), stackId.getStackVersion()));
+
+      if (stackId.getStackName().equalsIgnoreCase("HDP") && stackId.getStackVersion().equalsIgnoreCase("2.1")) {
+        final StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
+        StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());
+
+        LOG.info("Bootstrapping the versions since using HDP-2.1");
+
+        // The actual version is not known, so the display name is built from the hard-coded placeholder version.
+        String displayName = stackId.getStackName() + "-" + hardcodedInitialVersion;
+
+        // However, the Repo URLs should be correct, since they are taken from the stack definition.
+        String operatingSystems = repositoryVersionHelper.serializeOperatingSystems(stackInfo.getRepositories());
+
+        // Create the Repo Version if it doesn't already exist (looked up by display name).
+        RepositoryVersionEntity repoVersionEntity = repositoryVersionDAO.findByDisplayName(displayName);
+        if (null != repoVersionEntity) {
+          LOG.info(MessageFormat.format("A Repo Version already exists with Display Name: {0}", displayName));
+        } else {
+          final long repoVersionIdSeq = repositoryVersionDAO.findMaxId("id");
+          // Safe to attempt to add the sequence if it doesn't exist already.
+          addSequence("repo_version_id_seq", repoVersionIdSeq, false);
+
+          repoVersionEntity = repositoryVersionDAO.create(
+              stackEntity, hardcodedInitialVersion, displayName, operatingSystems);
+          LOG.info(MessageFormat.format("Created Repo Version with ID: {0,number,#}\n, Display Name: {1}, Repo URLs: {2}\n",
+              repoVersionEntity.getId(), displayName, operatingSystems));
+        }
+
+        // Create the Cluster Version if it doesn't already exist.
+        ClusterVersionEntity clusterVersionEntity = clusterVersionDAO.findByClusterAndStackAndVersion(cluster.getClusterName(),
+            stackId, hardcodedInitialVersion);
+
+        if (null != clusterVersionEntity) {
+          LOG.info(MessageFormat.format("A Cluster Version version for cluster: {0}, version: {1}, already exists; its state is {2}.",
+              cluster.getClusterName(), clusterVersionEntity.getRepositoryVersion().getVersion(), clusterVersionEntity.getState()));
+
+          // If there are no CURRENT cluster versions, make this one the CURRENT one.
+          if (clusterVersionEntity.getState() != RepositoryVersionState.CURRENT &&
+              clusterVersionDAO.findByClusterAndState(cluster.getClusterName(), RepositoryVersionState.CURRENT).isEmpty()) {
+            clusterVersionEntity.setState(RepositoryVersionState.CURRENT);
+            clusterVersionDAO.merge(clusterVersionEntity);
+          }
+        } else {
+          final long clusterVersionIdSeq = clusterVersionDAO.findMaxId("id");
+          // Safe to attempt to add the sequence if it doesn't exist already.
+          addSequence("cluster_version_id_seq", clusterVersionIdSeq, false);
+
+          clusterVersionEntity = clusterVersionDAO.create(clusterEntity, repoVersionEntity, RepositoryVersionState.CURRENT,
+              System.currentTimeMillis(), System.currentTimeMillis(), "admin");
+          LOG.info(MessageFormat.format("Created Cluster Version with ID: {0,number,#}, cluster: {1}, version: {2}, state: {3}.",
+              clusterVersionEntity.getId(), cluster.getClusterName(), clusterVersionEntity.getRepositoryVersion().getVersion(),
+              clusterVersionEntity.getState()));
+        }
+
+        // Create the Host Versions if they don't already exist, one per host of this cluster.
+        Collection<HostEntity> hosts = clusterEntity.getHostEntities();
+        boolean addedAtLeastOneHost = false;
+        if (null != hosts && !hosts.isEmpty()) {
+          for (HostEntity hostEntity : hosts) {
+            HostVersionEntity hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(cluster.getClusterName(),
+                stackId, hardcodedInitialVersion, hostEntity.getHostName());
+
+            if (null != hostVersionEntity) {
+              LOG.info(MessageFormat.format("A Host Version version for cluster: {0}, version: {1}, host: {2}, already exists; its state is {3}.",
+                  cluster.getClusterName(), hostVersionEntity.getRepositoryVersion().getVersion(),
+                  hostEntity.getHostName(), hostVersionEntity.getState()));
+
+              if (hostVersionEntity.getState() != RepositoryVersionState.CURRENT &&
+                  hostVersionDAO.findByClusterHostAndState(cluster.getClusterName(), hostEntity.getHostName(),
+                      RepositoryVersionState.CURRENT).isEmpty()) {
+                hostVersionEntity.setState(RepositoryVersionState.CURRENT);
+                hostVersionDAO.merge(hostVersionEntity);
+              }
+            } else {
+              // Add the host_version sequence only once per cluster, before the first Host Version is inserted.
+              if (!addedAtLeastOneHost) {
+                final long hostVersionIdSeq = hostVersionDAO.findMaxId("id");
+                // Safe to attempt to add the sequence if it doesn't exist already.
+                addSequence("host_version_id_seq", hostVersionIdSeq, false);
+                addedAtLeastOneHost = true;
+              }
+
+              hostVersionEntity = new HostVersionEntity(hostEntity, repoVersionEntity, RepositoryVersionState.CURRENT);
+              hostVersionDAO.create(hostVersionEntity);
+              LOG.info(MessageFormat.format("Created Host Version with ID: {0,number,#}, cluster: {1}, version: {2}, host: {3}, state: {4}.",
+                  hostVersionEntity.getId(), cluster.getClusterName(), hostVersionEntity.getRepositoryVersion().getVersion(),
+                  hostEntity.getHostName(), hostVersionEntity.getState()));
+            }
+          }
+        } else {
+          LOG.info(MessageFormat.format("Not inserting any Host Version records since cluster {0} does not have any hosts.",
+              cluster.getClusterName()));
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a3fb2d24/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
new file mode 100644
index 0000000..c55e1a8
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/upgrades/nonrolling-upgrade-2.3.xml
@@ -0,0 +1,369 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
+<upgrade xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <target>2.3.*.*</target>
+  <target-stack>HDP-2.3</target-stack>
+  <type>NON_ROLLING</type>
+
+  <order>
+    <group xsi:type="cluster" name="PRE_CLUSTER" title="Prepare Upgrade">
+      <skippable>true</skippable>
+      <direction>UPGRADE</direction>
+      <execute-stage service="YARN" component="RESOURCEMANAGER" title="Stop YARN Queues">
+        <task xsi:type="manual">
+          <message>Before continuing, please stop all YARN queues. If yarn-site's yarn.resourcemanager.work-preserving-recovery.enabled is set to true, then you can skip this step since the clients will retry on their own.</message>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="STORM" component="NIMBUS" title="Stop Storm Topologies">
+        <task xsi:type="manual">
+          <message>Before continuing, please deactivate and kill any currently running topologies.</message>
+        </task>
+      </execute-stage>
+    </group>
+
+    <group xsi:type="stop" name="Stop High-Level Daemons" title="Stop Daemons for High-Level Services">
+      <skippable>true</skippable>
+      <service-check>false</service-check>
+
+      <service name="FLUME">
+        <component>FLUME_HANDLER</component>
+      </service>
+
+      <service name="STORM">
+        <component>DRPC_SERVER</component>
+        <component>STORM_UI_SERVER</component>
+        <component>SUPERVISOR</component>
+        <component>STORM_REST_API</component>
+        <component>NIMBUS</component>
+      </service>
+
+      <service name="FALCON">
+        <component>FALCON_SERVER</component>
+      </service>
+
+      <service name="OOZIE">
+        <component>OOZIE_SERVER</component>
+      </service>
+
+      <service name="HIVE">
+        <component>WEBHCAT_SERVER</component>
+        <component>HIVE_SERVER</component>
+        <component>HIVE_METASTORE</component>
+      </service>
+
+      <service name="YARN">
+        <component>NODEMANAGER</component>
+        <component>RESOURCEMANAGER</component>
+        <component>APP_TIMELINE_SERVER</component>
+      </service>
+
+      <service name="MAPREDUCE2">
+        <component>HISTORYSERVER</component>
+      </service>
+    </group>
+
+    <group xsi:type="cluster" name="Backups" title="Take Backups">
+      <direction>UPGRADE</direction>
+      <skippable>true</skippable>
+      <execute-stage service="OOZIE" component="OOZIE_SERVER" title="Backup Oozie Database">
+        <task xsi:type="manual">
+          <message>Before continuing, please backup the Oozie Server database on {{oozie-env/oozie_hostname}}.</message>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="HIVE" component="HIVE_METASTORE" title="Backup Hive Metastore">
+        <task xsi:type="manual">
+          <message>Before continuing, please backup the Hive Metastore database located on the following host(s): {{hosts.all}}.</message>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="HBASE" component="HBASE_MASTER" title="Snapshot HBASE">
+        <task xsi:type="execute" hosts="master">
+          <script>scripts/hbase_upgrade.py</script>
+          <function>take_snapshot</function>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="HDFS" component="NAMENODE" title="Snapshot HDFS">
+        <task xsi:type="execute" hosts="master">
+          <script>scripts/namenode.py</script>
+          <function>prepare_non_rolling_upgrade</function>
+        </task>
+      </execute-stage>
+    </group>
+
+    <group xsi:type="stop" name="Stop Low-Level Daemons" title="Stop Daemons for Low-Level Services">
+      <skippable>true</skippable>
+      <service-check>false</service-check>
+
+      <service name="HBASE">
+        <component>HBASE_REGIONSERVER</component>
+        <component>HBASE_MASTER</component>
+      </service>
+
+      <service name="HDFS">
+        <component>DATANODE</component>
+        <component>NAMENODE</component>
+        <component>SECONDARY_NAMENODE</component>
+        <component>ZKFC</component>
+        <component>JOURNALNODE</component>
+      </service>
+
+      <service name="ZOOKEEPER">
+        <component>ZOOKEEPER_SERVER</component>
+      </service>
+    </group>
+
+    <group name="Marker for Downgrade" title="Marker for Downgrade">
+      <direction>UPGRADE</direction>
+      <!-- TODO, if the user attempts a downgrade before this step, they can simply abort. -->
+    </group>
+
+    <group xsi:type="cluster" name="Restore Backups" title="Restore Backups">
+      <direction>DOWNGRADE</direction>
+      <skippable>true</skippable>
+
+      <!-- If the user attempts a downgrade after this point, they will need to restore backups
+      before starting any of the services. The stage titles say "Restore" to match what each stage does. -->
+
+      <execute-stage service="OOZIE" component="OOZIE_SERVER" title="Restore Oozie Database">
+        <task xsi:type="manual">
+          <message>Before continuing, please restore the Oozie Server database on {{hosts.all}}.</message>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="HIVE" component="HIVE_METASTORE" title="Restore Hive Metastore">
+        <task xsi:type="manual">
+          <message>Before continuing, please restore the Hive Metastore database located on the following host(s): {{hosts.all}}.</message>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="HBASE" component="HBASE_MASTER" title="Restore HBASE Snapshot">
+        <task xsi:type="execute" hosts="master">
+          <script>scripts/hbase_upgrade.py</script>
+          <function>restore_snapshot</function>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="HDFS" component="NAMENODE" title="Restore HDFS Snapshot">
+        <task xsi:type="execute" hosts="master">
+          <script>scripts/namenode.py</script>
+          <function>restore_snapshot</function>
+        </task>
+      </execute-stage>
+    </group>
+
+    <group xsi:type="cluster" name="ALL_HOST_OPS" title="Set Version On All Hosts">
+      <skippable>true</skippable>
+      <execute-stage title="Update stack to {{version}}">
+        <task xsi:type="execute">
+          <script>scripts/ru_set_all.py</script>
+          <function>actionexecute</function>
+        </task>
+      </execute-stage>
+    </group>
+
+    <!-- Now, restart all of the services. -->
+
+    <group xsi:type="restart" name="ZOOKEEPER" title="Zookeeper">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="ZOOKEEPER">
+        <service-check>false</service-check>
+        <component>ZOOKEEPER_SERVER</component>
+        <component>ZOOKEEPER_CLIENT</component>
+      </service>
+    </group>
+
+    <group xsi:type="restart" name="HDFS" title="HDFS">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="HDFS">
+        <component>JOURNALNODE</component>
+        <component>ZKFC</component>
+        <component>NAMENODE</component>
+        <component>SECONDARY_NAMENODE</component>
+        <component>DATANODE</component>
+        <component>HDFS_CLIENT</component>
+      </service>
+    </group>
+
+    <group xsi:type="restart" name="MR and YARN" title="MR and YARN">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="MAPREDUCE2">
+        <component>HISTORYSERVER</component>
+        <component>MAPREDUCE2_CLIENT</component>
+      </service>
+      <service name="YARN">
+        <component>APP_TIMELINE_SERVER</component>
+        <component>RESOURCEMANAGER</component>
+        <component>NODEMANAGER</component>
+        <component>YARN_CLIENT</component>
+      </service>
+    </group>
+
+    <group xsi:type="restart" name="HBASE" title="HBASE">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="HBASE">
+        <component>HBASE_MASTER</component>
+        <component>HBASE_REGIONSERVER</component>
+        <component>HBASE_CLIENT</component>
+      </service>
+    </group>
+
+    <group xsi:type="restart" name="CLIENTS" title="Tez, Pig, Sqoop Clients">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="TEZ">
+        <component>TEZ_CLIENT</component>
+      </service>
+
+      <service name="PIG">
+        <component>PIG</component>
+      </service>
+
+      <service name="SQOOP">
+        <component>SQOOP</component>
+      </service>
+    </group>
+
+    <group name="SERVICE_CHECK" title="All Service Checks" xsi:type="service-check">
+      <skippable>true</skippable>
+      <direction>UPGRADE</direction>
+      <priority>
+        <service>HBASE</service>
+        <service>MAPREDUCE2</service>
+        <service>YARN</service>
+        <service>HDFS</service>
+      </priority>
+    </group>
+
+    <group xsi:type="restart" name="HIVE" title="Hive">
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="HIVE">
+        <component>HIVE_METASTORE</component>
+        <component>HIVE_SERVER</component>
+        <component>WEBHCAT_SERVER</component>
+        <component>HIVE_CLIENT</component>
+        <component>HCAT</component>
+      </service>
+    </group>
+
+    <!-- UPGRADE direction only: a single execute task on OOZIE_SERVER calls upgrade_oozie_database_and_sharelib from scripts/oozie_server_upgrade.py (database upgrade plus a new ShareLib). -->
+    <group name="Upgrade Oozie" title="Upgrade Oozie Database">
+      <direction>UPGRADE</direction>
+      <skippable>true</skippable>
+      <execute-stage service="OOZIE" component="OOZIE_SERVER" title="Upgrade Oozie Database">
+        <task xsi:type="execute" hosts="any" summary="Upgrading the database and creating a new sharelib">
+          <script>scripts/oozie_server_upgrade.py</script>
+          <function>upgrade_oozie_database_and_sharelib</function>
+        </task>
+      </execute-stage>
+    </group>
+
+    <!-- DOWNGRADE direction only: the task calls create_sharelib (not the database upgrade function), so only the ShareLib folder is created. -->
+    <group name="Downgrade Oozie" title="Downgrade Oozie ShareLib">
+      <direction>DOWNGRADE</direction>
+      <skippable>true</skippable>
+      <execute-stage service="OOZIE" component="OOZIE_SERVER" title="Downgrade Oozie ShareLib">
+        <task xsi:type="execute" hosts="any" summary="Creating a new sharelib">
+          <script>scripts/oozie_server_upgrade.py</script>
+          <function>create_sharelib</function>
+        </task>
+      </execute-stage>
+    </group>
+
+    <group xsi:type="restart" name="OOZIE" title="Oozie"> <!-- skippable restart group for OOZIE_SERVER and OOZIE_CLIENT; listed after the Oozie upgrade/downgrade groups in this file -->
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="OOZIE">
+        <component>OOZIE_SERVER</component>
+        <component>OOZIE_CLIENT</component>
+      </service>
+    </group>
+
+    <group xsi:type="restart" name="FALCON" title="Falcon"> <!-- skippable restart group for FALCON_SERVER and FALCON_CLIENT -->
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="FALCON">
+        <component>FALCON_SERVER</component>
+        <component>FALCON_CLIENT</component>
+      </service>
+    </group>
+
+    <group xsi:type="restart" name="STORM" title="Storm"> <!-- skippable restart group for the five STORM components, followed by one manual stage -->
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="STORM">
+        <component>NIMBUS</component>
+        <component>STORM_REST_API</component>
+        <component>SUPERVISOR</component>
+        <component>STORM_UI_SERVER</component>
+        <component>DRPC_SERVER</component>
+      </service>
+
+      <execute-stage service="STORM" component="DRPC_SERVER" title="Rebuild Storm Topology"> <!-- manual task: only displays the message below to the operator -->
+        <task xsi:type="manual">
+          <message>Please rebuild your topology using the new Storm version dependencies and resubmit it using the newly created jar.</message>
+        </task>
+      </execute-stage>
+    </group>
+
+    <group xsi:type="restart" name="FLUME" title="Flume"> <!-- skippable restart group for the single FLUME_HANDLER component -->
+      <service-check>false</service-check>
+      <skippable>true</skippable>
+      <service name="FLUME">
+        <component>FLUME_HANDLER</component>
+      </service>
+    </group>
+
+    <group xsi:type="cluster" name="POST_CLUSTER" title="Finalize {{direction.text.proper}}"> <!-- last group in the order: four finalization stages; the title is templated on the direction -->
+      <skippable>true</skippable>
+      <execute-stage title="Check Unhealthy Hosts" id="unhealthy-hosts"> <!-- manual task whose message is templated with {{hosts.unhealthy}} -->
+        <task xsi:type="manual">
+          <message>The following hosts were unhealthy and should be resolved before finalizing can be completed: {{hosts.unhealthy}}</message>
+        </task>
+      </execute-stage>
+      
+      <execute-stage title="Confirm Finalize"> <!-- manual confirmation, declared for the UPGRADE direction only -->
+        <direction>UPGRADE</direction>
+        <task xsi:type="manual">
+          <message>Please confirm you are ready to finalize.</message>
+        </task>
+      </execute-stage>
+
+      <execute-stage service="HDFS" component="NAMENODE" title="Execute HDFS Finalize"> <!-- calls finalize_non_rolling_upgrade from scripts/namenode.py with hosts="master" -->
+        <task xsi:type="execute" hosts="master">
+          <script>scripts/namenode.py</script>
+          <function>finalize_non_rolling_upgrade</function>
+        </task>
+      </execute-stage>
+
+      <execute-stage title="Save Cluster State" service="" component=""> <!-- server_action run by FinalizeUpgradeAction; service and component are deliberately left empty -->
+        <task xsi:type="server_action" class="org.apache.ambari.server.serveraction.upgrades.FinalizeUpgradeAction">
+        </task>
+      </execute-stage>
+    </group>
+  </order>
+</upgrade>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/a3fb2d24/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog212Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog212Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog212Test.java
index 6268f91..1f27cb3 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog212Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog212Test.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.upgrade;
 
 
+import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createNiceMock;
@@ -29,16 +30,35 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 
-import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.persist.PersistService;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
-import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
+import org.apache.ambari.server.orm.dao.HostVersionDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostVersionEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.stack.StackManagerFactory;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.stack.OsFamily;
 
+import org.apache.ambari.server.state.stack.upgrade.RepositoryVersionHelper;
 import org.easymock.Capture;
 import org.junit.After;
 import org.junit.Assert;
@@ -56,16 +76,29 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 
 /**
  * {@link UpgradeCatalog212} unit tests.
  */
+// TODO AMBARI-12698, by the time this gets committed, it will be 2.1.3 instead of 2.1.2
 public class UpgradeCatalog212Test {
 
   private Injector injector;
   private Provider<EntityManager> entityManagerProvider = createStrictMock(Provider.class);
   private EntityManager entityManager = createNiceMock(EntityManager.class);
+  private AmbariManagementController amc = createNiceMock(AmbariManagementController.class);
+  private AmbariMetaInfo metaInfo = createNiceMock(AmbariMetaInfo.class);
+  private StackDAO stackDAO = createNiceMock(StackDAO.class);
+  private RepositoryVersionDAO repositoryVersionDAO = createNiceMock(RepositoryVersionDAO.class);
+  private ClusterVersionDAO clusterVersionDAO = createNiceMock(ClusterVersionDAO.class);
+  private HostVersionDAO hostVersionDAO = createNiceMock(HostVersionDAO.class);
+  private ClusterDAO clusterDAO = createNiceMock(ClusterDAO.class);
 
   @Before
   public void init() {
@@ -111,7 +144,10 @@ public class UpgradeCatalog212Test {
     // Execute any DDL schema changes
     upgradeSectionDDL.execute(dbAccessor);
 
-    // Replay sections
+    // Begin DML verifications
+    verifyBootstrapHDP21();
+
+    // Replay main sections
     replay(dbAccessor, configuration, resultSet, connection, statement);
 
     AbstractUpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor);
@@ -127,11 +163,90 @@ public class UpgradeCatalog212Test {
     upgradeSectionDDL.verify(dbAccessor);
   }
 
+  /**
+   * Records and replays the mock expectations used when bootstrapping HDP 2.1 (repo_version,
+   * cluster_version and host_version lookups); it only prepares mocks and never calls verify() itself.
+   * @throws Exception if recording an expectation trips a mocked method's checked exception, e.g. {@link AmbariException}
+   */
+  private void verifyBootstrapHDP21() throws Exception {
+    final String stackName = "HDP";
+    final String stackVersion = "2.1";
+    final String stackNameAndVersion = stackName + "-" + stackVersion;
+    final String buildNumber = "2.1.0.0-0001";
+    final String stackAndBuild = stackName + "-" + buildNumber;
+    final String clusterName = "c1";
+
+    expect(amc.getAmbariMetaInfo()).andReturn(metaInfo);
+
+    // The controller exposes a single cluster ("c1") whose current stack is HDP-2.1
+    Clusters clusters = createNiceMock(Clusters.class);
+    expect(amc.getClusters()).andReturn(clusters);
+
+    Map<String, Cluster> clusterHashMap = new HashMap<String, Cluster>();
+    Cluster cluster = createNiceMock(Cluster.class);
+    clusterHashMap.put(clusterName, cluster);
+    expect(clusters.getClusters()).andReturn(clusterHashMap);
+
+    StackId stackId = new StackId(stackNameAndVersion);
+    expect(cluster.getCurrentStackVersion()).andReturn(stackId);
+
+    StackInfo stackInfo = new StackInfo();
+    stackInfo.setVersion(buildNumber);
+    expect(metaInfo.getStack(stackName, stackVersion)).andReturn(stackInfo);
+
+    StackEntity stackEntity = createNiceMock(StackEntity.class);
+    expect(stackEntity.getStackName()).andReturn(stackName);
+    expect(stackEntity.getStackVersion()).andReturn(stackVersion);
+
+    expect(stackDAO.find(stackName, stackVersion)).andReturn(stackEntity);
+
+    replay(amc, metaInfo, clusters, cluster, stackEntity, stackDAO);
+
+    // The three version DAOs below report no pre-existing rows (null lookups, empty findAll results)
+    // Repository Version: create(...) returns a mock entity with id 1 and version 2.1.0.0-0001
+    RepositoryVersionEntity repositoryVersionEntity = createNiceMock(RepositoryVersionEntity.class);
+    expect(repositoryVersionDAO.findByDisplayName(stackAndBuild)).andReturn(null);
+    expect(repositoryVersionDAO.findAll()).andReturn(Collections.<RepositoryVersionEntity>emptyList());
+    expect(repositoryVersionDAO.create(anyObject(StackEntity.class), anyObject(String.class), anyObject(String.class), anyObject(String.class))).andReturn(repositoryVersionEntity);
+    expect(repositoryVersionEntity.getId()).andReturn(1L);
+    expect(repositoryVersionEntity.getVersion()).andReturn(buildNumber);
+    replay(repositoryVersionDAO, repositoryVersionEntity);
+
+    // Cluster Version: create(...) returns a mock entity in state CURRENT tied to the repository mock
+    ClusterVersionEntity clusterVersionEntity = createNiceMock(ClusterVersionEntity.class);
+    expect(clusterVersionEntity.getId()).andReturn(1L);
+    expect(clusterVersionEntity.getState()).andReturn(RepositoryVersionState.CURRENT);
+    expect(clusterVersionEntity.getRepositoryVersion()).andReturn(repositoryVersionEntity);
+
+    expect(clusterVersionDAO.findByClusterAndStackAndVersion(anyObject(String.class), anyObject(StackId.class), anyObject(String.class))).andReturn(null);
+    expect(clusterVersionDAO.findAll()).andReturn(Collections.<ClusterVersionEntity>emptyList());
+    expect(clusterVersionDAO.create(anyObject(ClusterEntity.class), anyObject(RepositoryVersionEntity.class), anyObject(RepositoryVersionState.class), anyLong(), anyLong(), anyObject(String.class))).andReturn(clusterVersionEntity);
+    replay(clusterVersionDAO, clusterVersionEntity);
+
+    // Host Version: the cluster entity carries two hosts; no hostVersionDAO.create(...) expectation is recorded
+    ClusterEntity clusterEntity = createNiceMock(ClusterEntity.class);
+    expect(clusterEntity.getClusterName()).andReturn(clusterName).anyTimes();
+    expect(clusterDAO.findByName(anyObject(String.class))).andReturn(clusterEntity);
+
+    Collection<HostEntity> hostEntities = new ArrayList<HostEntity>();
+    HostEntity hostEntity1 = createNiceMock(HostEntity.class);
+    HostEntity hostEntity2 = createNiceMock(HostEntity.class);
+    expect(hostEntity1.getHostName()).andReturn("host1");
+    expect(hostEntity2.getHostName()).andReturn("host2");
+    hostEntities.add(hostEntity1);
+    hostEntities.add(hostEntity2);
+    expect(clusterEntity.getHostEntities()).andReturn(hostEntities);
+
+    expect(hostVersionDAO.findByClusterStackVersionAndHost(anyObject(String.class), anyObject(StackId.class), anyObject(String.class), anyObject(String.class))).andReturn(null);
+    expect(hostVersionDAO.findAll()).andReturn(Collections.<HostVersionEntity>emptyList());
+
+    replay(clusterEntity, clusterDAO, hostVersionDAO, hostEntity1, hostEntity2);
+  }
+
   @Test
   public void testGetTargetVersion() throws Exception {
     final DBAccessor dbAccessor = createNiceMock(DBAccessor.class);
-    UpgradeCatalog   upgradeCatalog = getUpgradeCatalog(dbAccessor);
-
+    UpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor); // catalog under test, built around the nice-mock DBAccessor above
     Assert.assertEquals("2.1.2", upgradeCatalog.getTargetVersion());
   }
 
@@ -149,7 +264,16 @@ public class UpgradeCatalog212Test {
         binder.bind(DBAccessor.class).toInstance(dbAccessor);
         binder.bind(EntityManager.class).toInstance(entityManager);
         binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
-        binder.bind(DaoUtils.class).toInstance(createNiceMock(DaoUtils.class));
+        binder.bind(ClusterDAO.class).toInstance(clusterDAO);
+        binder.bind(RepositoryVersionHelper.class).toInstance(createNiceMock(RepositoryVersionHelper.class));
+        binder.bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
+        binder.bind(AmbariManagementController.class).toInstance(amc);
+        binder.bind(AmbariMetaInfo.class).toInstance(metaInfo);
+        binder.bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class));
+        binder.bind(StackDAO.class).toInstance(stackDAO);
+        binder.bind(RepositoryVersionDAO.class).toInstance(repositoryVersionDAO);
+        binder.bind(ClusterVersionDAO.class).toInstance(clusterVersionDAO);
+        binder.bind(HostVersionDAO.class).toInstance(hostVersionDAO);
       }
     };
     Injector injector = Guice.createInjector(module);