You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by vb...@apache.org on 2017/04/12 13:31:11 UTC

ambari git commit: AMBARI-20687. Perf: Refactor ambari db-cleanup to include all big tables.(vbrodetskyi)

Repository: ambari
Updated Branches:
  refs/heads/trunk 273653b5a -> 11ab63f7f


AMBARI-20687. Perf: Refactor ambari db-cleanup to include all big tables.(vbrodetskyi)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11ab63f7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11ab63f7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11ab63f7

Branch: refs/heads/trunk
Commit: 11ab63f7ffa99a7d381b86282846fa7c80ddc88e
Parents: 273653b
Author: Vitaly Brodetskyi <vb...@hortonworks.com>
Authored: Wed Apr 12 16:30:34 2017 +0300
Committer: Vitaly Brodetskyi <vb...@hortonworks.com>
Committed: Wed Apr 12 16:30:34 2017 +0300

----------------------------------------------------------------------
 .../checks/DatabaseConsistencyCheckHelper.java  | 117 +++++++++
 .../apache/ambari/server/orm/DBAccessor.java    |   6 +
 .../ambari/server/orm/DBAccessorImpl.java       |   5 +
 .../server/orm/dao/HostRoleCommandDAO.java      |  16 ++
 .../ambari/server/orm/dao/RequestDAO.java       | 260 ++++++++++++++++++-
 .../server/orm/dao/TopologyHostTaskDAO.java     |  11 +
 .../orm/dao/TopologyLogicalRequestDAO.java      |  12 +
 .../server/orm/dao/TopologyLogicalTaskDAO.java  |  12 +
 .../orm/entities/ExecutionCommandEntity.java    |   5 +
 .../orm/entities/HostRoleCommandEntity.java     |  10 +-
 .../server/orm/entities/RequestEntity.java      |   6 +
 .../entities/RequestOperationLevelEntity.java   |   4 +-
 .../entities/RequestResourceFilterEntity.java   |   5 +
 .../orm/entities/RoleSuccessCriteriaEntity.java |   5 +
 .../ambari/server/orm/entities/StageEntity.java |   6 +-
 .../orm/entities/TopologyHostRequestEntity.java |   5 +
 .../orm/entities/TopologyHostTaskEntity.java    |  15 +-
 .../entities/TopologyLogicalRequestEntity.java  |   5 +
 .../orm/entities/TopologyLogicalTaskEntity.java |  30 ++-
 .../server/orm/entities/UpgradeEntity.java      |   2 +
 .../server/orm/entities/UpgradeItemEntity.java  |   5 +
 ambari-server/src/main/python/ambari-server.py  |  10 +-
 .../src/main/python/ambari_server/dbCleanup.py  |  37 +--
 .../DatabaseConsistencyCheckHelperTest.java     |  66 +++++
 24 files changed, 620 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index e7e9433..b2a03e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -46,9 +47,12 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.MetainfoDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
 import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
 import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
 import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
@@ -81,6 +85,9 @@ public class DatabaseConsistencyCheckHelper {
   private static Connection connection;
   private static AmbariMetaInfo ambariMetaInfo;
   private static DBAccessor dbAccessor;
+  private static HostRoleCommandDAO hostRoleCommandDAO;
+  private static ExecutionCommandDAO executionCommandDAO;
+  private static StageDAO stageDAO;
 
   private static DatabaseConsistencyCheckResult checkResult = DatabaseConsistencyCheckResult.DB_CHECK_SUCCESS;
 
@@ -174,6 +181,7 @@ public class DatabaseConsistencyCheckHelper {
       checkHostComponentStates();
       checkServiceConfigs();
       checkTopologyTables();
+      checkForLargeTables();
       LOG.info("******************************* Check database completed *******************************");
       return checkResult;
     }
@@ -223,6 +231,115 @@ public class DatabaseConsistencyCheckHelper {
   }
 
   /**
+   * Checks whether any of the monitored Ambari database tables has grown beyond the size limit.
+   * For database types with a registered size query (PostgreSQL, MySQL, Oracle) the physical table
+   * size is read first. If the size cannot be determined (no query registered for the database type,
+   * the query fails, or it returns no rows), the table's row count is compared with the row count
+   * limit instead. A table that is over a limit is reported through {@code warning(...)}; a value
+   * at or below the limit is logged as being within normal limits.
+   */
+  static void checkForLargeTables() {
+    LOG.info("Checking for tables with large physical size");
+
+    ensureConnection();
+
+    DBAccessor.DbType dbType = dbAccessor.getDbType();
+    String schemaName = dbAccessor.getDbSchema();
+
+    String GET_TABLE_SIZE_IN_BYTES_POSTGRESQL = "SELECT pg_total_relation_size('%s') \"Table Size\"";
+    String GET_TABLE_SIZE_IN_BYTES_MYSQL = "SELECT (data_length + index_length) \"Table Size\" FROM information_schema.TABLES WHERE table_schema = \"" + schemaName + "\" AND table_name =\"%s\"";
+    String GET_TABLE_SIZE_IN_BYTES_ORACLE = "SELECT bytes \"Table Size\" FROM user_segments WHERE segment_type='TABLE' AND segment_name='%s'";
+    String GET_ROW_COUNT_QUERY = "SELECT COUNT(*) FROM %s";
+
+    Map<DBAccessor.DbType, String> tableSizeQueryMap = new HashMap<>();
+    tableSizeQueryMap.put(DBAccessor.DbType.POSTGRES, GET_TABLE_SIZE_IN_BYTES_POSTGRESQL);
+    tableSizeQueryMap.put(DBAccessor.DbType.MYSQL, GET_TABLE_SIZE_IN_BYTES_MYSQL);
+    tableSizeQueryMap.put(DBAccessor.DbType.ORACLE, GET_TABLE_SIZE_IN_BYTES_ORACLE);
+
+    // Arrays.asList is fixed-size but still allows set(), which the Oracle upper-casing below relies on
+    List<String> tablesToCheck = Arrays.asList("host_role_command", "execution_command", "stage", "request", "alert_history");
+
+    final double TABLE_SIZE_LIMIT_MB = 3000.0;
+    final int TABLE_ROW_COUNT_LIMIT = 3000000;
+
+    // null when no size query is registered for this database type; such databases use the row count check
+    String findTableSizeQuery = tableSizeQueryMap.get(dbType);
+
+    if (dbType == DBAccessor.DbType.ORACLE) {
+      // the Oracle size query matches segment_name against the upper-cased table name
+      for (int i = 0;i < tablesToCheck.size(); i++) {
+        tablesToCheck.set(i, tablesToCheck.get(i).toUpperCase());
+      }
+    }
+
+    for (String tableName : tablesToCheck) {
+
+      ResultSet rs = null;
+      Statement statement = null;
+      Double tableSizeInMB = null;
+      int tableRowCount = -1;
+
+      try {
+        statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
+
+        // first choice: physical table size
+        if (findTableSizeQuery != null) {
+          try {
+            rs = statement.executeQuery(String.format(findTableSizeQuery, tableName));
+            if (rs != null) {
+              while (rs.next()) {
+                tableSizeInMB = rs.getLong(1) / 1024.0 / 1024.0;
+              }
+            }
+          } catch (SQLException e) {
+            LOG.error(String.format("Failed to get %s table size from database, will check row count: ", tableName), e);
+          }
+        }
+
+        if (tableSizeInMB != null) {
+          if (tableSizeInMB > TABLE_SIZE_LIMIT_MB) {
+            warning("The database table {} is currently {} MB (limit is {}) and may impact performance. It is recommended " +
+                    "that you reduce its size by executing \"ambari-server db-cleanup\".",
+                    tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB);
+          } else {
+            // a size exactly at the limit is still within limits, not a lookup failure
+            LOG.info(String.format("The database table %s is currently %.3f MB and is within normal limits (%.3f)",
+                    tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB));
+          }
+        } else {
+          // fallback: the size is unknown, so compare the row count with the row count limit
+          LOG.info(String.format("Size of table %s could not be determined, checking row count instead", tableName));
+          try {
+            // re-executing on the same statement implicitly closes the previous result set
+            rs = statement.executeQuery(String.format(GET_ROW_COUNT_QUERY, tableName));
+            if (rs != null) {
+              while (rs.next()) {
+                tableRowCount = rs.getInt(1);
+              }
+            }
+
+            if (tableRowCount > TABLE_ROW_COUNT_LIMIT) {
+              warning("The database table {} currently has {} rows (limit is {}) and may impact performance. It is " +
+                      "recommended that you reduce its size by executing \"ambari-server db-cleanup\".",
+                      tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT);
+            } else if (tableRowCount != -1) {
+              LOG.info(String.format("The database table %s currently has %d rows and is within normal limits (%d)", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT));
+            } else {
+              // the query produced no row at all
+              LOG.error(String.format("Failed to get %s row count: ", tableName));
+            }
+          } catch (SQLException ex) {
+            LOG.error(String.format("Failed to get %s row count: ", tableName), ex);
+          }
+        }
+      } catch (SQLException e) {
+        // only createStatement can end up here; without a statement neither check can run
+        LOG.error(String.format("Failed to create a statement for checking the size of table %s: ", tableName), e);
+      } finally {
+        if (rs != null) {
+          try {
+            rs.close();
+          } catch (SQLException e) {
+            LOG.error("Exception occurred during result set closing procedure: ", e);
+          }
+        }
+
+        if (statement != null) {
+          try {
+            statement.close();
+          } catch (SQLException e) {
+            LOG.error("Exception occurred during statement closing procedure: ", e);
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
    * This method checks if any config type in clusterconfig table, has more than
    * one versions selected. If config version is selected(in selected column =
    * 1), it means that this version of config is actual. So, if any config type

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
index c132a3d..ae07dc0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
@@ -639,6 +639,12 @@ public interface DBAccessor {
   DbType getDbType();
 
   /**
+   * Gets the database schema name.
+   *
+   * @return the database schema name
+   */
+  String getDbSchema();
+
+  /**
    * Capture column type
    */
   class DBColumnInfo {

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
index 1dd3b54..c11589d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
@@ -233,6 +233,11 @@ public class DBAccessorImpl implements DBAccessor {
   }
 
   @Override
+  public String getDbSchema() {
+    // returns dbSchema unchanged
+    return dbSchema;
+  }
+
+  @Override
   public boolean tableHasData(String tableName) throws SQLException {
     String query = "SELECT count(*) from " + tableName;
     Statement statement = getConnection().createStatement();

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 7318162..6b34575 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -993,4 +993,20 @@ public class HostRoleCommandDAO {
       return HostRoleCommandEntity_.getPredicateMapping().get(propertyId);
     }
   }
+
+  /**
+   * Collects the task ids of the host role commands that belong to the given request/stage id pairs.
+   * The {@code HostRoleCommandEntity.findTaskIdsByRequestStageIds} named query is run once per pair.
+   *
+   * @param requestStageIds request/stage id pairs to look up
+   * @return the task ids found for all pairs, appended in the order the pairs were supplied
+   */
+  public List<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
+    final EntityManager em = entityManagerProvider.get();
+    final List<Long> collectedTaskIds = new ArrayList<Long>();
+
+    for (RequestDAO.StageEntityPK stagePk : requestStageIds) {
+      TypedQuery<Long> taskIdQuery =
+              em.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class);
+      taskIdQuery.setParameter("requestId", stagePk.getRequestId());
+      taskIdQuery.setParameter("stageId", stagePk.getStageId());
+
+      List<Long> idsForStage = daoUtils.selectList(taskIdQuery);
+      collectedTaskIds.addAll(idsForStage);
+    }
+
+    return collectedTaskIds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 2696f66..5d53416 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -19,27 +19,54 @@
 package org.apache.ambari.server.orm.dao;
 
 import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.cleanup.TimeBasedCleanupPolicy;
 import org.apache.ambari.server.orm.RequiresSession;
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RequestOperationLevelEntity;
 import org.apache.ambari.server.orm.entities.RequestResourceFilterEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.state.Clusters;
 import org.eclipse.persistence.config.HintValues;
 import org.eclipse.persistence.config.QueryHints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.persist.Transactional;
 
 @Singleton
-public class RequestDAO {
+public class RequestDAO implements Cleanable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RequestDAO.class);
+
+
+  private static final int BATCH_SIZE = 999;
+
   /**
    * SQL template to retrieve all request IDs, sorted by the ID.
    */
@@ -64,6 +91,27 @@ public class RequestDAO {
   @Inject
   DaoUtils daoUtils;
 
+  @Inject
+  private Provider<Clusters> m_clusters;
+
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private StageDAO stageDAO;
+
+  @Inject
+  private TopologyLogicalTaskDAO topologyLogicalTaskDAO;
+
+  @Inject
+  private TopologyHostTaskDAO topologyHostTaskDAO;
+
+  @Inject
+  private TopologyLogicalRequestDAO topologyLogicalRequestDAO;
+
+  @Inject
+  private TopologyRequestDAO topologyRequestDAO;
+
   @RequiresSession
   public RequestEntity findByPK(Long requestId) {
     return entityManagerProvider.get().find(RequestEntity.class, requestId);
@@ -197,4 +245,214 @@ public class RequestDAO {
 
     return daoUtils.selectList(query);
   }
+
+  /**
+   * Simple holder for a request id / stage id pair.
+   */
+  public static final class StageEntityPK {
+    private Long requestId;
+    private Long stageId;
+
+    /**
+     * @param requestId the request id of the pair
+     * @param stageId   the stage id of the pair
+     */
+    public StageEntityPK(Long requestId, Long stageId) {
+      this.requestId = requestId;
+      this.stageId = stageId;
+    }
+
+    public Long getStageId() {
+      return stageId;
+    }
+
+    public void setStageId(Long stageId) {
+      this.stageId = stageId;
+    }
+
+    public Long getRequestId() {
+      return requestId;
+    }
+
+    public void setRequestId(Long requestId) {
+      this.requestId = requestId;
+    }
+
+    /**
+     * Readable form of the pair, so that log messages that concatenate an instance
+     * show the ids instead of the default object identity string.
+     */
+    @Override
+    public String toString() {
+      return "StageEntityPK{requestId=" + requestId + ", stageId=" + stageId + "}";
+    }
+  }
+
+  /**
+   * Runs the {@code UpgradeEntity.findAllRequestIds} named query.
+   * @return the request ids produced by that query
+   */
+  private List<Long> findAllRequestIdsFromUpgrade() {
+    TypedQuery<Long> upgradeRequestIdsQuery = entityManagerProvider.get()
+            .createNamedQuery("UpgradeEntity.findAllRequestIds", Long.class);
+    List<Long> upgradeRequestIds = daoUtils.selectList(upgradeRequestIdsQuery);
+    return upgradeRequestIds;
+  }
+
+  /**
+   * Runs the {@code RequestEntity.findRequestStageIdsInClusterBeforeDate} named query
+   * to look up request/stage id pairs.
+   * @param clusterId        value bound to the query's {@code clusterId} parameter
+   * @param beforeDateMillis value bound to the query's {@code beforeDate} parameter
+   * @return the list of request/stage id pairs produced by the query
+   */
+  public List<StageEntityPK> findRequestAndStageIdsInClusterBeforeDate(Long clusterId, long  beforeDateMillis) {
+    TypedQuery<StageEntityPK> stagePkQuery = entityManagerProvider.get()
+            .createNamedQuery("RequestEntity.findRequestStageIdsInClusterBeforeDate", StageEntityPK.class);
+    stagePkQuery.setParameter("clusterId", clusterId);
+    stagePkQuery.setParameter("beforeDate", beforeDateMillis);
+
+    List<StageEntityPK> foundPairs = daoUtils.selectList(stagePkQuery);
+    return foundPairs;
+  }
+
+  /**
+   * Removes entities by id using the given named query. The ids are passed to the query
+   * in batches of at most {@code BATCH_SIZE} so that a single statement never receives
+   * an unbounded id list.
+   * @param ids              ids that are used to remove rows from the table; {@code null} or empty removes nothing
+   * @param paramName        name of the collection parameter in the named query (e.g. taskIds, requestIds)
+   * @param entityName       name of the entity that is removed, used only for logging
+   * @param beforeDateMillis timestamp which was set by the user (remove all entities that were created before),
+   *                         used only for logging
+   * @param entityQuery      name of the NamedQuery that removes the entities
+   * @param type             entity class the named query is created with
+   * @return                 number of rows that were removed
+   */
+  @Transactional
+  protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String entityName, Long beforeDateMillis,
+                                  String entityQuery, Class<T> type) {
+    LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
+    EntityManager entityManager = entityManagerProvider.get();
+    int affectedRows = 0;
+    // Batch delete
+    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+    if (ids != null && !ids.isEmpty()) {
+      // copy the set into an indexable list once, instead of once per batch
+      List<Long> idList = new ArrayList<>(ids);
+      for (int i = 0; i < idList.size(); i += BATCH_SIZE) {
+        int endRow = Math.min(i + BATCH_SIZE, idList.size());
+        List<Long> idsSubList = idList.subList(i, endRow);
+        // first and last element of the batch; the ids come from a set, so this is not necessarily a contiguous range
+        LOG.info("Deleting " + entityName + " entity batch with ids: " +
+                idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1));
+        query.setParameter(paramName, idsSubList);
+        affectedRows += query.executeUpdate();
+      }
+    }
+
+    return affectedRows;
+  }
+
+  /**
+   * Removes entities identified by request/stage id pairs using the given named query.
+   * The query is executed once per pair; the pairs are walked in batches of at most
+   * {@code BATCH_SIZE}, and one log line is written per batch.
+   * @param ids              request/stage id pairs that are used to remove rows from the table;
+   *                         {@code null} or empty removes nothing
+   * @param paramNames       names of the two query parameters: element 0 is bound to the stage id,
+   *                         element 1 is bound to the request id
+   * @param entityName       name of the entity that is removed, used only for logging
+   * @param beforeDateMillis timestamp which was set by the user (remove all entities that were created before),
+   *                         used only for logging
+   * @param entityQuery      name of the NamedQuery that removes the entities
+   * @param type             entity class the named query is created with
+   * @return                 number of rows that were removed
+   */
+  @Transactional
+  protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> paramNames, String entityName, Long beforeDateMillis,
+                                  String entityQuery, Class<T> type) {
+    LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
+    EntityManager entityManager = entityManagerProvider.get();
+    int affectedRows = 0;
+    // Batch delete
+    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+    if (ids != null && !ids.isEmpty()) {
+      String stageIdParam = paramNames.get(0);
+      String requestIdParam = paramNames.get(1);
+      for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
+        int endRow = Math.min(i + BATCH_SIZE, ids.size());
+        // ids is already a list, so a view is enough; no copy per batch is needed
+        List<StageEntityPK> idsSubList = ids.subList(i, endRow);
+        StageEntityPK first = idsSubList.get(0);
+        StageEntityPK last = idsSubList.get(idsSubList.size() - 1);
+        // spell the ids out explicitly so the log does not depend on StageEntityPK#toString
+        LOG.info("Deleting " + entityName + " entity batch with request/stage ids: " +
+                first.getRequestId() + "/" + first.getStageId() + " - " +
+                last.getRequestId() + "/" + last.getStageId());
+        for (StageEntityPK requestStageId : idsSubList) {
+          query.setParameter(stageIdParam, requestStageId.getStageId());
+          query.setParameter(requestIdParam, requestStageId.getRequestId());
+          affectedRows += query.executeUpdate();
+        }
+      }
+    }
+
+    return affectedRows;
+  }
+
+  /**
+   * Removes request-related rows of the cluster named in the policy: requests and stages found by
+   * {@link #findRequestAndStageIdsInClusterBeforeDate(Long, long)} for {@code policy.getToDateInMillis()},
+   * together with the task, execution command, topology, role success criteria, resource filter and
+   * operation level rows that are looked up from them. Requests whose ids are returned by the upgrade
+   * lookup are left untouched.
+   *
+   * @param policy supplies the cluster name and the cut-off timestamp
+   * @return the sum of the row counts reported by the batch deletes; rows removed through
+   *         {@code topologyRequestDAO.removeByPK} are not included in this number
+   * @throws IllegalStateException if looking up the cluster throws an {@link AmbariException}
+   */
+  @Transactional
+  @Override
+  public long cleanup(TimeBasedCleanupPolicy policy) {
+    long affectedRows = 0;
+    Long clusterId = null;
+    try {
+      clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+      // find request and stage ids that were created before date populated by user.
+      List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
+
+      // find request ids from Upgrade table and exclude these ids from
+      // request ids set that we already have. We don't want to make any changes for upgrade
+      Set<Long> requestIdsFromUpgrade = Sets.newHashSet(findAllRequestIdsFromUpgrade());
+      Iterator<StageEntityPK> requestStageIdsIterator =  requestStageIds.iterator();
+      while (requestStageIdsIterator.hasNext()) {
+        StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
+        if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
+          requestStageIdsIterator.remove();
+        }
+      }
+
+
+      // distinct request ids of the remaining request/stage pairs
+      Set<Long> requestIds = new HashSet<>();
+      for (StageEntityPK ids : requestStageIds) {
+        requestIds.add(ids.getRequestId());
+      }
+
+      // find task ids using request stage ids
+      Set<Long> taskIds = Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds));
+      // parameter names for cleanTableByStageEntityPK: element 0 is bound to the stage id, element 1 to the request id
+      LinkedList<String> params = new LinkedList<>();
+      params.add("stageId");
+      params.add("requestId");
+
+      // find host task ids, to find related host requests and also to remove needed host tasks
+      List<Long> hostTaskIds = new ArrayList<>();
+      if (taskIds != null && !taskIds.isEmpty()) {
+        hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds));
+      }
+
+      // find host request ids by host task ids to remove later needed host requests
+      List<Long> hostRequestIds = new ArrayList<>();
+      if (!hostTaskIds.isEmpty()) {
+        hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
+      }
+
+      // find topology request ids by host request ids; they are removed one by one further below
+      List<Long> topologyRequestIds = new ArrayList<>();
+      if (!hostRequestIds.isEmpty()) {
+        topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
+      }
+
+
+      //removing all entities one by one according to their relations using stage, task and request ids
+      affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
+              "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
+      affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
+              "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
+      affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
+              "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
+      affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
+              "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
+      // these removals are not added to affectedRows
+      for (Long topologyRequestId : topologyRequestIds) {
+        topologyRequestDAO.removeByPK(topologyRequestId);
+      }
+      affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
+              "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
+      affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
+              "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
+      affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
+              "StageEntity.removeByRequestStageIds", StageEntity.class);
+      affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
+              "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
+      affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
+              "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
+      affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
+              "RequestEntity.removeByRequestIds", RequestEntity.class);
+
+    } catch (AmbariException e) {
+      // getCluster is the call that can fail with AmbariException; rethrown unchecked with the cause preserved
+      LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e);
+      throw new IllegalStateException(e);
+    }
+
+    return affectedRows;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index 02532db..1b18ffe 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -54,6 +54,17 @@ public class TopologyHostTaskDAO {
   }
 
   @RequiresSession
+  public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) {
+    // Looks up host request ids for the given host task ids through a named query.
+    // NOTE(review): the query name carries the TopologyLogicalTaskEntity prefix although this is
+    // the host task DAO - confirm it matches the @NamedQuery declaration.
+    TypedQuery<Long> hostRequestIdsQuery = entityManagerProvider.get()
+            .createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
+    hostRequestIdsQuery.setParameter("hostTaskIds", hostTaskIds);
+
+    List<Long> hostRequestIds = daoUtils.selectList(hostRequestIdsQuery);
+    return hostRequestIds;
+  }
+
+  @RequiresSession
   public List<TopologyHostTaskEntity> findAll() {
     return daoUtils.selectAll(entityManagerProvider.get(), TopologyHostTaskEntity.class);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index e917dc2..ce1131a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.orm.dao;
 import java.util.List;
 
 import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
@@ -61,4 +62,15 @@ public class TopologyLogicalRequestDAO {
   public void remove(TopologyLogicalRequestEntity requestEntity) {
     entityManagerProvider.get().remove(requestEntity);
   }
+
+  /**
+   * Runs the {@code TopologyLogicalRequestEntity.findRequestIds} named query.
+   *
+   * @param ids value bound to the query's {@code ids} parameter
+   * @return the request ids produced by the query
+   */
+  @RequiresSession
+  public List<Long> findRequestIdsByIds(List<Long> ids) {
+    TypedQuery<Long> requestIdsQuery = entityManagerProvider.get()
+            .createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
+    requestIdsQuery.setParameter("ids", ids);
+
+    List<Long> requestIds = daoUtils.selectList(requestIdsQuery);
+    return requestIds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index 35f47a7..780a3ba 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.orm.dao;
 import java.util.List;
 
 import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
@@ -43,6 +44,17 @@ public class TopologyLogicalTaskDAO {
   }
 
   @RequiresSession
+  public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> physicalTaskIds) {
+    // Looks up host task ids for the given physical task ids through a named query.
+    TypedQuery<Long> hostTaskIdsQuery = entityManagerProvider.get()
+            .createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
+    hostTaskIdsQuery.setParameter("physicalTaskIds", physicalTaskIds);
+
+    List<Long> hostTaskIds = daoUtils.selectList(hostTaskIdsQuery);
+    return hostTaskIds;
+  }
+
+  @RequiresSession
   public List<TopologyLogicalTaskEntity> findAll() {
     return daoUtils.selectAll(entityManagerProvider.get(), TopologyLogicalTaskEntity.class);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
index 85f3a25..7015709 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
@@ -26,11 +26,16 @@ import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
 
 @Table(name = "execution_command")
 @Entity
+@NamedQueries({
+    @NamedQuery(name = "ExecutionCommandEntity.removeByTaskIds", query = "DELETE FROM ExecutionCommandEntity command WHERE command.taskId IN :taskIds")
+})
 public class ExecutionCommandEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index fdec5f0..6197940 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -60,6 +60,7 @@ import org.apache.commons.lang.ArrayUtils;
     , initialValue = 1
 )
 @NamedQueries({
+    @NamedQuery(name = "HostRoleCommandEntity.findTaskIdsByRequestStageIds", query = "SELECT command.taskId FROM HostRoleCommandEntity command WHERE command.stageId = :stageId AND command.requestId = :requestId"),
     @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"),
     @NamedQuery(name = "HostRoleCommandEntity.findByRequestIdAndStatuses", query="SELECT task FROM HostRoleCommandEntity task WHERE task.requestId=:requestId AND task.status IN :statuses ORDER BY task.taskId ASC"),
     @NamedQuery(name = "HostRoleCommandEntity.findTasksByStatusesOrderByIdDesc", query = "SELECT task FROM HostRoleCommandEntity task WHERE task.requestId = :requestId AND task.status IN :statuses ORDER BY task.taskId DESC"),
@@ -71,12 +72,9 @@ import org.apache.commons.lang.ArrayUtils;
     @NamedQuery(name = "HostRoleCommandEntity.findByStatusBetweenStages", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId AND command.stageId >= :minStageId AND command.stageId <= :maxStageId AND command.status = :status"),
     @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipExcludeRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand <> :roleCommand"),
     @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipForRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand = :roleCommand"),
-    @NamedQuery(
-        name = "HostRoleCommandEntity.findHostsByCommandStatus",
-        query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
-    @NamedQuery(
-        name = "HostRoleCommandEntity.getBlockingHostsForRequest",
-        query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL")
+    @NamedQuery(name = "HostRoleCommandEntity.removeByTaskIds", query = "DELETE FROM HostRoleCommandEntity command WHERE command.taskId IN :taskIds"),
+    @NamedQuery(name = "HostRoleCommandEntity.findHostsByCommandStatus", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
+    @NamedQuery(name = "HostRoleCommandEntity.getBlockingHostsForRequest", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL")
 
 })
 public class HostRoleCommandEntity {

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
index f19aa72..099d08f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
@@ -30,6 +30,8 @@ import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.Lob;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToMany;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
@@ -39,6 +41,10 @@ import org.apache.ambari.server.actionmanager.RequestType;
 
 @Table(name = "request")
 @Entity
+@NamedQueries({
+  @NamedQuery(name = "RequestEntity.findRequestStageIdsInClusterBeforeDate", query = "SELECT NEW org.apache.ambari.server.orm.dao.RequestDAO.StageEntityPK(request.requestId, stage.stageId) FROM RequestEntity request JOIN StageEntity stage ON request.requestId = stage.requestId WHERE request.clusterId = :clusterId AND request.createTime <= :beforeDate"),
+  @NamedQuery(name = "RequestEntity.removeByRequestIds", query = "DELETE FROM RequestEntity request WHERE request.requestId IN :requestIds")
+})
 public class RequestEntity {
 
   @Column(name = "request_id")

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
index ff14e3a..a7cd0d0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
@@ -40,7 +40,9 @@ import javax.persistence.TableGenerator;
 @NamedQueries({
     @NamedQuery(name = "requestOperationLevelByHostId", query =
         "SELECT requestOperationLevel FROM RequestOperationLevelEntity requestOperationLevel " +
-            "WHERE requestOperationLevel.hostId=:hostId")
+            "WHERE requestOperationLevel.hostId=:hostId"),
+    @NamedQuery(name = "RequestOperationLevelEntity.removeByRequestIds",
+        query = "DELETE FROM RequestOperationLevelEntity requestOperationLevel WHERE requestOperationLevel.requestId IN :requestIds")
 })
 public class RequestOperationLevelEntity {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
index 8ee41d2..9597db1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
@@ -26,6 +26,8 @@ import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.Lob;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
 
@@ -36,6 +38,9 @@ import javax.persistence.TableGenerator;
   , pkColumnValue = "resourcefilter_id_seq"
   , initialValue = 1
 )
+@NamedQueries({
+  @NamedQuery(name = "RequestResourceFilterEntity.removeByRequestIds", query = "DELETE FROM RequestResourceFilterEntity filter WHERE filter.requestId IN :requestIds")
+})
 public class RequestResourceFilterEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
index 3386c24..66e7fd8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
@@ -26,6 +26,8 @@ import javax.persistence.IdClass;
 import javax.persistence.JoinColumn;
 import javax.persistence.JoinColumns;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.Table;
 
 import org.apache.ambari.server.Role;
@@ -33,6 +35,9 @@ import org.apache.ambari.server.Role;
 @IdClass(org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntityPK.class)
 @Table(name = "role_success_criteria")
 @Entity
+@NamedQueries({
+  @NamedQuery(name = "RoleSuccessCriteriaEntity.removeByRequestStageIds", query = "DELETE FROM RoleSuccessCriteriaEntity criteria WHERE criteria.stageId = :stageId AND criteria.requestId = :requestId")
+})
 public class RoleSuccessCriteriaEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index d035729..f688412 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -50,7 +50,11 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus;
         query = "SELECT stage.requestId, MIN(stage.stageId) from StageEntity stage, HostRoleCommandEntity hrc WHERE hrc.status IN :statuses AND hrc.stageId = stage.stageId AND hrc.requestId = stage.requestId GROUP by stage.requestId ORDER BY stage.requestId"),
     @NamedQuery(
         name = "StageEntity.findByRequestIdAndCommandStatuses",
-        query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId") })
+        query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId"),
+    @NamedQuery(
+        name = "StageEntity.removeByRequestStageIds",
+        query = "DELETE FROM StageEntity stage WHERE stage.stageId = :stageId AND stage.requestId = :requestId")
+})
 public class StageEntity {
 
   @Basic

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
index b90e192..2700f68 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
@@ -25,11 +25,16 @@ import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToMany;
 import javax.persistence.Table;
 
 @Entity
 @Table(name = "topology_host_request")
+@NamedQueries({
+  @NamedQuery(name = "TopologyHostRequestEntity.removeByIds", query = "DELETE FROM TopologyHostRequestEntity topologyHostRequest WHERE topologyHostRequest.id IN :hostRequestIds")
+})
 public class TopologyHostRequestEntity {
   @Id
 //  @GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_host_request_id_generator")

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
index bba0e06..0bb3e19 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
@@ -40,7 +40,11 @@ import javax.persistence.TableGenerator;
   pkColumnValue = "topology_host_task_id_seq", initialValue = 0)
 @NamedQueries({
   @NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest",
-      query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId")
+      query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId"),
+  @NamedQuery(name = "TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
+      query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"),
+  @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds",
+      query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds")
 })
 public class TopologyHostTaskEntity {
   @Id
@@ -51,6 +55,9 @@ public class TopologyHostTaskEntity {
   @Column(name = "type", length = 255, nullable = false)
   private String type;
 
+  @Column(name = "host_request_id", nullable = false, insertable = false, updatable = false)
+  private Long hostRequestId;
+
   @ManyToOne
   @JoinColumn(name = "host_request_id", referencedColumnName = "id", nullable = false)
   private TopologyHostRequestEntity topologyHostRequestEntity;
@@ -67,7 +74,11 @@ public class TopologyHostTaskEntity {
   }
 
+  /**
+   * Returns the value held in the {@code hostRequestId} field, which is mapped to the
+   * {@code host_request_id} column with {@code insertable = false, updatable = false}.
+   * <p>
+   * The value is no longer derived from {@code topologyHostRequestEntity}.
+   * NOTE(review): presumably {@code null} until the entity has been loaded from the database
+   * or {@link #setHostRequestId(Long)} has been called - confirm against callers.
+   *
+   * @return the host request id stored in the field, possibly {@code null}
+   */
   public Long getHostRequestId() {
-    return topologyHostRequestEntity != null ? topologyHostRequestEntity.getId() : null;
+    return hostRequestId;
+  }
+
+  /**
+   * Assigns the in-memory {@code hostRequestId} field.
+   * <p>
+   * The field is mapped to {@code host_request_id} with {@code insertable = false} and
+   * {@code updatable = false}; the same column is also the join column of
+   * {@code topologyHostRequestEntity}.
+   *
+   * @param hostRequestId the host request id to store in the field
+   */
+  public void setHostRequestId(Long hostRequestId) {
+    this.hostRequestId = hostRequestId;
   }
 
   public String getType() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
index 4f865f4..605a043 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
@@ -24,12 +24,17 @@ import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToMany;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
 
 @Entity
 @Table(name = "topology_logical_request")
+@NamedQueries({
+  @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity logicalrequest WHERE logicalrequest.id IN :ids")
+})
 public class TopologyLogicalRequestEntity {
   @Id
   @Column(name = "id", nullable = false, updatable = false)

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
index c71d4e4..2954863 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
@@ -24,6 +24,8 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.OneToOne;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
@@ -33,6 +35,10 @@ import javax.persistence.TableGenerator;
 @TableGenerator(name = "topology_logical_task_id_generator", table = "ambari_sequences",
   pkColumnName = "sequence_name", valueColumnName = "sequence_value",
   pkColumnValue = "topology_logical_task_id_seq", initialValue = 0)
+@NamedQueries({
+  @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"),
+  @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :taskIds")
+})
 public class TopologyLogicalTaskEntity {
   @Id
   @GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_logical_task_id_generator")
@@ -42,12 +48,18 @@ public class TopologyLogicalTaskEntity {
   @Column(name = "component", length = 255)
   private String componentName;
 
+  @Column(name = "host_task_id", nullable = false, insertable = false, updatable = false)
+  private Long hostTaskId;
+
+  @Column(name = "physical_task_id", nullable = false, insertable = false, updatable = false)
+  private Long physicalTaskId;
+
   @ManyToOne
   @JoinColumn(name = "host_task_id", referencedColumnName = "id", nullable = false)
   private TopologyHostTaskEntity topologyHostTaskEntity;
 
   @OneToOne
-  @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id")
+  @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id", nullable = false)
   private HostRoleCommandEntity hostRoleCommandEntity;
 
   public Long getId() {
@@ -58,14 +70,22 @@ public class TopologyLogicalTaskEntity {
     this.id = id;
   }
 
-  public Long getHostTaskId() {
-    return topologyHostTaskEntity != null ? topologyHostTaskEntity.getId() : null;
-  }
-
+  /**
+   * Returns the task id of the associated {@code hostRoleCommandEntity}.
+   * <p>
+   * The value is read from the relation, not from the {@code physicalTaskId} field, so a value
+   * passed to {@code setPhysicalTaskId(Long)} is not reflected here.
+   *
+   * @return the related command's task id, or {@code null} when no command is associated
+   */
   public Long getPhysicalTaskId() {
     return hostRoleCommandEntity != null ? hostRoleCommandEntity.getTaskId() : null;
   }
 
+  /**
+   * Assigns the in-memory {@code physicalTaskId} field.
+   * <p>
+   * The field is mapped to {@code physical_task_id} with {@code insertable = false} and
+   * {@code updatable = false}. {@code getPhysicalTaskId()} reads from
+   * {@code hostRoleCommandEntity} instead of this field.
+   *
+   * @param physicalTaskId the physical task id to store in the field
+   */
+  public void setPhysicalTaskId(Long physicalTaskId) {
+    this.physicalTaskId = physicalTaskId;
+  }
+
+  /**
+   * Assigns the in-memory {@code hostTaskId} field.
+   * <p>
+   * The field is mapped to {@code host_task_id} with {@code insertable = false} and
+   * {@code updatable = false}; the same column is also the join column of
+   * {@code topologyHostTaskEntity}.
+   *
+   * @param hostTaskId the host task id to store in the field
+   */
+  public void setHostTaskId(Long hostTaskId) {
+    this.hostTaskId = hostTaskId;
+  }
+
+  /**
+   * Returns the value held in the {@code hostTaskId} field, which is mapped to the
+   * {@code host_task_id} column with {@code insertable = false, updatable = false}.
+   * <p>
+   * The value is not derived from {@code topologyHostTaskEntity}.
+   * NOTE(review): presumably {@code null} until the entity has been loaded from the database
+   * or {@code setHostTaskId(Long)} has been called - confirm against callers.
+   *
+   * @return the host task id stored in the field, possibly {@code null}
+   */
+  public Long getHostTaskId() {
+    return hostTaskId;
+  }
+
   public String getComponentName() {
     return componentName;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
index 89574bc..bea1d19 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
@@ -63,6 +63,8 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
       query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY r.startTime DESC, u.upgradeId DESC"),
   @NamedQuery(name = "UpgradeEntity.findLatestForCluster",
       query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime DESC"),
+  @NamedQuery(name = "UpgradeEntity.findAllRequestIds",
+      query = "SELECT upgrade.requestId FROM UpgradeEntity upgrade")
 })
 public class UpgradeEntity {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
index 560970a..35ea769 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
@@ -27,6 +27,8 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.JoinColumn;
 import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
 
@@ -48,6 +50,9 @@ import org.apache.ambari.server.state.UpgradeState;
     pkColumnValue = "upgrade_item_id_seq",
     initialValue = 0,
     allocationSize = 1000)
+@NamedQueries({
+  @NamedQuery(name = "UpgradeItemEntity.findAllStageIds", query = "SELECT upgradeItem.stageId FROM UpgradeItemEntity upgradeItem")
+})
 public class UpgradeItemEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py
index 87cc6c2..737be6a 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -199,6 +199,12 @@ def restart(args):
   start(args)
 
 
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
+def database_cleanup(args):
+  logger.info("Database cleanup.")
+  if args.silent:
+    stop(args)
+  db_cleanup(args)
 
 #
 # The Ambari Server status.
@@ -469,7 +475,7 @@ def init_parser_options(parser):
                     help="Print verbose status messages")
   parser.add_option("-s", "--silent",
                     action="store_true", dest="silent", default=False,
-                    help="Silently accepts default prompt values")
+                    help="Silently accepts default prompt values. For db-cleanup command, silent mode will stop ambari server.")
   parser.add_option('-g', '--debug', action="store_true", dest='debug', default=False,
                     help="Start ambari-server in debug mode")
   parser.add_option('-y', '--suspend-start', action="store_true", dest='suspend_start', default=False,
@@ -759,7 +765,7 @@ def create_user_action_map(args, options):
         CHECK_DATABASE_ACTION: UserAction(check_database, options),
         ENABLE_STACK_ACTION: UserAction(enable_stack, options, args),
         SETUP_SSO_ACTION: UserActionRestart(setup_sso, options),
-        DB_CLEANUP_ACTION: UserAction(db_cleanup, options),
+        DB_CLEANUP_ACTION: UserAction(database_cleanup, options),
         INSTALL_MPACK_ACTION: UserAction(install_mpack, options),
         UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options),
         UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options),

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/python/ambari_server/dbCleanup.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index abc8267..6e16bc5 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -42,25 +42,29 @@ def run_db_cleanup(options):
     if validate_args(options):
         return 1
 
-    db_title = get_db_type(get_ambari_properties()).title
+    status, stateDesc = is_server_runing()
 
-    confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
-            db_title), True)
-    if not confirmBackup:
-        print_info_msg("Ambari Server Database cleanup aborted")
-        return 0
+    if not options.silent:
+      db_title = get_db_type(get_ambari_properties()).title
+
+      confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
+              db_title), True)
+      if not confirmBackup:
+          print_info_msg("Ambari Server Database cleanup aborted")
+          return 0
+
+      if status:
+          print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
+          return 1
+
+      confirm = get_YN_input(
+          "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
+              db_title, options.cleanup_from_date), True)
+      if not confirm:
+          print_info_msg("Ambari Server Database cleanup aborted")
+          return 0
 
-    status, stateDesc = is_server_runing()
-    if status:
-        print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
-        return 1
 
-    confirm = get_YN_input(
-        "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
-            db_title, options.cleanup_from_date), True)
-    if not confirm:
-        print_info_msg("Ambari Server Database cleanup aborted")
-        return 0
 
     jdk_path = get_java_exe_path()
     if jdk_path is None:
@@ -101,7 +105,6 @@ def run_db_cleanup(options):
 # Database cleanup
 #
 def db_cleanup(options):
-    logger.info("Database cleanup.")
+    """Delegate to run_db_cleanup(options) and return its result unchanged."""
     return run_db_cleanup(options)
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
index 7d8ba50..d6e12dc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
@@ -533,4 +533,70 @@ public class DatabaseConsistencyCheckHelperTest {
   }
 
 
+  @Test
+  public void testCheckForLargeTables() throws Exception { // Runs checkForLargeTables() against a fully mocked DBAccessor; the only checks are the EasyMock expectations verified at the end (no assertions on the check result).
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariMetaInfo mockAmbariMetainfo = easyMockSupport.createNiceMock(AmbariMetaInfo.class);
+    final DBAccessor mockDBDbAccessor = easyMockSupport.createNiceMock(DBAccessor.class);
+    final Connection mockConnection = easyMockSupport.createNiceMock(Connection.class);
+    final Statement mockStatement = easyMockSupport.createNiceMock(Statement.class);
+    final EntityManager mockEntityManager = easyMockSupport.createNiceMock(EntityManager.class);
+    final Clusters mockClusters = easyMockSupport.createNiceMock(Clusters.class);
+    final OsFamily mockOSFamily = easyMockSupport.createNiceMock(OsFamily.class);
+    final StackManagerFactory mockStackManagerFactory = easyMockSupport.createNiceMock(StackManagerFactory.class);
+
+    final ResultSet hostRoleCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class); // One mocked ResultSet per table whose size query is expected below.
+    final ResultSet executionCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+    final ResultSet stageResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+    final ResultSet requestResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+    final ResultSet alertHistoryResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() { // Injector that serves the mocks above for the six bound types.
+      @Override
+      protected void configure() {
+        bind(AmbariMetaInfo.class).toInstance(mockAmbariMetainfo);
+        bind(StackManagerFactory.class).toInstance(mockStackManagerFactory);
+        bind(EntityManager.class).toInstance(mockEntityManager);
+        bind(DBAccessor.class).toInstance(mockDBDbAccessor);
+        bind(Clusters.class).toInstance(mockClusters);
+        bind(OsFamily.class).toInstance(mockOSFamily);
+      }
+    });
+
+    expect(hostRoleCommandResultSet.next()).andReturn(true).once(); // Each result set is recorded to report a row on exactly one next() call.
+    expect(executionCommandResultSet.next()).andReturn(true).once();
+    expect(stageResultSet.next()).andReturn(true).once();
+    expect(requestResultSet.next()).andReturn(true).once();
+    expect(alertHistoryResultSet.next()).andReturn(true).once();
+    expect(hostRoleCommandResultSet.getLong(1)).andReturn(2345L).atLeastOnce(); // Stubbed column-1 values; presumably the table size the helper inspects -- TODO confirm against checkForLargeTables().
+    expect(executionCommandResultSet.getLong(1)).andReturn(12345L).atLeastOnce();
+    expect(stageResultSet.getLong(1)).andReturn(2321L).atLeastOnce();
+    expect(requestResultSet.getLong(1)).andReturn(1111L).atLeastOnce();
+    expect(alertHistoryResultSet.getLong(1)).andReturn(2223L).atLeastOnce();
+    expect(mockDBDbAccessor.getConnection()).andReturn(mockConnection);
+    expect(mockDBDbAccessor.getDbType()).andReturn(DBAccessor.DbType.MYSQL); // MYSQL type and the "test_schema" schema line up with the information_schema query strings expected below.
+    expect(mockDBDbAccessor.getDbSchema()).andReturn("test_schema");
+    expect(mockConnection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE)).andReturn(mockStatement).anyTimes(); // Statement creation may occur any number of times; the per-table queries below carry no explicit call count.
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"host_role_command\"")).andReturn(hostRoleCommandResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"execution_command\"")).andReturn(executionCommandResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"stage\"")).andReturn(stageResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"request\"")).andReturn(requestResultSet);
+    expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+            "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"alert_history\"")).andReturn(alertHistoryResultSet);
+
+    DatabaseConsistencyCheckHelper.setInjector(mockInjector); // Hand the mock-backed injector to the static helper.
+
+    easyMockSupport.replayAll(); // Switch every mock created through easyMockSupport from record to replay mode.
+
+    mockAmbariMetainfo.init(); // Called on the nice mock itself with no recorded expectation -- NOTE(review): looks like a no-op here; confirm it is needed.
+
+    DatabaseConsistencyCheckHelper.resetCheckResult(); // Reset the helper's static check result before the call under test -- presumably to drop state left by other tests.
+    DatabaseConsistencyCheckHelper.checkForLargeTables();
+
+    easyMockSupport.verifyAll(); // Fails if a recorded expectation (e.g. one of the five table-size queries) was never exercised.
+  }
 }