You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2015/10/23 12:00:12 UTC

ambari git commit: AMBARI-13528 Ambari-server upgrade from 2.1.2 to 2.1.3 fails (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk e7f77b6b2 -> d30b5f036


AMBARI-13528 Ambari-server upgrade from 2.1.2 to 2.1.3 fails (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d30b5f03
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d30b5f03
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d30b5f03

Branch: refs/heads/trunk
Commit: d30b5f0369672bd62a246f37d07bfddc50df8c7a
Parents: e7f77b6
Author: Dmytro Sen <ds...@apache.org>
Authored: Fri Oct 23 12:59:58 2015 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Fri Oct 23 12:59:58 2015 +0300

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog213.java       | 406 ++++++++++++++++++-
 .../server/upgrade/UpgradeCatalog213Test.java   | 304 +++++++++++---
 2 files changed, 647 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/d30b5f03/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 97cfb3a..965689a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -18,8 +18,12 @@
 
 package org.apache.ambari.server.upgrade;
 
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,20 +33,39 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
 import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.dao.HostVersionDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.StackDAO;
 import org.apache.ambari.server.orm.dao.UpgradeDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostVersionEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.StackEntity;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.state.stack.upgrade.Direction;
+import org.apache.ambari.server.state.stack.upgrade.RepositoryVersionHelper;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
 import org.apache.ambari.server.utils.VersionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +73,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.persist.Transactional;
 
 /**
  * Upgrade catalog for version 2.1.3.
@@ -68,16 +92,22 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   private static final String HADOOP_ENV_CONFIG = "hadoop-env";
   private static final String CONTENT_PROPERTY = "content";
   private static final String HADOOP_ENV_CONTENT_TO_APPEND = "\n{% if is_datanode_max_locked_memory_set %}\n" +
-                                    "# Fix temporary bug, when ulimit from conf files is not picked up, without full relogin. \n" +
-                                    "# Makes sense to fix only when runing DN as root \n" +
-                                    "if [ \"$command\" == \"datanode\" ] && [ \"$EUID\" -eq 0 ] && [ -n \"$HADOOP_SECURE_DN_USER\" ]; then\n" +
-                                    "  ulimit -l {{datanode_max_locked_memory}}\n" +
-                                    "fi\n" +
-                                    "{% endif %};\n";
+    "# Fix temporary bug, when ulimit from conf files is not picked up, without full relogin. \n" +
+    "# Makes sense to fix only when runing DN as root \n" +
+    "if [ \"$command\" == \"datanode\" ] && [ \"$EUID\" -eq 0 ] && [ -n \"$HADOOP_SECURE_DN_USER\" ]; then\n" +
+    "  ulimit -l {{datanode_max_locked_memory}}\n" +
+    "fi\n" +
+    "{% endif %};\n";
 
   private static final String DOWNGRADE_ALLOWED_COLUMN = "downgrade_allowed";
   private static final String UPGRADE_SKIP_FAILURE_COLUMN = "skip_failures";
   private static final String UPGRADE_SKIP_SC_FAILURE_COLUMN = "skip_sc_failures";
+  public static final String UPGRADE_PACKAGE_COL = "upgrade_package";
+  public static final String UPGRADE_TYPE_COL = "upgrade_type";
+  public static final String REPO_VERSION_TABLE = "repo_version";
+
+  private static final String HOST_ROLE_COMMAND_TABLE = "host_role_command";
+  private static final String HOST_ID_COL = "host_id";
 
   private static final String KERBEROS_DESCRIPTOR_TABLE = "kerberos_descriptor";
   private static final String KERBEROS_DESCRIPTOR_NAME_COLUMN = "kerberos_descriptor_name";
@@ -91,6 +121,11 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   @Inject
   DaoUtils daoUtils;
 
+  @Inject
+  private RepositoryVersionDAO repositoryVersionDAO;
+
+  @Inject
+  private ClusterDAO clusterDAO;
 
   // ----- Constructors ------------------------------------------------------
 
@@ -102,8 +137,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   @Inject
   public UpgradeCatalog213(Injector injector) {
     super(injector);
-
-    daoUtils = injector.getInstance(DaoUtils.class);
+    this.injector = injector;
   }
 
   // ----- UpgradeCatalog ----------------------------------------------------
@@ -132,6 +166,10 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   @Override
   protected void executeDDLUpdates() throws AmbariException, SQLException {
     executeUpgradeDDLUpdates();
+
+    // Alter the host_role_command table to allow host_id to be nullable
+    dbAccessor.alterColumn(HOST_ROLE_COMMAND_TABLE, new DBColumnInfo(HOST_ID_COL, Long.class, null, null, true));
+
     addKerberosDescriptorTable();
   }
 
@@ -153,6 +191,13 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
    */
   @Override
   protected void executePreDMLUpdates() throws AmbariException, SQLException {
+    // execute DDL updates
+    executeStackUpgradeDDLUpdates();
+
+    // DDL and DML mixed code, double check here
+    bootstrapRepoVersionForHDP21();
+
+    // execute DML updates, no DDL things after this line
     executeUpgradePreDMLUpdates();
   }
 
@@ -174,16 +219,18 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     for (UpgradeEntity upgrade: upgrades){
       if (upgrade.isDowngradeAllowed() == null) {
         upgrade.setDowngradeAllowed(true);
-        upgradeDAO.merge(upgrade);
       }
 
       // ensure that these are set to false for existing upgrades
       upgrade.setAutoSkipComponentFailures(false);
       upgrade.setAutoSkipServiceCheckFailures(false);
 
+      // apply changes
+      upgradeDAO.merge(upgrade);
+
       LOG.info(String.format("Updated upgrade id %s, upgrade pack %s from version %s to %s",
-          upgrade.getId(), upgrade.getUpgradePackage(), upgrade.getFromVersion(),
-          upgrade.getToVersion()));
+        upgrade.getId(), upgrade.getUpgradePackage(), upgrade.getFromVersion(),
+        upgrade.getToVersion()));
     }
 
     // make the columns nullable now that they have defaults
@@ -206,6 +253,331 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   }
 
   /**
+   * Move the upgrade_package column from the repo_version table to the upgrade table as follows:
+   * add column upgrade_package to the upgrade table as a nullable String of length 255,
+   * populate the column in the upgrade table,
+   * drop the column from the repo_version table, and
+   * make the column in the upgrade table non-nullable.
+   * This has to be called as part of DML and not DDL since the persistence service has to be started.
+   * @throws AmbariException
+   * @throws SQLException
+   */
+  @Transactional
+  protected void executeStackUpgradeDDLUpdates() throws SQLException, AmbariException {
+    final Configuration.DatabaseType databaseType = configuration.getDatabaseType();
+
+    // Add columns
+    if (!dbAccessor.tableHasColumn(UPGRADE_TABLE, UPGRADE_PACKAGE_COL)) {
+      LOG.info("Adding upgrade_package column to upgrade table.");
+      dbAccessor.addColumn(UPGRADE_TABLE, new DBColumnInfo(UPGRADE_PACKAGE_COL, String.class, 255, null, true));
+    }
+    if (!dbAccessor.tableHasColumn(UPGRADE_TABLE, UPGRADE_TYPE_COL)) {
+      LOG.info("Adding upgrade_type column to upgrade table.");
+      dbAccessor.addColumn(UPGRADE_TABLE, new DBColumnInfo(UPGRADE_TYPE_COL, String.class, 32, null, true));
+    }
+
+    // Populate values in upgrade table.
+    boolean success = populateUpgradeTable();
+
+    if (!success) {
+      throw new AmbariException("Errors found while populating the upgrade table with values for columns upgrade_type and upgrade_package.");
+    }
+
+    if (dbAccessor.tableHasColumn(REPO_VERSION_TABLE, UPGRADE_PACKAGE_COL)) {
+      LOG.info("Dropping upgrade_package column from repo_version table.");
+      dbAccessor.dropColumn(REPO_VERSION_TABLE, UPGRADE_PACKAGE_COL);
+
+      // Now that the values have been populated, make the added
+      // upgrade_package column in the upgrade table non-nullable.
+      LOG.info("Making upgrade_package column in the upgrade table non-nullable.");
+      if (databaseType == Configuration.DatabaseType.DERBY) {
+        // This is a workaround for UpgradeTest.java unit test
+        dbAccessor.executeQuery("ALTER TABLE " + UPGRADE_TABLE + " ALTER column " + UPGRADE_PACKAGE_COL + " NOT NULL");
+      } else {
+        dbAccessor.alterColumn(UPGRADE_TABLE, new DBColumnInfo(UPGRADE_PACKAGE_COL, String.class, 255, null, false));
+      }
+    }
+
+    if (dbAccessor.tableHasColumn(REPO_VERSION_TABLE, UPGRADE_TYPE_COL)) {
+      // Now that the values have been populated, make the added
+      // upgrade_type column in the upgrade table non-nullable.
+      LOG.info("Making upgrade_type column in the upgrade table non-nullable.");
+      if (databaseType == Configuration.DatabaseType.DERBY) {
+        // This is a workaround for UpgradeTest.java unit test
+        dbAccessor.executeQuery("ALTER TABLE " + UPGRADE_TABLE + " ALTER column " + UPGRADE_TYPE_COL + " NOT NULL");
+      } else {
+        dbAccessor.alterColumn(UPGRADE_TABLE, new DBColumnInfo(UPGRADE_TYPE_COL, String.class, 32, null, false));
+      }
+    }
+  }
+
+  /**
+   * Populate the upgrade table with values for the columns upgrade_type and upgrade_package.
+   * The upgrade_type will default to {@code org.apache.ambari.server.state.stack.upgrade.UpgradeType.ROLLING}
+   * whereas the upgrade_package will be calculated.
+   * @return {@code true} on success, and {@code false} otherwise.
+   */
+  private boolean populateUpgradeTable() {
+    boolean success = true;
+    Statement statement = null;
+    ResultSet rs = null;
+    try {
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        // Need to use SQL since the schema is changing and some of the columns have not yet been added.
+        rs = statement.executeQuery("SELECT upgrade_id, cluster_id, from_version, to_version, direction, upgrade_package, upgrade_type FROM upgrade");
+        if (rs != null) {
+          try {
+            while (rs.next()) {
+              final long upgradeId = rs.getLong("upgrade_id");
+              final long clusterId = rs.getLong("cluster_id");
+              final String fromVersion = rs.getString("from_version");
+              final String toVersion = rs.getString("to_version");
+              final Direction direction = Direction.valueOf(rs.getString("direction"));
+              // These two values are likely null.
+              String upgradePackage = rs.getString("upgrade_package");
+              String upgradeType = rs.getString("upgrade_type");
+
+              LOG.info(MessageFormat.format("Populating rows for the upgrade table record with " +
+                  "upgrade_id: {0,number,#}, cluster_id: {1,number,#}, from_version: {2}, to_version: {3}, direction: {4}",
+                upgradeId, clusterId, fromVersion, toVersion, direction));
+
+              // Set all upgrades that have been done so far to type "rolling"
+              if (StringUtils.isEmpty(upgradeType)) {
+                LOG.info("Updating the record's upgrade_type to " + UpgradeType.ROLLING);
+                dbAccessor.executeQuery("UPDATE upgrade SET upgrade_type = '" + UpgradeType.ROLLING + "' WHERE upgrade_id = " + upgradeId);
+              }
+
+              if (StringUtils.isEmpty(upgradePackage)) {
+                String version = null;
+                StackEntity stack = null;
+
+                if (direction == Direction.UPGRADE) {
+                  version = toVersion;
+                } else if (direction == Direction.DOWNGRADE) {
+                  // TODO AMBARI-12698, this is going to be a problem.
+                  // During a downgrade, the "to_version" is overwritten to the source version, but the "from_version"
+                  // doesn't swap. E.g.,
+                  //  upgrade_id | from_version |  to_version  | direction
+                  // ------------+--------------+--------------+----------
+                  //           1 | 2.2.6.0-2800 | 2.3.0.0-2557 | UPGRADE
+                  //           2 | 2.2.6.0-2800 | 2.2.6.0-2800 | DOWNGRADE
+                  version = fromVersion;
+                }
+
+                ClusterEntity cluster = clusterDAO.findById(clusterId);
+
+                if (null != cluster) {
+                  stack = cluster.getDesiredStack();
+                  upgradePackage = calculateUpgradePackage(stack, version);
+                } else {
+                  LOG.error("Could not find a cluster with cluster_id " + clusterId);
+                }
+
+                if (!StringUtils.isEmpty(upgradePackage)) {
+                  LOG.info("Updating the record's upgrade_package to " + upgradePackage);
+                  dbAccessor.executeQuery("UPDATE upgrade SET upgrade_package = '" + upgradePackage + "' WHERE upgrade_id = " + upgradeId);
+                } else {
+                  success = false;
+                  LOG.error("Unable to populate column upgrade_package for record in table upgrade with id " + upgradeId);
+                }
+              }
+            }
+          } catch (Exception e) {
+            success = false;
+            e.printStackTrace();
+            LOG.error("Unable to populate the upgrade_type and upgrade_package columns of the upgrade table. " + e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      success = false;
+      e.printStackTrace();
+      LOG.error("Failed to retrieve records from the upgrade table to populate the upgrade_type and upgrade_package columns. Exception: " + e);
+    } finally {
+      try {
+        if (rs != null) {
+          rs.close();
+        }
+        if (statement != null) {
+          statement.close();
+        }
+      } catch (SQLException e) {
+        ;
+      }
+    }
+    return success;
+  }
+
+  /**
+   * Find the single Repo Version for the given stack and version, and return its upgrade_package column.
+   * Because the upgrade_package column is going to be removed from this entity, must use raw SQL
+   * instead of the entity class.
+   * @param stack Stack
+   * @param version Stack version
+   * @return The value of the upgrade_package column, or null if not found.
+   */
+
+  private String calculateUpgradePackage(StackEntity stack, String version) {
+    String upgradePackage = null;
+    // Find the corresponding repo_version, and extract its upgrade_package
+    if (null != version && null != stack) {
+      RepositoryVersionEntity repoVersion = repositoryVersionDAO.findByStackNameAndVersion(stack.getStackName(), version);
+
+      Statement statement = null;
+      ResultSet rs = null;
+      try {
+        statement = dbAccessor.getConnection().createStatement();
+        if (statement != null) {
+          // Need to use SQL since the schema is changing and the entity will no longer have the upgrade_package column.
+          rs = statement.executeQuery("SELECT upgrade_package FROM repo_version WHERE repo_version_id = " + repoVersion.getId());
+          if (rs != null && rs.next()) {
+            upgradePackage = rs.getString("upgrade_package");
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to retrieve upgrade_package for repo_version record with id " + repoVersion.getId() + ". Exception: " + e.getMessage());
+      } finally {
+        try {
+          if (rs != null) {
+            rs.close();
+          }
+          if (statement != null) {
+            statement.close();
+          }
+        } catch (SQLException e) {
+          ;
+        }
+      }
+    }
+    return upgradePackage;
+  }
+
+  /**
+   * If still on HDP 2.1, then no repo versions exist, so need to bootstrap the HDP 2.1 repo version,
+   * and mark it as CURRENT in the cluster_version table for the cluster, as well as the host_version table
+   * for all hosts.
+   */
+  @Transactional
+  public void bootstrapRepoVersionForHDP21() throws AmbariException, SQLException {
+    final String hardcodedInitialVersion = "2.1.0.0-0001";
+    AmbariManagementController amc = injector.getInstance(AmbariManagementController.class);
+    AmbariMetaInfo ambariMetaInfo = amc.getAmbariMetaInfo();
+    StackDAO stackDAO = injector.getInstance(StackDAO.class);
+    RepositoryVersionHelper repositoryVersionHelper = injector.getInstance(RepositoryVersionHelper.class);
+    RepositoryVersionDAO repositoryVersionDAO = injector.getInstance(RepositoryVersionDAO.class);
+    ClusterVersionDAO clusterVersionDAO = injector.getInstance(ClusterVersionDAO.class);
+    HostVersionDAO hostVersionDAO = injector.getInstance(HostVersionDAO.class);
+
+    Clusters clusters = amc.getClusters();
+    if (clusters == null) {
+      LOG.error("Unable to get Clusters entity.");
+      return;
+    }
+
+    for (Cluster cluster : clusters.getClusters().values()) {
+      ClusterEntity clusterEntity = clusterDAO.findByName(cluster.getClusterName());
+      final StackId stackId = cluster.getCurrentStackVersion();
+      LOG.info(MessageFormat.format("Analyzing cluster {0}, currently at stack {1} and version {2}",
+        cluster.getClusterName(), stackId.getStackName(), stackId.getStackVersion()));
+
+      if (stackId.getStackName().equalsIgnoreCase("HDP") && stackId.getStackVersion().equalsIgnoreCase("2.1")) {
+        final StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
+        StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());
+
+        LOG.info("Bootstrapping the versions since using HDP-2.1");
+
+        // The actual value is not known, so use this.
+        String displayName = stackId.getStackName() + "-" + hardcodedInitialVersion;
+
+        // However, the Repo URLs should be correct.
+        String operatingSystems = repositoryVersionHelper.serializeOperatingSystems(stackInfo.getRepositories());
+
+        // Create the Repo Version if it doesn't already exist.
+        RepositoryVersionEntity repoVersionEntity = repositoryVersionDAO.findByDisplayName(displayName);
+        if (null != repoVersionEntity) {
+          LOG.info(MessageFormat.format("A Repo Version already exists with Display Name: {0}", displayName));
+        } else {
+          final long repoVersionIdSeq = repositoryVersionDAO.findMaxId("id");
+          // Safe to attempt to add the sequence if it doesn't exist already.
+          addSequence("repo_version_id_seq", repoVersionIdSeq, false);
+
+          repoVersionEntity = repositoryVersionDAO.create(
+            stackEntity, hardcodedInitialVersion, displayName, operatingSystems);
+          LOG.info(MessageFormat.format("Created Repo Version with ID: {0,number,#}\n, Display Name: {1}, Repo URLs: {2}\n",
+            repoVersionEntity.getId(), displayName, operatingSystems));
+        }
+
+        // Create the Cluster Version if it doesn't already exist.
+        ClusterVersionEntity clusterVersionEntity = clusterVersionDAO.findByClusterAndStackAndVersion(cluster.getClusterName(),
+          stackId, hardcodedInitialVersion);
+
+        if (null != clusterVersionEntity) {
+          LOG.info(MessageFormat.format("A Cluster Version version for cluster: {0}, version: {1}, already exists; its state is {2}.",
+            cluster.getClusterName(), clusterVersionEntity.getRepositoryVersion().getVersion(), clusterVersionEntity.getState()));
+
+          // If there are no CURRENT cluster versions, make this one the CURRENT one.
+          if (clusterVersionEntity.getState() != RepositoryVersionState.CURRENT &&
+            clusterVersionDAO.findByClusterAndState(cluster.getClusterName(), RepositoryVersionState.CURRENT).isEmpty()) {
+            clusterVersionEntity.setState(RepositoryVersionState.CURRENT);
+            clusterVersionDAO.merge(clusterVersionEntity);
+          }
+        } else {
+          final long clusterVersionIdSeq = clusterVersionDAO.findMaxId("id");
+          // Safe to attempt to add the sequence if it doesn't exist already.
+          addSequence("cluster_version_id_seq", clusterVersionIdSeq, false);
+
+          clusterVersionEntity = clusterVersionDAO.create(clusterEntity, repoVersionEntity, RepositoryVersionState.CURRENT,
+            System.currentTimeMillis(), System.currentTimeMillis(), "admin");
+          LOG.info(MessageFormat.format("Created Cluster Version with ID: {0,number,#}, cluster: {1}, version: {2}, state: {3}.",
+            clusterVersionEntity.getId(), cluster.getClusterName(), clusterVersionEntity.getRepositoryVersion().getVersion(),
+            clusterVersionEntity.getState()));
+        }
+
+        // Create the Host Versions if they don't already exist.
+        Collection<HostEntity> hosts = clusterEntity.getHostEntities();
+        boolean addedAtLeastOneHost = false;
+        if (null != hosts && !hosts.isEmpty()) {
+          for (HostEntity hostEntity : hosts) {
+            HostVersionEntity hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(cluster.getClusterName(),
+              stackId, hardcodedInitialVersion, hostEntity.getHostName());
+
+            if (null != hostVersionEntity) {
+              LOG.info(MessageFormat.format("A Host Version version for cluster: {0}, version: {1}, host: {2}, already exists; its state is {3}.",
+                cluster.getClusterName(), hostVersionEntity.getRepositoryVersion().getVersion(),
+                hostEntity.getHostName(), hostVersionEntity.getState()));
+
+              if (hostVersionEntity.getState() != RepositoryVersionState.CURRENT &&
+                hostVersionDAO.findByClusterHostAndState(cluster.getClusterName(), hostEntity.getHostName(),
+                  RepositoryVersionState.CURRENT).isEmpty()) {
+                hostVersionEntity.setState(RepositoryVersionState.CURRENT);
+                hostVersionDAO.merge(hostVersionEntity);
+              }
+            } else {
+              // This should only be done the first time.
+              if (!addedAtLeastOneHost) {
+                final long hostVersionIdSeq = hostVersionDAO.findMaxId("id");
+                // Safe to attempt to add the sequence if it doesn't exist already.
+                addSequence("host_version_id_seq", hostVersionIdSeq, false);
+                addedAtLeastOneHost = true;
+              }
+
+              hostVersionEntity = new HostVersionEntity(hostEntity, repoVersionEntity, RepositoryVersionState.CURRENT);
+              hostVersionDAO.create(hostVersionEntity);
+              LOG.info(MessageFormat.format("Created Host Version with ID: {0,number,#}, cluster: {1}, version: {2}, host: {3}, state: {4}.",
+                hostVersionEntity.getId(), cluster.getClusterName(), hostVersionEntity.getRepositoryVersion().getVersion(),
+                hostEntity.getHostName(), hostVersionEntity.getState()));
+            }
+          }
+        } else {
+          LOG.info(MessageFormat.format("Not inserting any Host Version records since cluster {0} does not have any hosts.",
+            cluster.getClusterName()));
+        }
+      }
+    }
+  }
+
+  /**
    * Adds the following columns to the {@value #UPGRADE_TABLE} table:
    * <ul>
    * <li>{@value #DOWNGRADE_ALLOWED_COLUMN}</li>
@@ -234,7 +606,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
     for (final Cluster cluster : clusterMap.values()) {
       final AlertDefinitionEntity alertDefinitionEntity = alertDefinitionDAO.findByName(
-          cluster.getClusterId(), "journalnode_process");
+        cluster.getClusterId(), "journalnode_process");
 
       if (alertDefinitionEntity != null) {
         String source = alertDefinitionEntity.getSource();
@@ -275,16 +647,16 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
     rootJson.getAsJsonObject("reporting").getAsJsonObject("ok").remove("text");
     rootJson.getAsJsonObject("reporting").getAsJsonObject("ok").addProperty(
-            "text", "HTTP {0} response in {2:.3f}s");
+      "text", "HTTP {0} response in {2:.3f}s");
 
     rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").remove("text");
     rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").addProperty(
-            "text", "HTTP {0} response from {1} in {2:.3f}s ({3})");
+      "text", "HTTP {0} response from {1} in {2:.3f}s ({3})");
     rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").remove("value");
 
     rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").remove("text");
     rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").addProperty("text",
-            "Connection failed to {1} ({3})");
+      "Connection failed to {1} ({3})");
     rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").remove("value");
 
     return rootJson.toString();
@@ -308,7 +680,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
   protected void updateHDFSConfigs() throws AmbariException {
     AmbariManagementController ambariManagementController = injector.getInstance(
-            AmbariManagementController.class);
+      AmbariManagementController.class);
     Map<String, Cluster> clusterMap = getCheckedClusterMap(ambariManagementController.getClusters());
 
     for (final Cluster cluster : clusterMap.values()) {
@@ -357,7 +729,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     for (final Cluster cluster : getCheckedClusterMap(ambariManagementController.getClusters()).values()) {
       StackId stackId = cluster.getCurrentStackVersion();
       if (stackId != null && stackId.getStackName().equals("HDP") &&
-               VersionUtils.compareVersions(stackId.getStackVersion(), "2.2") >= 0) {
+        VersionUtils.compareVersions(stackId.getStackVersion(), "2.2") >= 0) {
         Config hbaseEnvConfig = cluster.getDesiredConfigByType(HBASE_ENV_CONFIG);
         if (hbaseEnvConfig != null) {
           String content = hbaseEnvConfig.getProperties().get(CONTENT_PROPERTY);

http://git-wip-us.apache.org/repos/asf/ambari/blob/d30b5f03/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index 69e1287..3918ec6 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.upgrade;
 
+import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMockBuilder;
@@ -32,6 +33,10 @@ import static org.easymock.EasyMock.verify;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,16 +50,29 @@ import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
 import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.dao.HostVersionDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
 import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostVersionEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.stack.StackManagerFactory;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.ambari.server.state.stack.upgrade.RepositoryVersionHelper;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -71,6 +89,7 @@ import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.persist.PersistService;
+import java.lang.reflect.Field;
 
 /**
  * {@link org.apache.ambari.server.upgrade.UpgradeCatalog213} unit tests.
@@ -81,6 +100,13 @@ public class UpgradeCatalog213Test {
   private EntityManager entityManager = createNiceMock(EntityManager.class);
   private UpgradeCatalogHelper upgradeCatalogHelper;
   private StackEntity desiredStackEntity;
+  private AmbariManagementController amc = createNiceMock(AmbariManagementController.class);
+  private AmbariMetaInfo metaInfo = createNiceMock(AmbariMetaInfo.class);
+  private StackDAO stackDAO = createNiceMock(StackDAO.class);
+  private RepositoryVersionDAO repositoryVersionDAO = createNiceMock(RepositoryVersionDAO.class);
+  private ClusterVersionDAO clusterVersionDAO = createNiceMock(ClusterVersionDAO.class);
+  private HostVersionDAO hostVersionDAO = createNiceMock(HostVersionDAO.class);
+  private ClusterDAO clusterDAO = createNiceMock(ClusterDAO.class);
 
   private IMocksControl mocksControl = EasyMock.createControl();
 
@@ -118,6 +144,7 @@ public class UpgradeCatalog213Test {
       public void configure(Binder binder) {
         binder.bind(DBAccessor.class).toInstance(dbAccessor);
         binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        binder.bind(EntityManager.class).toInstance(entityManager);
       }
     };
 
@@ -129,10 +156,46 @@ public class UpgradeCatalog213Test {
 
   @Test
   public void testExecuteDMLUpdates() throws Exception {
+    // TODO AMBARI-13001, re-add unit test section.
+    /*
+    final DBAccessor dbAccessor = createNiceMock(DBAccessor.class);
+    Configuration configuration = createNiceMock(Configuration.class);
+    Connection connection = createNiceMock(Connection.class);
+    Statement statement = createNiceMock(Statement.class);
+    ResultSet resultSet = createNiceMock(ResultSet.class);
+    expect(configuration.getDatabaseUrl()).andReturn(Configuration.JDBC_IN_MEMORY_URL).anyTimes();
+    dbAccessor.getConnection();
+    expectLastCall().andReturn(connection).anyTimes();
+    connection.createStatement();
+    expectLastCall().andReturn(statement).anyTimes();
+    statement.executeQuery(anyObject(String.class));
+    expectLastCall().andReturn(resultSet).anyTimes();
+
+    // Technically, this is a DDL, but it has to be run during the DML portion
+    // because it requires the persistence layer to be started.
+    UpgradeSectionDDL upgradeSectionDDL = new UpgradeSectionDDL();
+
+    // Execute any DDL schema changes
+    upgradeSectionDDL.execute(dbAccessor);
+
+    // Begin DML verifications
+    verifyBootstrapHDP21();
+
+    // Replay main sections
+    replay(dbAccessor, configuration, resultSet, connection, statement);
+
+
+    AbstractUpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor);
+    Class<?> c = AbstractUpgradeCatalog.class;
+    Field f = c.getDeclaredField("configuration");
+    f.setAccessible(true);
+    f.set(upgradeCatalog, configuration);
+    */
+
+    Method updateStormConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs");
     Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
     Method updateHDFSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateHDFSConfigs");
     Method updateKafkaConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateKafkaConfigs");
-    Method updateStormConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs");
     Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
     Method updateHbaseEnvConfig = UpgradeCatalog213.class.getDeclaredMethod("updateHbaseEnvConfig");
     Method updateZookeeperLog4j = UpgradeCatalog213.class.getDeclaredMethod("updateZookeeperLog4j");
@@ -140,16 +203,16 @@ public class UpgradeCatalog213Test {
     Method updateAlertDefinitions = UpgradeCatalog213.class.getDeclaredMethod("updateAlertDefinitions");
 
     UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
-        .addMockedMethod(updateAMSConfigs)
-        .addMockedMethod(updateHDFSConfigs)
-        .addMockedMethod(updateStormConfigs)
-        .addMockedMethod(addNewConfigurationsFromXml)
-        .addMockedMethod(updateHbaseEnvConfig)
-        .addMockedMethod(updateAlertDefinitions)
-        .addMockedMethod(updateKafkaConfigs)
-        .addMockedMethod(updateZookeeperLog4j)
-        .addMockedMethod(updateHadoopEnvConfig)
-        .createMock();
+      .addMockedMethod(updateAMSConfigs)
+      .addMockedMethod(updateHDFSConfigs)
+      .addMockedMethod(updateStormConfigs)
+      .addMockedMethod(addNewConfigurationsFromXml)
+      .addMockedMethod(updateHbaseEnvConfig)
+      .addMockedMethod(updateAlertDefinitions)
+      .addMockedMethod(updateKafkaConfigs)
+      .addMockedMethod(updateZookeeperLog4j)
+      .addMockedMethod(updateHadoopEnvConfig)
+      .createMock();
 
     upgradeCatalog213.updateHbaseEnvConfig();
     expectLastCall().once();
@@ -177,11 +240,98 @@ public class UpgradeCatalog213Test {
     verify(upgradeCatalog213);
   }
 
+  /**
+   * Records and replays the mock expectations for bootstrapping HDP 2.1 against the stack, repository
+   * version, cluster version and host version DAOs; it does not call {@code verify} itself.
+   * @throws AmbariException declared alongside {@code Exception}; presumably raised by the mocked calls
+   */
+  private void verifyBootstrapHDP21() throws Exception, AmbariException {
+    final String stackName = "HDP";
+    final String stackVersion = "2.1";
+    final String stackNameAndVersion = stackName + "-" + stackVersion;
+    final String buildNumber = "2.1.0.0-0001";
+    final String stackAndBuild = stackName + "-" + buildNumber;
+    final String clusterName = "c1";
+
+    expect(amc.getAmbariMetaInfo()).andReturn(metaInfo);
+
+    // The controller hands out a Clusters mock that holds the single cluster "c1"
+    Clusters clusters = createNiceMock(Clusters.class);
+    expect(amc.getClusters()).andReturn(clusters);
+
+    Map<String, Cluster> clusterHashMap = new HashMap<String, Cluster>();
+    Cluster cluster = createNiceMock(Cluster.class);
+    clusterHashMap.put(clusterName, cluster);
+    expect(clusters.getClusters()).andReturn(clusterHashMap);
+
+    StackId stackId = new StackId(stackNameAndVersion);
+    expect(cluster.getCurrentStackVersion()).andReturn(stackId);
+
+    StackInfo stackInfo = new StackInfo();
+    stackInfo.setVersion(buildNumber);
+    expect(metaInfo.getStack(stackName, stackVersion)).andReturn(stackInfo);
+
+    StackEntity stackEntity = createNiceMock(StackEntity.class);
+    expect(stackEntity.getStackName()).andReturn(stackName);
+    expect(stackEntity.getStackVersion()).andReturn(stackVersion);
+
+    expect(stackDAO.find(stackName, stackVersion)).andReturn(stackEntity);
+
+    replay(amc, metaInfo, clusters, cluster, stackEntity, stackDAO);
+
+    // The remaining mocks are recorded and replayed one section at a time
+    // Repository Version: nothing found by display name, so create(...) returns the mocked entity
+    RepositoryVersionEntity repositoryVersionEntity = createNiceMock(RepositoryVersionEntity.class);
+    expect(repositoryVersionDAO.findByDisplayName(stackAndBuild)).andReturn(null);
+    expect(repositoryVersionDAO.findMaxId("id")).andReturn(0L);
+    expect(repositoryVersionDAO.findAll()).andReturn(Collections.<RepositoryVersionEntity>emptyList());
+    expect(repositoryVersionDAO.create(anyObject(StackEntity.class), anyObject(String.class), anyObject(String.class), anyObject(String.class))).andReturn(repositoryVersionEntity);
+    expect(repositoryVersionEntity.getId()).andReturn(1L);
+    expect(repositoryVersionEntity.getVersion()).andReturn(buildNumber);
+    replay(repositoryVersionDAO, repositoryVersionEntity);
+
+    // Cluster Version: the lookup returns null and create(...) returns a mock in the CURRENT state
+    ClusterVersionEntity clusterVersionEntity = createNiceMock(ClusterVersionEntity.class);
+    expect(clusterVersionEntity.getId()).andReturn(1L);
+    expect(clusterVersionEntity.getState()).andReturn(RepositoryVersionState.CURRENT);
+    expect(clusterVersionEntity.getRepositoryVersion()).andReturn(repositoryVersionEntity);
+
+    expect(clusterVersionDAO.findByClusterAndStackAndVersion(anyObject(String.class), anyObject(StackId.class), anyObject(String.class))).andReturn(null);
+    expect(clusterVersionDAO.findMaxId("id")).andReturn(0L);
+    expect(clusterVersionDAO.findAll()).andReturn(Collections.<ClusterVersionEntity>emptyList());
+    expect(clusterVersionDAO.create(anyObject(ClusterEntity.class), anyObject(RepositoryVersionEntity.class), anyObject(RepositoryVersionState.class), anyLong(), anyLong(), anyObject(String.class))).andReturn(clusterVersionEntity);
+    replay(clusterVersionDAO, clusterVersionEntity);
+
+    // Host Version: the cluster entity exposes two hosts and the host version lookup returns null
+    ClusterEntity clusterEntity = createNiceMock(ClusterEntity.class);
+    expect(clusterEntity.getClusterName()).andReturn(clusterName).anyTimes();
+    expect(clusterDAO.findByName(anyObject(String.class))).andReturn(clusterEntity);
+
+    Collection<HostEntity> hostEntities = new ArrayList<HostEntity>();
+    HostEntity hostEntity1 = createNiceMock(HostEntity.class);
+    HostEntity hostEntity2 = createNiceMock(HostEntity.class);
+    expect(hostEntity1.getHostName()).andReturn("host1");
+    expect(hostEntity2.getHostName()).andReturn("host2");
+    hostEntities.add(hostEntity1);
+    hostEntities.add(hostEntity2);
+    expect(clusterEntity.getHostEntities()).andReturn(hostEntities);
+
+    expect(hostVersionDAO.findByClusterStackVersionAndHost(anyObject(String.class), anyObject(StackId.class), anyObject(String.class), anyObject(String.class))).andReturn(null);
+    expect(hostVersionDAO.findMaxId("id")).andReturn(0L);
+    expect(hostVersionDAO.findAll()).andReturn(Collections.<HostVersionEntity>emptyList());
+
+    replay(clusterEntity, clusterDAO, hostVersionDAO, hostEntity1, hostEntity2);
+  }
+
   @Test
   public void testExecuteUpgradePreDMLUpdates() throws Exception {
     Method executeStackPreDMLUpdates = UpgradeCatalog213.class.getDeclaredMethod("executeUpgradePreDMLUpdates");
+    Method executeStackUpgradeDDLUpdates = UpgradeCatalog213.class.getDeclaredMethod("executeStackUpgradeDDLUpdates");
+    Method bootstrapRepoVersionForHDP21 = UpgradeCatalog213.class.getDeclaredMethod("bootstrapRepoVersionForHDP21");
 
     final UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
+      .addMockedMethod(executeStackUpgradeDDLUpdates)
+      .addMockedMethod(bootstrapRepoVersionForHDP21)
       .addMockedMethod(executeStackPreDMLUpdates).createMock();
 
     final Injector mockInjector = Guice.createInjector(new AbstractModule() {
@@ -190,12 +340,19 @@ public class UpgradeCatalog213Test {
         bind(UpgradeCatalog213.class).toInstance(upgradeCatalog213);
         bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
         bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        bind(EntityManager.class).toInstance(entityManager);
       }
     });
 
     upgradeCatalog213.executeUpgradePreDMLUpdates();
     expectLastCall().once();
 
+    upgradeCatalog213.executeStackUpgradeDDLUpdates();
+    expectLastCall().once();
+
+    upgradeCatalog213.bootstrapRepoVersionForHDP21();
+    expectLastCall().once();
+
     replay(upgradeCatalog213);
     mockInjector.getInstance(UpgradeCatalog213.class).executePreDMLUpdates();
 
@@ -225,6 +382,7 @@ public class UpgradeCatalog213Test {
         bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
         bind(ConfigHelper.class).toInstance(mockConfigHelper);
         bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
 
         bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
         bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
@@ -264,6 +422,7 @@ public class UpgradeCatalog213Test {
       protected void configure() {
         bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
         bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
 
         bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
         bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
@@ -307,6 +466,7 @@ public class UpgradeCatalog213Test {
         bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
         bind(ConfigHelper.class).toInstance(mockConfigHelper);
         bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
 
         bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
         bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
@@ -332,29 +492,29 @@ public class UpgradeCatalog213Test {
     Method updateAmsHbaseEnvContent = UpgradeCatalog213.class.getDeclaredMethod("updateAmsHbaseEnvContent", String.class);
     UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);
     String oldContent = "export HBASE_CLASSPATH=${HBASE_CLASSPATH}\n" +
-        "\n" +
-        "# The maximum amount of heap to use, in MB. Default is 1000.\n" +
-        "export HBASE_HEAPSIZE={{hbase_heapsize}}\n" +
-        "\n" +
-        "{% if java_version &lt; 8 %}\n" +
-        "export HBASE_MASTER_OPTS=\" -XX:PermSize=64m -XX:MaxPermSize={{hbase_master_maxperm_size}} -Xms{{hbase_heapsize}} -Xmx{{hbase_heapsize}} -Xmn{{hbase_master_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
-        "export HBASE_REGIONSERVER_OPTS=\"-XX:MaxPermSize=128m -Xmn{{regionserver_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}} -Xmx{{regionserver_heapsize}}\"\n" +
-        "{% else %}\n" +
-        "export HBASE_MASTER_OPTS=\" -Xms{{hbase_heapsize}} -Xmx{{hbase_heapsize}} -Xmn{{hbase_master_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
-        "export HBASE_REGIONSERVER_OPTS=\" -Xmn{{regionserver_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}} -Xmx{{regionserver_heapsize}}\"\n" +
-        "{% endif %}\n";
+      "\n" +
+      "# The maximum amount of heap to use, in MB. Default is 1000.\n" +
+      "export HBASE_HEAPSIZE={{hbase_heapsize}}\n" +
+      "\n" +
+      "{% if java_version &lt; 8 %}\n" +
+      "export HBASE_MASTER_OPTS=\" -XX:PermSize=64m -XX:MaxPermSize={{hbase_master_maxperm_size}} -Xms{{hbase_heapsize}} -Xmx{{hbase_heapsize}} -Xmn{{hbase_master_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
+      "export HBASE_REGIONSERVER_OPTS=\"-XX:MaxPermSize=128m -Xmn{{regionserver_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}} -Xmx{{regionserver_heapsize}}\"\n" +
+      "{% else %}\n" +
+      "export HBASE_MASTER_OPTS=\" -Xms{{hbase_heapsize}} -Xmx{{hbase_heapsize}} -Xmn{{hbase_master_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
+      "export HBASE_REGIONSERVER_OPTS=\" -Xmn{{regionserver_xmn_size}} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}} -Xmx{{regionserver_heapsize}}\"\n" +
+      "{% endif %}\n";
     String expectedContent = "export HBASE_CLASSPATH=${HBASE_CLASSPATH}\n" +
-        "\n" +
-        "# The maximum amount of heap to use, in MB. Default is 1000.\n" +
-        "export HBASE_HEAPSIZE={{hbase_heapsize}}m\n" +
-        "\n" +
-        "{% if java_version &lt; 8 %}\n" +
-        "export HBASE_MASTER_OPTS=\" -XX:PermSize=64m -XX:MaxPermSize={{hbase_master_maxperm_size}}m -Xms{{hbase_heapsize}}m -Xmx{{hbase_heapsize}}m -Xmn{{hbase_master_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
-        "export HBASE_REGIONSERVER_OPTS=\"-XX:MaxPermSize=128m -Xmn{{regionserver_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}}m -Xmx{{regionserver_heapsize}}m\"\n" +
-        "{% else %}\n" +
-        "export HBASE_MASTER_OPTS=\" -Xms{{hbase_heapsize}}m -Xmx{{hbase_heapsize}}m -Xmn{{hbase_master_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
-        "export HBASE_REGIONSERVER_OPTS=\" -Xmn{{regionserver_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}}m -Xmx{{regionserver_heapsize}}m\"\n" +
-        "{% endif %}\n";
+      "\n" +
+      "# The maximum amount of heap to use, in MB. Default is 1000.\n" +
+      "export HBASE_HEAPSIZE={{hbase_heapsize}}m\n" +
+      "\n" +
+      "{% if java_version &lt; 8 %}\n" +
+      "export HBASE_MASTER_OPTS=\" -XX:PermSize=64m -XX:MaxPermSize={{hbase_master_maxperm_size}}m -Xms{{hbase_heapsize}}m -Xmx{{hbase_heapsize}}m -Xmn{{hbase_master_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
+      "export HBASE_REGIONSERVER_OPTS=\"-XX:MaxPermSize=128m -Xmn{{regionserver_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}}m -Xmx{{regionserver_heapsize}}m\"\n" +
+      "{% else %}\n" +
+      "export HBASE_MASTER_OPTS=\" -Xms{{hbase_heapsize}}m -Xmx{{hbase_heapsize}}m -Xmn{{hbase_master_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly\"\n" +
+      "export HBASE_REGIONSERVER_OPTS=\" -Xmn{{regionserver_xmn_size}}m -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms{{regionserver_heapsize}}m -Xmx{{regionserver_heapsize}}m\"\n" +
+      "{% endif %}\n";
     String result = (String) updateAmsHbaseEnvContent.invoke(upgradeCatalog213, oldContent);
     Assert.assertEquals(expectedContent, result);
   }
@@ -364,9 +524,9 @@ public class UpgradeCatalog213Test {
     Method updateAmsEnvContent = UpgradeCatalog213.class.getDeclaredMethod("updateAmsEnvContent", String.class);
     UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);
     String oldContent = "# AMS Collector heapsize\n" +
-        "export AMS_COLLECTOR_HEAPSIZE={{metrics_collector_heapsize}}\n";
+      "export AMS_COLLECTOR_HEAPSIZE={{metrics_collector_heapsize}}\n";
     String expectedContent = "# AMS Collector heapsize\n" +
-        "export AMS_COLLECTOR_HEAPSIZE={{metrics_collector_heapsize}}m\n";
+      "export AMS_COLLECTOR_HEAPSIZE={{metrics_collector_heapsize}}m\n";
     String result = (String) updateAmsEnvContent.invoke(upgradeCatalog213, oldContent);
     Assert.assertEquals(expectedContent, result);
   }
@@ -403,6 +563,7 @@ public class UpgradeCatalog213Test {
         bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
         bind(ConfigHelper.class).toInstance(mockConfigHelper);
         bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
 
         bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
         bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
@@ -428,18 +589,18 @@ public class UpgradeCatalog213Test {
   public void testModifyJournalnodeProcessAlertSource() throws Exception {
     UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);
     String alertSource = "{\"uri\":\"{{hdfs-site/dfs.journalnode.http-address}}\",\"default_port\":8480," +
-        "\"type\":\"PORT\",\"reporting\":{\"ok\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\"}," +
-        "\"warning\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\",\"value\":1.5}," +
-        "\"critical\":{\"text\":\"Connection failed: {0} to {1}:{2}\",\"value\":5.0}}}";
+      "\"type\":\"PORT\",\"reporting\":{\"ok\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\"}," +
+      "\"warning\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\",\"value\":1.5}," +
+      "\"critical\":{\"text\":\"Connection failed: {0} to {1}:{2}\",\"value\":5.0}}}";
     String expected = "{\"reporting\":{\"ok\":{\"text\":\"HTTP {0} response in {2:.3f}s\"}," +
-        "\"warning\":{\"text\":\"HTTP {0} response from {1} in {2:.3f}s ({3})\"}," +
-        "\"critical\":{\"text\":\"Connection failed to {1} ({3})\"}},\"type\":\"WEB\"," +
-        "\"uri\":{\"http\":\"{{hdfs-site/dfs.journalnode.http-address}}\"," +
-        "\"https\":\"{{hdfs-site/dfs.journalnode.https-address}}\"," +
-        "\"kerberos_keytab\":\"{{hdfs-site/dfs.web.authentication.kerberos.keytab}}\"," +
-        "\"kerberos_principal\":\"{{hdfs-site/dfs.web.authentication.kerberos.principal}}\"," +
-        "\"https_property\":\"{{hdfs-site/dfs.http.policy}}\"," +
-        "\"https_property_value\":\"HTTPS_ONLY\",\"connection_timeout\":5.0}}";
+      "\"warning\":{\"text\":\"HTTP {0} response from {1} in {2:.3f}s ({3})\"}," +
+      "\"critical\":{\"text\":\"Connection failed to {1} ({3})\"}},\"type\":\"WEB\"," +
+      "\"uri\":{\"http\":\"{{hdfs-site/dfs.journalnode.http-address}}\"," +
+      "\"https\":\"{{hdfs-site/dfs.journalnode.https-address}}\"," +
+      "\"kerberos_keytab\":\"{{hdfs-site/dfs.web.authentication.kerberos.keytab}}\"," +
+      "\"kerberos_principal\":\"{{hdfs-site/dfs.web.authentication.kerberos.principal}}\"," +
+      "\"https_property\":\"{{hdfs-site/dfs.http.policy}}\"," +
+      "\"https_property_value\":\"HTTPS_ONLY\",\"connection_timeout\":5.0}}";
     Assert.assertEquals(expected, upgradeCatalog213.modifyJournalnodeProcessAlertSource(alertSource));
   }
 
@@ -471,6 +632,7 @@ public class UpgradeCatalog213Test {
         bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
         bind(ConfigHelper.class).toInstance(mockConfigHelper);
         bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
 
         bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
         bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
@@ -502,6 +664,17 @@ public class UpgradeCatalog213Test {
         binder.bind(DBAccessor.class).toInstance(dbAccessor);
         binder.bind(EntityManager.class).toInstance(entityManager);
         binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        binder.bind(DaoUtils.class).toInstance(createNiceMock(DaoUtils.class));
+        binder.bind(ClusterDAO.class).toInstance(clusterDAO);
+        binder.bind(RepositoryVersionHelper.class).toInstance(createNiceMock(RepositoryVersionHelper.class));
+        binder.bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
+        binder.bind(AmbariManagementController.class).toInstance(amc);
+        binder.bind(AmbariMetaInfo.class).toInstance(metaInfo);
+        binder.bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class));
+        binder.bind(StackDAO.class).toInstance(stackDAO);
+        binder.bind(RepositoryVersionDAO.class).toInstance(repositoryVersionDAO);
+        binder.bind(ClusterVersionDAO.class).toInstance(clusterVersionDAO);
+        binder.bind(HostVersionDAO.class).toInstance(hostVersionDAO);
       }
     };
 
@@ -564,6 +737,43 @@ public class UpgradeCatalog213Test {
     Assert.assertEquals("2.1.3", upgradeCatalog.getTargetVersion());
   }
 
+  // *********** Inner Classes that represent sections of the DDL ***********
+  // ************************************************************************
+
+  /**
+   * Records the two {@code addColumn} calls expected on the "upgrade" table and checks the captured columns.
+   */
+  class UpgradeSectionDDL implements SectionDDL {
+
+    Capture<DBAccessor.DBColumnInfo> upgradeTablePackageNameColumnCapture = EasyMock.newCapture();
+    Capture<DBAccessor.DBColumnInfo> upgradeTableUpgradeTypeColumnCapture = EasyMock.newCapture();
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void execute(DBAccessor dbAccessor) throws SQLException {
+      // Record one addColumn expectation per new column, capturing each column definition
+      dbAccessor.addColumn(eq("upgrade"), capture(upgradeTablePackageNameColumnCapture));
+      dbAccessor.addColumn(eq("upgrade"), capture(upgradeTableUpgradeTypeColumnCapture));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void verify(DBAccessor dbAccessor) throws SQLException {
+      // Both captured columns must be String-typed and carry the expected names
+      DBAccessor.DBColumnInfo packageNameCol = upgradeTablePackageNameColumnCapture.getValue();
+      Assert.assertEquals(String.class, packageNameCol.getType());
+      Assert.assertEquals("upgrade_package", packageNameCol.getName());
+
+      DBAccessor.DBColumnInfo upgradeTypeCol = upgradeTableUpgradeTypeColumnCapture.getValue();
+      Assert.assertEquals(String.class, upgradeTypeCol.getType());
+      Assert.assertEquals("upgrade_type", upgradeTypeCol.getName());
+    }
+  }
+
   @Test
   public void testShouldDDLsBeExecutedOnUpgrade() throws Exception {
     // GIVEN
@@ -577,6 +787,7 @@ public class UpgradeCatalog213Test {
     Capture<String> capturedPKColumn = EasyMock.newCapture();
     Capture<List<DBAccessor.DBColumnInfo>> capturedColumns = EasyMock.newCapture();
     Capture<DBAccessor.DBColumnInfo> capturedColumn = EasyMock.newCapture();
+    Capture<DBAccessor.DBColumnInfo> capturedHostRoleCommandColumn = EasyMock.newCapture();
 
     EasyMock.expect(mockedInjector.getInstance(DaoUtils.class)).andReturn(mockedDaoUtils);
     mockedInjector.injectMembers(anyObject(UpgradeCatalog.class));
@@ -595,6 +806,7 @@ public class UpgradeCatalog213Test {
 
     // addKerberosDescriptorTable
     mockedDbAccessor.createTable(capture(capturedTableName), capture(capturedColumns), capture(capturedPKColumn));
+    mockedDbAccessor.alterColumn(eq("host_role_command"), capture(capturedHostRoleCommandColumn));
 
     mocksControl.replay();