You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by vb...@apache.org on 2017/04/12 13:31:11 UTC
ambari git commit: AMBARI-20687. Perf: Refactor ambari db-cleanup to
include all big tables.(vbrodetskyi)
Repository: ambari
Updated Branches:
refs/heads/trunk 273653b5a -> 11ab63f7f
AMBARI-20687. Perf: Refactor ambari db-cleanup to include all big tables.(vbrodetskyi)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11ab63f7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11ab63f7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11ab63f7
Branch: refs/heads/trunk
Commit: 11ab63f7ffa99a7d381b86282846fa7c80ddc88e
Parents: 273653b
Author: Vitaly Brodetskyi <vb...@hortonworks.com>
Authored: Wed Apr 12 16:30:34 2017 +0300
Committer: Vitaly Brodetskyi <vb...@hortonworks.com>
Committed: Wed Apr 12 16:30:34 2017 +0300
----------------------------------------------------------------------
.../checks/DatabaseConsistencyCheckHelper.java | 117 +++++++++
.../apache/ambari/server/orm/DBAccessor.java | 6 +
.../ambari/server/orm/DBAccessorImpl.java | 5 +
.../server/orm/dao/HostRoleCommandDAO.java | 16 ++
.../ambari/server/orm/dao/RequestDAO.java | 260 ++++++++++++++++++-
.../server/orm/dao/TopologyHostTaskDAO.java | 11 +
.../orm/dao/TopologyLogicalRequestDAO.java | 12 +
.../server/orm/dao/TopologyLogicalTaskDAO.java | 12 +
.../orm/entities/ExecutionCommandEntity.java | 5 +
.../orm/entities/HostRoleCommandEntity.java | 10 +-
.../server/orm/entities/RequestEntity.java | 6 +
.../entities/RequestOperationLevelEntity.java | 4 +-
.../entities/RequestResourceFilterEntity.java | 5 +
.../orm/entities/RoleSuccessCriteriaEntity.java | 5 +
.../ambari/server/orm/entities/StageEntity.java | 6 +-
.../orm/entities/TopologyHostRequestEntity.java | 5 +
.../orm/entities/TopologyHostTaskEntity.java | 15 +-
.../entities/TopologyLogicalRequestEntity.java | 5 +
.../orm/entities/TopologyLogicalTaskEntity.java | 30 ++-
.../server/orm/entities/UpgradeEntity.java | 2 +
.../server/orm/entities/UpgradeItemEntity.java | 5 +
ambari-server/src/main/python/ambari-server.py | 10 +-
.../src/main/python/ambari_server/dbCleanup.py | 37 +--
.../DatabaseConsistencyCheckHelperTest.java | 66 +++++
24 files changed, 620 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index e7e9433..b2a03e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -46,9 +47,12 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.orm.DBAccessor;
import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.MetainfoDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
@@ -81,6 +85,9 @@ public class DatabaseConsistencyCheckHelper {
private static Connection connection;
private static AmbariMetaInfo ambariMetaInfo;
private static DBAccessor dbAccessor;
+ private static HostRoleCommandDAO hostRoleCommandDAO;
+ private static ExecutionCommandDAO executionCommandDAO;
+ private static StageDAO stageDAO;
private static DatabaseConsistencyCheckResult checkResult = DatabaseConsistencyCheckResult.DB_CHECK_SUCCESS;
@@ -174,6 +181,7 @@ public class DatabaseConsistencyCheckHelper {
checkHostComponentStates();
checkServiceConfigs();
checkTopologyTables();
+ checkForLargeTables();
LOG.info("******************************* Check database completed *******************************");
return checkResult;
}
@@ -223,6 +231,115 @@ public class DatabaseConsistencyCheckHelper {
}
/**
+ * This method checks if ambari database has tables with too big size (according to limit).
+ * First of all we are trying to get table size from schema information, but if it's not possible,
+ * we will get tables rows count and compare it with row count limit.
+ */
+ static void checkForLargeTables() {
+ LOG.info("Checking for tables with large physical size");
+
+ ensureConnection();
+
+ DBAccessor.DbType dbType = dbAccessor.getDbType();
+ String schemaName = dbAccessor.getDbSchema();
+
+ String GET_TABLE_SIZE_IN_BYTES_POSTGRESQL = "SELECT pg_total_relation_size('%s') \"Table Size\"";
+ String GET_TABLE_SIZE_IN_BYTES_MYSQL = "SELECT (data_length + index_length) \"Table Size\" FROM information_schema.TABLES WHERE table_schema = \"" + schemaName + "\" AND table_name =\"%s\"";
+ String GET_TABLE_SIZE_IN_BYTES_ORACLE = "SELECT bytes \"Table Size\" FROM user_segments WHERE segment_type='TABLE' AND segment_name='%s'";
+ String GET_ROW_COUNT_QUERY = "SELECT COUNT(*) FROM %s";
+
+ Map<DBAccessor.DbType, String> tableSizeQueryMap = new HashMap<>();
+ tableSizeQueryMap.put(DBAccessor.DbType.POSTGRES, GET_TABLE_SIZE_IN_BYTES_POSTGRESQL);
+ tableSizeQueryMap.put(DBAccessor.DbType.MYSQL, GET_TABLE_SIZE_IN_BYTES_MYSQL);
+ tableSizeQueryMap.put(DBAccessor.DbType.ORACLE, GET_TABLE_SIZE_IN_BYTES_ORACLE);
+
+ List<String> tablesToCheck = Arrays.asList("host_role_command", "execution_command", "stage", "request", "alert_history");
+
+ final double TABLE_SIZE_LIMIT_MB = 3000.0;
+ final int TABLE_ROW_COUNT_LIMIT = 3000000;
+
+ String findTableSizeQuery = tableSizeQueryMap.get(dbType);
+
+ if (dbType == DBAccessor.DbType.ORACLE) {
+ for (int i = 0;i < tablesToCheck.size(); i++) {
+ tablesToCheck.set(i, tablesToCheck.get(i).toUpperCase());
+ }
+ }
+
+ for (String tableName : tablesToCheck) {
+
+ ResultSet rs = null;
+ Statement statement = null;
+ Double tableSizeInMB = null;
+ Long tableSizeInBytes = null;
+ int tableRowCount = -1;
+
+ try {
+ statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
+ rs = statement.executeQuery(String.format(findTableSizeQuery, tableName));
+ if (rs != null) {
+ while (rs.next()) {
+ tableSizeInBytes = rs.getLong(1);
+ if (tableSizeInBytes != null) {
+ tableSizeInMB = tableSizeInBytes / 1024.0 / 1024.0;
+ }
+ }
+ }
+
+ if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) {
+ warning("The database table {} is currently {} MB (limit is {}) and may impact performance. It is recommended " +
+ "that you reduce its size by executing \"ambari-server db-cleanup\".",
+ tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB);
+ } else if (tableSizeInMB != null && tableSizeInMB < TABLE_SIZE_LIMIT_MB) {
+ LOG.info(String.format("The database table %s is currently %.3f MB and is within normal limits (%.3f)",
+ tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB));
+ } else {
+ throw new Exception();
+ }
+ } catch (Exception e) {
+ LOG.error(String.format("Failed to get %s table size from database, will check row count: ", tableName), e);
+ try {
+ rs = statement.executeQuery(String.format(GET_ROW_COUNT_QUERY, tableName));
+ if (rs != null) {
+ while (rs.next()) {
+ tableRowCount = rs.getInt(1);
+ }
+ }
+
+ if (tableRowCount > TABLE_ROW_COUNT_LIMIT) {
+ warning("The database table {} currently has {} rows (limit is {}) and may impact performance. It is " +
+ "recommended that you reduce its size by executing \"ambari-server db-cleanup\".",
+ tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT);
+ } else if (tableRowCount != -1 && tableRowCount < TABLE_ROW_COUNT_LIMIT) {
+ LOG.info(String.format("The database table %s currently has %d rows and is within normal limits (%d)", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT));
+ } else {
+ throw new SQLException();
+ }
+ } catch (SQLException ex) {
+ LOG.error(String.format("Failed to get %s row count: ", tableName), e);
+ }
+ } finally {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ LOG.error("Exception occurred during result set closing procedure: ", e);
+ }
+ }
+
+ if (statement != null) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ LOG.error("Exception occurred during statement closing procedure: ", e);
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
* This method checks if any config type in clusterconfig table, has more than
* one versions selected. If config version is selected(in selected column =
* 1), it means that this version of config is actual. So, if any config type
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
index c132a3d..ae07dc0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
@@ -639,6 +639,12 @@ public interface DBAccessor {
DbType getDbType();
/**
+ * Gets the name of the database schema.
+ * @return the database schema name
+ */
+ String getDbSchema();
+
+ /**
* Capture column type
*/
class DBColumnInfo {
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
index 1dd3b54..c11589d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
@@ -233,6 +233,11 @@ public class DBAccessorImpl implements DBAccessor {
}
@Override
+ public String getDbSchema() {
+ // exposes the schema name held in this accessor's dbSchema field
+ return dbSchema;
+ }
+
+ @Override
public boolean tableHasData(String tableName) throws SQLException {
String query = "SELECT count(*) from " + tableName;
Statement statement = getConnection().createStatement();
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 7318162..6b34575 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -993,4 +993,20 @@ public class HostRoleCommandDAO {
return HostRoleCommandEntity_.getPredicateMapping().get(propertyId);
}
}
+
+  /**
+   * Finds the ids of the tasks that belong to the given request/stage pairs by running the
+   * {@code HostRoleCommandEntity.findTaskIdsByRequestStageIds} named query once per pair.
+   *
+   * @param requestStageIds request id / stage id pairs to look up; may be {@code null} or empty
+   * @return the task ids collected for all pairs, never {@code null}
+   */
+  public List<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
+    List<Long> taskIds = new ArrayList<>();
+    if (requestStageIds == null || requestStageIds.isEmpty()) {
+      return taskIds;
+    }
+
+    EntityManager entityManager = entityManagerProvider.get();
+    // the query object is created once and re-executed with fresh parameters for every pair
+    TypedQuery<Long> hostRoleCommandQuery =
+        entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class);
+
+    for (RequestDAO.StageEntityPK requestStageId : requestStageIds) {
+      hostRoleCommandQuery.setParameter("requestId", requestStageId.getRequestId());
+      hostRoleCommandQuery.setParameter("stageId", requestStageId.getStageId());
+
+      taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery));
+    }
+
+    return taskIds;
+  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 2696f66..5d53416 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -19,27 +19,54 @@
package org.apache.ambari.server.orm.dao;
import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
+import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.cleanup.TimeBasedCleanupPolicy;
import org.apache.ambari.server.orm.RequiresSession;
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RequestOperationLevelEntity;
import org.apache.ambari.server.orm.entities.RequestResourceFilterEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.state.Clusters;
import org.eclipse.persistence.config.HintValues;
import org.eclipse.persistence.config.QueryHints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.persist.Transactional;
@Singleton
-public class RequestDAO {
+public class RequestDAO implements Cleanable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RequestDAO.class);
+
+
+ private static final int BATCH_SIZE = 999;
+
/**
* SQL template to retrieve all request IDs, sorted by the ID.
*/
@@ -64,6 +91,27 @@ public class RequestDAO {
@Inject
DaoUtils daoUtils;
+ @Inject
+ private Provider<Clusters> m_clusters;
+
+ @Inject
+ private HostRoleCommandDAO hostRoleCommandDAO;
+
+ @Inject
+ private StageDAO stageDAO;
+
+ @Inject
+ private TopologyLogicalTaskDAO topologyLogicalTaskDAO;
+
+ @Inject
+ private TopologyHostTaskDAO topologyHostTaskDAO;
+
+ @Inject
+ private TopologyLogicalRequestDAO topologyLogicalRequestDAO;
+
+ @Inject
+ private TopologyRequestDAO topologyRequestDAO;
+
@RequiresSession
public RequestEntity findByPK(Long requestId) {
return entityManagerProvider.get().find(RequestEntity.class, requestId);
@@ -197,4 +245,214 @@ public class RequestDAO {
return daoUtils.selectList(query);
}
+
+  /**
+   * Holder for a request id / stage id pair.
+   */
+  public static final class StageEntityPK {
+    private Long requestId;
+    private Long stageId;
+
+    /**
+     * @param requestId id of the request
+     * @param stageId id of the stage
+     */
+    public StageEntityPK(Long requestId, Long stageId) {
+      this.requestId = requestId;
+      this.stageId = stageId;
+    }
+
+    public Long getStageId() {
+      return stageId;
+    }
+
+    public void setStageId(Long stageId) {
+      this.stageId = stageId;
+    }
+
+    public Long getRequestId() {
+      return requestId;
+    }
+
+    public void setRequestId(Long requestId) {
+      this.requestId = requestId;
+    }
+
+    /**
+     * Readable representation, so that instances concatenated into log messages show their ids
+     * instead of an identity hash.
+     */
+    @Override
+    public String toString() {
+      return "StageEntityPK{requestId=" + requestId + ", stageId=" + stageId + "}";
+    }
+  }
+
+  /**
+   * Looks up all request ids stored in the Upgrade table.
+   *
+   * @return the request ids produced by the {@code UpgradeEntity.findAllRequestIds} named query
+   */
+  private List<Long> findAllRequestIdsFromUpgrade() {
+    TypedQuery<Long> allUpgradeRequestIds = entityManagerProvider.get()
+        .createNamedQuery("UpgradeEntity.findAllRequestIds", Long.class);
+    return daoUtils.selectList(allUpgradeRequestIds);
+  }
+
+  /**
+   * Runs the {@code RequestEntity.findRequestStageIdsInClusterBeforeDate} named query to obtain
+   * request id / stage id pairs for one cluster.
+   *
+   * @param clusterId value bound to the {@code clusterId} query parameter
+   * @param beforeDateMillis timestamp in milliseconds bound to the {@code beforeDate} query parameter
+   * @return the list of request/stage ids
+   */
+  public List<StageEntityPK> findRequestAndStageIdsInClusterBeforeDate(Long clusterId, long beforeDateMillis) {
+    TypedQuery<StageEntityPK> requestStageIdsQuery = entityManagerProvider.get()
+        .createNamedQuery("RequestEntity.findRequestStageIdsInClusterBeforeDate", StageEntityPK.class);
+    requestStageIdsQuery.setParameter("clusterId", clusterId);
+    requestStageIdsQuery.setParameter("beforeDate", beforeDateMillis);
+    return daoUtils.selectList(requestStageIdsQuery);
+  }
+
+  /**
+   * Removes entities whose id is contained in the passed set. The ids are sent to the database
+   * in batches of at most {@code BATCH_SIZE} elements, one execution of the named query per batch.
+   *
+   * @param ids ids that are used to remove rows from the table; nothing is deleted when
+   *          {@code null} or empty
+   * @param paramName name of the collection parameter in the named query (taskIds, stageIds)
+   * @param entityName name of the entity that is removed, used only for logging
+   * @param beforeDateMillis timestamp which was set by user (remove all entities that were created before),
+   *          used only for logging
+   * @param entityQuery name of the NamedQuery that removes the entities
+   * @param type type of entity class which is used for casting the query result
+   * @return number of rows that were removed
+   */
+  @Transactional
+  protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String entityName, Long beforeDateMillis,
+                                  String entityQuery, Class<T> type) {
+    LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
+    if (ids == null || ids.isEmpty()) {
+      return 0;
+    }
+
+    EntityManager entityManager = entityManagerProvider.get();
+    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+
+    // the set is copied into a list once; every batch is only a view on that list
+    List<Long> idList = new ArrayList<>(ids);
+    int affectedRows = 0;
+    for (int startRow = 0; startRow < idList.size(); startRow += BATCH_SIZE) {
+      int endRow = Math.min(startRow + BATCH_SIZE, idList.size());
+      List<Long> idsSubList = idList.subList(startRow, endRow);
+      LOG.info("Deleting " + entityName + " entity batch with ids: " +
+          idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1));
+      query.setParameter(paramName, idsSubList);
+      affectedRows += query.executeUpdate();
+    }
+
+    return affectedRows;
+  }
+
+  /**
+   * Removes entities that are identified by a request id / stage id pair. The named query is
+   * executed once per pair; the pairs are walked in batches of at most {@code BATCH_SIZE}
+   * elements and one log line is written per batch.
+   *
+   * @param ids request/stage id pairs that are used to remove rows from the table; nothing is
+   *          deleted when {@code null} or empty
+   * @param paramNames names of the two query parameters: the first one receives the stage id,
+   *          the second one the request id
+   * @param entityName name of the entity that is removed, used only for logging
+   * @param beforeDateMillis timestamp which was set by user (remove all entities that were created before),
+   *          used only for logging
+   * @param entityQuery name of the NamedQuery that removes the entities
+   * @param type type of entity class which is used for casting the query result
+   * @return number of rows that were removed
+   */
+  @Transactional
+  protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> paramNames, String entityName, Long beforeDateMillis,
+                                  String entityQuery, Class<T> type) {
+    LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
+    if (ids == null || ids.isEmpty()) {
+      return 0;
+    }
+
+    EntityManager entityManager = entityManagerProvider.get();
+    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+    String stageIdParam = paramNames.get(0);
+    String requestIdParam = paramNames.get(1);
+
+    int affectedRows = 0;
+    for (int startRow = 0; startRow < ids.size(); startRow += BATCH_SIZE) {
+      int endRow = Math.min(startRow + BATCH_SIZE, ids.size());
+      // a view is enough here because the list is not modified while it is processed
+      List<StageEntityPK> idsSubList = ids.subList(startRow, endRow);
+      StageEntityPK first = idsSubList.get(0);
+      StageEntityPK last = idsSubList.get(idsSubList.size() - 1);
+      LOG.info("Deleting " + entityName + " entity batch with request/stage ids: " +
+          first.getRequestId() + "/" + first.getStageId() + " - " +
+          last.getRequestId() + "/" + last.getStageId());
+      for (StageEntityPK requestStageId : idsSubList) {
+        query.setParameter(stageIdParam, requestStageId.getStageId());
+        query.setParameter(requestIdParam, requestStageId.getRequestId());
+        affectedRows += query.executeUpdate();
+      }
+    }
+
+    return affectedRows;
+  }
+
+ /**
+ * Removes request related rows of one cluster that are older than the date given by the policy.
+ * The cluster is resolved from the cluster name of the policy; requests whose id is returned by
+ * the upgrade lookup are left untouched. Dependent rows (execution commands, topology tasks and
+ * requests, host role commands, role success criteria, stages, resource filters, operation
+ * levels) are deleted before the requests themselves.
+ *
+ * @param policy supplies the cluster name and the date limit in milliseconds
+ * @return the sum of the row counts reported by the batch deletes; rows removed through
+ * topologyRequestDAO.removeByPK are not added to this number
+ * @throws IllegalStateException if looking up the cluster fails with an AmbariException
+ */
+ @Transactional
+ @Override
+ public long cleanup(TimeBasedCleanupPolicy policy) {
+ long affectedRows = 0;
+ Long clusterId = null;
+ try {
+ clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+ // find request and stage ids that were created before date populated by user.
+ List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
+
+ // find request ids from Upgrade table and exclude these ids from
+ // request ids set that we already have. We don't want to make any changes for upgrade
+ Set<Long> requestIdsFromUpgrade = Sets.newHashSet(findAllRequestIdsFromUpgrade());
+ Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator();
+ while (requestStageIdsIterator.hasNext()) {
+ StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
+ if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
+ requestStageIdsIterator.remove();
+ }
+ }
+
+
+ // distinct request ids of the remaining request/stage pairs
+ Set<Long> requestIds = new HashSet<>();
+ for (StageEntityPK ids : requestStageIds) {
+ requestIds.add(ids.getRequestId());
+ }
+
+ // find task ids using request stage ids
+ Set<Long> taskIds = Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds));
+ // parameter names for cleanTableByStageEntityPK: stage id parameter first, request id parameter second
+ LinkedList<String> params = new LinkedList<>();
+ params.add("stageId");
+ params.add("requestId");
+
+ // find host task ids, to find related host requests and also to remove needed host tasks
+ List<Long> hostTaskIds = new ArrayList<>();
+ if (taskIds != null && !taskIds.isEmpty()) {
+ hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds));
+ }
+
+ // find host request ids by host task ids to remove later needed host requests
+ List<Long> hostRequestIds = new ArrayList<>();
+ if (!hostTaskIds.isEmpty()) {
+ hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
+ }
+
+ // find topology request ids by host request ids, they are removed one by one below
+ List<Long> topologyRequestIds = new ArrayList<>();
+ if (!hostRequestIds.isEmpty()) {
+ topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
+ }
+
+
+ //removing all entities one by one according to their relations using stage, task and request ids
+ affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
+ "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
+ affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
+ "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
+ affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
+ "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
+ affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
+ "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
+ // NOTE(review): these removals are not reflected in affectedRows
+ for (Long topologyRequestId : topologyRequestIds) {
+ topologyRequestDAO.removeByPK(topologyRequestId);
+ }
+ affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
+ "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
+ affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
+ "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
+ affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
+ "StageEntity.removeByRequestStageIds", StageEntity.class);
+ affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
+ "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
+ affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
+ "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
+ affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
+ "RequestEntity.removeByRequestIds", RequestEntity.class);
+
+ } catch (AmbariException e) {
+ LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e);
+ throw new IllegalStateException(e);
+ }
+
+ return affectedRows;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index 02532db..1b18ffe 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -54,6 +54,17 @@ public class TopologyHostTaskDAO {
}
@RequiresSession
+  public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) {
+    // bind the given ids to the "hostTaskIds" parameter of the named query and execute it
+    TypedQuery<Long> hostRequestIdsQuery = entityManagerProvider.get()
+        .createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
+    hostRequestIdsQuery.setParameter("hostTaskIds", hostTaskIds);
+    return daoUtils.selectList(hostRequestIdsQuery);
+  }
+
+ @RequiresSession
public List<TopologyHostTaskEntity> findAll() {
return daoUtils.selectAll(entityManagerProvider.get(), TopologyHostTaskEntity.class);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index e917dc2..ce1131a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.orm.dao;
import java.util.List;
import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
@@ -61,4 +62,15 @@ public class TopologyLogicalRequestDAO {
public void remove(TopologyLogicalRequestEntity requestEntity) {
entityManagerProvider.get().remove(requestEntity);
}
+
+  /**
+   * Executes the {@code TopologyLogicalRequestEntity.findRequestIds} named query.
+   *
+   * @param ids values bound to the {@code ids} parameter of the query
+   * @return the request ids selected by the query
+   */
+  @RequiresSession
+  public List<Long> findRequestIdsByIds(List<Long> ids) {
+    TypedQuery<Long> requestIdsQuery = entityManagerProvider.get()
+        .createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
+    requestIdsQuery.setParameter("ids", ids);
+    return daoUtils.selectList(requestIdsQuery);
+  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index 35f47a7..780a3ba 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.orm.dao;
import java.util.List;
import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
@@ -43,6 +44,17 @@ public class TopologyLogicalTaskDAO {
}
@RequiresSession
+  public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> physicalTaskIds) {
+    // bind the given ids to the "physicalTaskIds" parameter of the named query and execute it
+    TypedQuery<Long> hostTaskIdsQuery = entityManagerProvider.get()
+        .createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
+    hostTaskIdsQuery.setParameter("physicalTaskIds", physicalTaskIds);
+    return daoUtils.selectList(hostTaskIdsQuery);
+  }
+
+ @RequiresSession
public List<TopologyLogicalTaskEntity> findAll() {
return daoUtils.selectAll(entityManagerProvider.get(), TopologyLogicalTaskEntity.class);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
index 85f3a25..7015709 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
@@ -26,11 +26,16 @@ import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.OneToOne;
import javax.persistence.Table;
@Table(name = "execution_command")
@Entity
+@NamedQueries({
+ @NamedQuery(name = "ExecutionCommandEntity.removeByTaskIds", query = "DELETE FROM ExecutionCommandEntity command WHERE command.taskId IN :taskIds")
+})
public class ExecutionCommandEntity {
@Id
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index fdec5f0..6197940 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -60,6 +60,7 @@ import org.apache.commons.lang.ArrayUtils;
, initialValue = 1
)
@NamedQueries({
+ @NamedQuery(name = "HostRoleCommandEntity.findTaskIdsByRequestStageIds", query = "SELECT command.taskId FROM HostRoleCommandEntity command WHERE command.stageId = :stageId AND command.requestId = :requestId"),
@NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"),
@NamedQuery(name = "HostRoleCommandEntity.findByRequestIdAndStatuses", query="SELECT task FROM HostRoleCommandEntity task WHERE task.requestId=:requestId AND task.status IN :statuses ORDER BY task.taskId ASC"),
@NamedQuery(name = "HostRoleCommandEntity.findTasksByStatusesOrderByIdDesc", query = "SELECT task FROM HostRoleCommandEntity task WHERE task.requestId = :requestId AND task.status IN :statuses ORDER BY task.taskId DESC"),
@@ -71,12 +72,9 @@ import org.apache.commons.lang.ArrayUtils;
@NamedQuery(name = "HostRoleCommandEntity.findByStatusBetweenStages", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId AND command.stageId >= :minStageId AND command.stageId <= :maxStageId AND command.status = :status"),
@NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipExcludeRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand <> :roleCommand"),
@NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipForRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand = :roleCommand"),
- @NamedQuery(
- name = "HostRoleCommandEntity.findHostsByCommandStatus",
- query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
- @NamedQuery(
- name = "HostRoleCommandEntity.getBlockingHostsForRequest",
- query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL")
+ @NamedQuery(name = "HostRoleCommandEntity.removeByTaskIds", query = "DELETE FROM HostRoleCommandEntity command WHERE command.taskId IN :taskIds"),
+ @NamedQuery(name = "HostRoleCommandEntity.findHostsByCommandStatus", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
+ @NamedQuery(name = "HostRoleCommandEntity.getBlockingHostsForRequest", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL")
})
public class HostRoleCommandEntity {
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
index f19aa72..099d08f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
@@ -30,6 +30,8 @@ import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
@@ -39,6 +41,10 @@ import org.apache.ambari.server.actionmanager.RequestType;
@Table(name = "request")
@Entity
+@NamedQueries({
+ @NamedQuery(name = "RequestEntity.findRequestStageIdsInClusterBeforeDate", query = "SELECT NEW org.apache.ambari.server.orm.dao.RequestDAO.StageEntityPK(request.requestId, stage.stageId) FROM RequestEntity request JOIN StageEntity stage ON request.requestId = stage.requestId WHERE request.clusterId = :clusterId AND request.createTime <= :beforeDate"),
+ @NamedQuery(name = "RequestEntity.removeByRequestIds", query = "DELETE FROM RequestEntity request WHERE request.requestId IN :requestIds")
+})
public class RequestEntity {
@Column(name = "request_id")
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
index ff14e3a..a7cd0d0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java
@@ -40,7 +40,9 @@ import javax.persistence.TableGenerator;
@NamedQueries({
@NamedQuery(name = "requestOperationLevelByHostId", query =
"SELECT requestOperationLevel FROM RequestOperationLevelEntity requestOperationLevel " +
- "WHERE requestOperationLevel.hostId=:hostId")
+ "WHERE requestOperationLevel.hostId=:hostId"),
+ @NamedQuery(name = "RequestOperationLevelEntity.removeByRequestIds",
+ query = "DELETE FROM RequestOperationLevelEntity requestOperationLevel WHERE requestOperationLevel.requestId IN :requestIds")
})
public class RequestOperationLevelEntity {
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
index 8ee41d2..9597db1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java
@@ -26,6 +26,8 @@ import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.TableGenerator;
@@ -36,6 +38,9 @@ import javax.persistence.TableGenerator;
, pkColumnValue = "resourcefilter_id_seq"
, initialValue = 1
)
+@NamedQueries({
+ @NamedQuery(name = "RequestResourceFilterEntity.removeByRequestIds", query = "DELETE FROM RequestResourceFilterEntity filter WHERE filter.requestId IN :requestIds")
+})
public class RequestResourceFilterEntity {
@Id
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
index 3386c24..66e7fd8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
@@ -26,6 +26,8 @@ import javax.persistence.IdClass;
import javax.persistence.JoinColumn;
import javax.persistence.JoinColumns;
import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.Table;
import org.apache.ambari.server.Role;
@@ -33,6 +35,9 @@ import org.apache.ambari.server.Role;
@IdClass(org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntityPK.class)
@Table(name = "role_success_criteria")
@Entity
+@NamedQueries({
+ @NamedQuery(name = "RoleSuccessCriteriaEntity.removeByRequestStageIds", query = "DELETE FROM RoleSuccessCriteriaEntity criteria WHERE criteria.stageId = :stageId AND criteria.requestId = :requestId")
+})
public class RoleSuccessCriteriaEntity {
@Id
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index d035729..f688412 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -50,7 +50,11 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus;
query = "SELECT stage.requestId, MIN(stage.stageId) from StageEntity stage, HostRoleCommandEntity hrc WHERE hrc.status IN :statuses AND hrc.stageId = stage.stageId AND hrc.requestId = stage.requestId GROUP by stage.requestId ORDER BY stage.requestId"),
@NamedQuery(
name = "StageEntity.findByRequestIdAndCommandStatuses",
- query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId") })
+ query = "SELECT stage from StageEntity stage WHERE stage.status IN :statuses AND stage.requestId = :requestId ORDER BY stage.stageId"),
+ @NamedQuery(
+ name = "StageEntity.removeByRequestStageIds",
+ query = "DELETE FROM StageEntity stage WHERE stage.stageId = :stageId AND stage.requestId = :requestId")
+})
public class StageEntity {
@Basic
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
index b90e192..2700f68 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
@@ -25,11 +25,16 @@ import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.Table;
@Entity
@Table(name = "topology_host_request")
+@NamedQueries({
+ @NamedQuery(name = "TopologyHostRequestEntity.removeByIds", query = "DELETE FROM TopologyHostRequestEntity topologyHostRequest WHERE topologyHostRequest.id IN :hostRequestIds")
+})
public class TopologyHostRequestEntity {
@Id
// @GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_host_request_id_generator")
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
index bba0e06..0bb3e19 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
@@ -40,7 +40,11 @@ import javax.persistence.TableGenerator;
pkColumnValue = "topology_host_task_id_seq", initialValue = 0)
@NamedQueries({
@NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest",
- query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId")
+ query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId"),
+ @NamedQuery(name = "TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
+ query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"),
+ @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds",
+ query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds")
})
public class TopologyHostTaskEntity {
@Id
@@ -51,6 +55,9 @@ public class TopologyHostTaskEntity {
@Column(name = "type", length = 255, nullable = false)
private String type;
+ @Column(name = "host_request_id", nullable = false, insertable = false, updatable = false)
+ private Long hostRequestId;
+
@ManyToOne
@JoinColumn(name = "host_request_id", referencedColumnName = "id", nullable = false)
private TopologyHostRequestEntity topologyHostRequestEntity;
@@ -67,7 +74,11 @@ public class TopologyHostTaskEntity {
}
public Long getHostRequestId() {
- return topologyHostRequestEntity != null ? topologyHostRequestEntity.getId() : null;
+ return hostRequestId; // reads the host_request_id column field; previously derived from topologyHostRequestEntity.getId()
+ }
+
+ public void setHostRequestId(Long hostRequestId) {
+ this.hostRequestId = hostRequestId; // field is mapped insertable=false, updatable=false - presumably not written by JPA; confirm
}
public String getType() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
index 4f865f4..605a043 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
@@ -24,12 +24,17 @@ import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
@Entity
@Table(name = "topology_logical_request")
+@NamedQueries({
+ @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity logicalrequest WHERE logicalrequest.id IN :ids")
+})
public class TopologyLogicalRequestEntity {
@Id
@Column(name = "id", nullable = false, updatable = false)
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
index c71d4e4..2954863 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
@@ -24,6 +24,8 @@ import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.persistence.TableGenerator;
@@ -33,6 +35,10 @@ import javax.persistence.TableGenerator;
@TableGenerator(name = "topology_logical_task_id_generator", table = "ambari_sequences",
pkColumnName = "sequence_name", valueColumnName = "sequence_value",
pkColumnValue = "topology_logical_task_id_seq", initialValue = 0)
+@NamedQueries({
+ @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"),
+ @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :taskIds")
+})
public class TopologyLogicalTaskEntity {
@Id
@GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_logical_task_id_generator")
@@ -42,12 +48,18 @@ public class TopologyLogicalTaskEntity {
@Column(name = "component", length = 255)
private String componentName;
+ @Column(name = "host_task_id", nullable = false, insertable = false, updatable = false)
+ private Long hostTaskId;
+
+ @Column(name = "physical_task_id", nullable = false, insertable = false, updatable = false)
+ private Long physicalTaskId;
+
@ManyToOne
@JoinColumn(name = "host_task_id", referencedColumnName = "id", nullable = false)
private TopologyHostTaskEntity topologyHostTaskEntity;
@OneToOne
- @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id")
+ @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id", nullable = false)
private HostRoleCommandEntity hostRoleCommandEntity;
public Long getId() {
@@ -58,14 +70,22 @@ public class TopologyLogicalTaskEntity {
this.id = id;
}
- public Long getHostTaskId() {
- return topologyHostTaskEntity != null ? topologyHostTaskEntity.getId() : null;
- }
-
public Long getPhysicalTaskId() {
return hostRoleCommandEntity != null ? hostRoleCommandEntity.getTaskId() : null;
}
+ public void setPhysicalTaskId(Long physicalTaskId) {
+ this.physicalTaskId = physicalTaskId; // NOTE(review): getPhysicalTaskId() still reads hostRoleCommandEntity, not this field - confirm the asymmetry is intended
+ }
+
+ public void setHostTaskId(Long hostTaskId) {
+ this.hostTaskId = hostTaskId; // host_task_id field is mapped insertable=false, updatable=false - presumably not written by JPA; confirm
+ }
+
+ public Long getHostTaskId() {
+ return hostTaskId; // returns the host_task_id column field (previously derived from topologyHostTaskEntity.getId())
+ }
+
public String getComponentName() {
return componentName;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
index 89574bc..bea1d19 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
@@ -63,6 +63,8 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY r.startTime DESC, u.upgradeId DESC"),
@NamedQuery(name = "UpgradeEntity.findLatestForCluster",
query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime DESC"),
+ @NamedQuery(name = "UpgradeEntity.findAllRequestIds",
+ query = "SELECT upgrade.requestId FROM UpgradeEntity upgrade")
})
public class UpgradeEntity {
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
index 560970a..35ea769 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
@@ -27,6 +27,8 @@ import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.TableGenerator;
@@ -48,6 +50,9 @@ import org.apache.ambari.server.state.UpgradeState;
pkColumnValue = "upgrade_item_id_seq",
initialValue = 0,
allocationSize = 1000)
+@NamedQueries({
+ @NamedQuery(name = "UpgradeItemEntity.findAllStageIds", query = "SELECT upgradeItem.stageId FROM UpgradeItemEntity upgradeItem")
+})
public class UpgradeItemEntity {
@Id
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py
index 87cc6c2..737be6a 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -199,6 +199,12 @@ def restart(args):
start(args)
+@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
+def database_cleanup(args):  # Handler registered for DB_CLEANUP_ACTION: logs, optionally stops the server, then delegates to db_cleanup.
+ logger.info("Database cleanup.")
+ if args.silent:  # silent mode stops the server up front (matches the --silent help text)
+ stop(args)
+ db_cleanup(args)  # NOTE(review): db_cleanup's return value is not propagated to the caller - confirm that is intended
#
# The Ambari Server status.
@@ -469,7 +475,7 @@ def init_parser_options(parser):
help="Print verbose status messages")
parser.add_option("-s", "--silent",
action="store_true", dest="silent", default=False,
- help="Silently accepts default prompt values")
+ help="Silently accepts default prompt values. For db-cleanup command, silent mode will stop ambari server.")
parser.add_option('-g', '--debug', action="store_true", dest='debug', default=False,
help="Start ambari-server in debug mode")
parser.add_option('-y', '--suspend-start', action="store_true", dest='suspend_start', default=False,
@@ -759,7 +765,7 @@ def create_user_action_map(args, options):
CHECK_DATABASE_ACTION: UserAction(check_database, options),
ENABLE_STACK_ACTION: UserAction(enable_stack, options, args),
SETUP_SSO_ACTION: UserActionRestart(setup_sso, options),
- DB_CLEANUP_ACTION: UserAction(db_cleanup, options),
+ DB_CLEANUP_ACTION: UserAction(database_cleanup, options),
INSTALL_MPACK_ACTION: UserAction(install_mpack, options),
UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options),
UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options),
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/main/python/ambari_server/dbCleanup.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index abc8267..6e16bc5 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -42,25 +42,29 @@ def run_db_cleanup(options):
if validate_args(options):
return 1
- db_title = get_db_type(get_ambari_properties()).title
+ status, stateDesc = is_server_runing()
- confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
- db_title), True)
- if not confirmBackup:
- print_info_msg("Ambari Server Database cleanup aborted")
- return 0
+ if not options.silent:
+ db_title = get_db_type(get_ambari_properties()).title
+
+ confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
+ db_title), True)
+ if not confirmBackup:
+ print_info_msg("Ambari Server Database cleanup aborted")
+ return 0
+
+ if status:
+ print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
+ return 1
+
+ confirm = get_YN_input(
+ "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
+ db_title, options.cleanup_from_date), True)
+ if not confirm:
+ print_info_msg("Ambari Server Database cleanup aborted")
+ return 0
- status, stateDesc = is_server_runing()
- if status:
- print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
- return 1
- confirm = get_YN_input(
- "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
- db_title, options.cleanup_from_date), True)
- if not confirm:
- print_info_msg("Ambari Server Database cleanup aborted")
- return 0
jdk_path = get_java_exe_path()
if jdk_path is None:
@@ -101,7 +105,6 @@ def run_db_cleanup(options):
# Database cleanup
#
def db_cleanup(options):
- logger.info("Database cleanup.")
return run_db_cleanup(options)
http://git-wip-us.apache.org/repos/asf/ambari/blob/11ab63f7/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
index 7d8ba50..d6e12dc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
@@ -533,4 +533,70 @@ public class DatabaseConsistencyCheckHelperTest {
}
+ @Test // Drives checkForLargeTables() against a mocked MYSQL DBAccessor and verifies the expected table-size queries.
+ public void testCheckForLargeTables() throws Exception {
+ EasyMockSupport easyMockSupport = new EasyMockSupport();
+ final AmbariMetaInfo mockAmbariMetainfo = easyMockSupport.createNiceMock(AmbariMetaInfo.class);
+ final DBAccessor mockDBDbAccessor = easyMockSupport.createNiceMock(DBAccessor.class);
+ final Connection mockConnection = easyMockSupport.createNiceMock(Connection.class);
+ final Statement mockStatement = easyMockSupport.createNiceMock(Statement.class);
+ final EntityManager mockEntityManager = easyMockSupport.createNiceMock(EntityManager.class);
+ final Clusters mockClusters = easyMockSupport.createNiceMock(Clusters.class);
+ final OsFamily mockOSFamily = easyMockSupport.createNiceMock(OsFamily.class);
+ final StackManagerFactory mockStackManagerFactory = easyMockSupport.createNiceMock(StackManagerFactory.class);
+
+ final ResultSet hostRoleCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class); // one ResultSet mock per table queried below
+ final ResultSet executionCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+ final ResultSet stageResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+ final ResultSet requestResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+ final ResultSet alertHistoryResultSet = easyMockSupport.createNiceMock(ResultSet.class);
+
+ final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(AmbariMetaInfo.class).toInstance(mockAmbariMetainfo);
+ bind(StackManagerFactory.class).toInstance(mockStackManagerFactory);
+ bind(EntityManager.class).toInstance(mockEntityManager);
+ bind(DBAccessor.class).toInstance(mockDBDbAccessor);
+ bind(Clusters.class).toInstance(mockClusters);
+ bind(OsFamily.class).toInstance(mockOSFamily);
+ }
+ });
+
+ expect(hostRoleCommandResultSet.next()).andReturn(true).once(); // next() is stubbed to return true once for each result set
+ expect(executionCommandResultSet.next()).andReturn(true).once();
+ expect(stageResultSet.next()).andReturn(true).once();
+ expect(requestResultSet.next()).andReturn(true).once();
+ expect(alertHistoryResultSet.next()).andReturn(true).once();
+ expect(hostRoleCommandResultSet.getLong(1)).andReturn(2345L).atLeastOnce(); // stubbed value for column 1 of each result set
+ expect(executionCommandResultSet.getLong(1)).andReturn(12345L).atLeastOnce();
+ expect(stageResultSet.getLong(1)).andReturn(2321L).atLeastOnce();
+ expect(requestResultSet.getLong(1)).andReturn(1111L).atLeastOnce();
+ expect(alertHistoryResultSet.getLong(1)).andReturn(2223L).atLeastOnce();
+ expect(mockDBDbAccessor.getConnection()).andReturn(mockConnection);
+ expect(mockDBDbAccessor.getDbType()).andReturn(DBAccessor.DbType.MYSQL); // MYSQL db type; the queries expected below target information_schema.TABLES
+ expect(mockDBDbAccessor.getDbSchema()).andReturn("test_schema"); // schema name used in the table_schema filter of the expected queries
+ expect(mockConnection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE)).andReturn(mockStatement).anyTimes();
+ expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+ "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"host_role_command\"")).andReturn(hostRoleCommandResultSet);
+ expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+ "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"execution_command\"")).andReturn(executionCommandResultSet);
+ expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+ "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"stage\"")).andReturn(stageResultSet);
+ expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+ "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"request\"")).andReturn(requestResultSet);
+ expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " +
+ "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"alert_history\"")).andReturn(alertHistoryResultSet);
+
+ DatabaseConsistencyCheckHelper.setInjector(mockInjector);
+
+ easyMockSupport.replayAll();
+
+ mockAmbariMetainfo.init(); // NOTE(review): invoked on a nice mock already in replay state - looks like a no-op; confirm it is needed
+
+ DatabaseConsistencyCheckHelper.resetCheckResult();
+ DatabaseConsistencyCheckHelper.checkForLargeTables();
+
+ easyMockSupport.verifyAll(); // checks that the recorded expectations (e.g. each executeQuery) were satisfied
+ }
}