Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/24 07:59:09 UTC
[ambari] branch trunk updated: AMBARI-22827: DB Cleanup scripts are using IN clauses (#3565)
This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new da0091b9ce AMBARI-22827: DB Cleanup scripts are using IN clauses (#3565)
da0091b9ce is described below
commit da0091b9ce856f28ff8722b4028ed8c26a828b84
Author: Yu Hou <52...@qq.com>
AuthorDate: Thu Nov 24 15:59:02 2022 +0800
AMBARI-22827: DB Cleanup scripts are using IN clauses (#3565)
---
.../apache/ambari/server/orm/dao/RequestDAO.java | 172 +++++++-------------
.../ambari/server/orm/dao/TopologyHostTaskDAO.java | 19 ++-
.../server/orm/dao/TopologyLogicalRequestDAO.java | 17 +-
.../server/orm/dao/TopologyLogicalTaskDAO.java | 17 +-
.../ambari/server/orm/helpers/SQLConstants.java | 28 ++++
.../ambari/server/orm/helpers/SQLOperations.java | 60 +++++++
.../src/main/python/ambari_server/dbCleanup.py | 177 ++++++++++++---------
.../server/orm/helpers/SQLOperationsTest.java | 54 +++++++
8 files changed, 336 insertions(+), 208 deletions(-)
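The core of this change: several JPQL queries of the form "... WHERE x IN :ids" were previously bound with the full, unbounded id set, and many databases cap the number of expressions in an IN list (Oracle, for instance, rejects lists of more than 1,000 elements with ORA-01795), which is presumably why the new batch size is 999. A minimal sketch of the chunking idea, using Guava's Iterables.partition as the patch itself does (the id values and the printed statement are illustrative):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    import com.google.common.collect.Iterables;

    public class InClauseChunkingSketch {
        private static final int IN_ARGUMENT_MAX_SIZE = 999; // mirrors SQLConstants in this patch

        public static void main(String[] args) {
            // 2500 hypothetical task ids; a single IN (...) with all of them
            // could exceed the database's expression limit.
            List<Long> taskIds = LongStream.range(0, 2500).boxed().collect(Collectors.toList());

            // Instead, issue one statement per chunk of at most 999 ids.
            for (List<Long> chunk : Iterables.partition(taskIds, IN_ARGUMENT_MAX_SIZE)) {
                System.out.println("DELETE ... WHERE id IN (<" + chunk.size() + " ids>)");
            }
        }
    }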
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 3c3d7bfb87..24e2231008 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -19,7 +19,6 @@
package org.apache.ambari.server.orm.dao;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -46,6 +45,8 @@ import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
import org.apache.ambari.server.state.Clusters;
import org.eclipse.persistence.config.HintValues;
import org.eclipse.persistence.config.QueryHints;
@@ -63,9 +64,6 @@ public class RequestDAO implements Cleanable {
private static final Logger LOG = LoggerFactory.getLogger(RequestDAO.class);
-
- private static final int BATCH_SIZE = 999;
-
/**
* SQL template to retrieve all request IDs, sorted by the ID.
*/
@@ -82,8 +80,6 @@ public class RequestDAO implements Cleanable {
private final static String REQUESTS_WITH_NO_CLUSTER_SQL =
"SELECT request.requestId FROM RequestEntity request WHERE request.clusterId = -1 OR request.clusterId IS NULL ORDER BY request.requestId %s";
-
-
@Inject
Provider<EntityManager> entityManagerProvider;
@@ -311,23 +307,21 @@ public class RequestDAO implements Cleanable {
@Transactional
protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String entityName, Long beforeDateMillis,
String entityQuery, Class<T> type) {
- LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
- int affectedRows = 0;
- if (ids != null && !ids.isEmpty()) {
- EntityManager entityManager = entityManagerProvider.get();
- // Batch delete
- TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
- for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
- int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
- List<Long> idsSubList = new ArrayList<>(ids).subList(i, endRow);
- LOG.info("Deleting " + entityName + " entity batch with task ids: " +
- idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1));
- query.setParameter(paramName, idsSubList);
- affectedRows += query.executeUpdate();
- }
+ LOG.info(String.format("Purging %s entity records before date %s", entityName, new Date(beforeDateMillis)));
+
+ if (ids == null || ids.isEmpty()) {
+ return 0;
}
- return affectedRows;
+ final EntityManager entityManager = entityManagerProvider.get();
+ final TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+
+ // Batch delete
+ return SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+ LOG.info(String.format("Purging %s entity, batch %s/%s.", entityName, currentBatch, totalBatches));
+ query.setParameter(paramName, chunk);
+ return query.executeUpdate();
+ });
}
/**
@@ -346,33 +340,34 @@ public class RequestDAO implements Cleanable {
@Transactional
protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> paramNames, String entityName, Long beforeDateMillis,
String entityQuery, Class<T> type) {
- LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
- int affectedRows = 0;
- if (ids != null && !ids.isEmpty()) {
- EntityManager entityManager = entityManagerProvider.get();
- // Batch delete
- TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
- for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
- int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
- List<StageEntityPK> idsSubList = new ArrayList<>(ids).subList(i, endRow);
- LOG.info("Deleting " + entityName + " entity batch with task ids: " +
- idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1));
- for (StageEntityPK requestIds : idsSubList) {
- query.setParameter(paramNames.get(0), requestIds.getStageId());
- query.setParameter(paramNames.get(1), requestIds.getRequestId());
- affectedRows += query.executeUpdate();
- }
- }
+ LOG.info(String.format("Purging %s entity records before date %s", entityName, new Date(beforeDateMillis)));
+
+ if (ids == null || ids.isEmpty()) {
+ return 0;
}
- return affectedRows;
+ final EntityManager entityManager = entityManagerProvider.get();
+ final TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+
+ // Batch delete
+ return SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+ int affectedRows = 0;
+
+ for (StageEntityPK requestIds : chunk) {
+ query.setParameter(paramNames.get(0), requestIds.getStageId());
+ query.setParameter(paramNames.get(1), requestIds.getRequestId());
+ affectedRows += query.executeUpdate();
+ }
+ return affectedRows;
+ });
}
@Transactional
@Override
public long cleanup(TimeBasedCleanupPolicy policy) {
+ long affectedRows = 0;
try {
- final Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+ Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
// find request and stage ids that were created before date populated by user.
List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
@@ -380,116 +375,59 @@ public class RequestDAO implements Cleanable {
// request ids set that we already have. We don't want to make any changes for upgrade
Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator();
+ Set<Long> requestIds = new HashSet<>();
while (requestStageIdsIterator.hasNext()) {
StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
requestStageIdsIterator.remove();
+ continue;
}
- }
-
- Set<Long> requestIds = new HashSet<>();
- for (StageEntityPK ids : requestStageIds) {
- requestIds.add(ids.getRequestId());
+ requestIds.add(nextRequestStageIds.getRequestId());
}
// find task ids using request stage ids
Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
+ LinkedList<String> params = new LinkedList<>();
+ params.add("stageId");
+ params.add("requestId");
// find host task ids, to find related host requests and also to remove needed host tasks
- final Set<Long> hostTaskIds = findHostTaskIds(taskIds);
+ Set<Long> hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
// find host request ids by host task ids to remove later needed host requests
- final Set<Long> hostRequestIds = findHostRequestIds(hostTaskIds);
+ Set<Long> hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
+ Set<Long> topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
- final Set<Long> topologyRequestIds = findTopologyRequestIds(hostRequestIds);
-
- final LinkedList<String> params = new LinkedList<>();
- params.add("stageId");
- params.add("requestId");
- long affectedRows = 0;
//removing all entities one by one according to their relations using stage, task and request ids
affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
- "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
+ "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
- "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
+ "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
- "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
+ "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
- "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
+ "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
for (Long topologyRequestId : topologyRequestIds) {
topologyRequestDAO.removeByPK(topologyRequestId);
}
affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
- "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
+ "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
- "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
+ "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
- "StageEntity.removeByRequestStageIds", StageEntity.class);
+ "StageEntity.removeByRequestStageIds", StageEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
- "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
+ "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
- "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
+ "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
- "RequestEntity.removeByRequestIds", RequestEntity.class);
+ "RequestEntity.removeByRequestIds", RequestEntity.class);
- return affectedRows;
} catch (AmbariException e) {
LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e);
throw new IllegalStateException(e);
}
- }
- private Set<Long> findHostTaskIds(Set<Long> taskIds) {
- final Set<Long> hostTaskIds = new HashSet<>();
- final Set<Long> partialTaskIds = new HashSet<>();
- taskIds.forEach(taskId -> {
- partialTaskIds.add(taskId);
- if (partialTaskIds.size() == BATCH_SIZE) {
- hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
- partialTaskIds.clear();
- }
- });
-
- if (!partialTaskIds.isEmpty()) {
- hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
- }
- return hostTaskIds;
- }
-
- private Set<Long> findHostRequestIds(Set<Long> hostTaskIds) {
- final Set<Long> hostRequestIds = new HashSet<>();
- final Set<Long> partialHostTaskIds = new HashSet<>();
-
- hostTaskIds.forEach(taskId -> {
- partialHostTaskIds.add(taskId);
- if (partialHostTaskIds.size() == BATCH_SIZE) {
- hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
- partialHostTaskIds.clear();
- }
- });
-
- if (!partialHostTaskIds.isEmpty()) {
- hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
- }
- return hostRequestIds;
- }
-
- private Set<Long> findTopologyRequestIds(final Set<Long> hostRequestIds) {
- final Set<Long> topologyRequestIds = new HashSet<>();
- final Set<Long> partialHostRequestIds = new HashSet<>();
-
- hostRequestIds.forEach(requestId -> {
- partialHostRequestIds.add(requestId);
- if (partialHostRequestIds.size() == BATCH_SIZE) {
- topologyRequestIds.addAll(topologyLogicalRequestDAO.findRequestIdsByIds(partialHostRequestIds));
- partialHostRequestIds.clear();
- }
- });
-
- if (!partialHostRequestIds.isEmpty()) {
- topologyRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostRequestIds));
- }
- return topologyRequestIds;
+ return affectedRows;
}
-
}
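Note how cleanTableByIds and cleanTableByStageEntityPK above now delegate the chunking to SQLOperations.batch, re-binding one named TypedQuery per chunk and letting batch() sum the affected-row counts. A compilable sketch of that delete pattern in isolation (the entity and named-query identifiers are hypothetical, not Ambari APIs):

    import java.util.Set;

    import javax.persistence.EntityManager;
    import javax.persistence.TypedQuery;

    import org.apache.ambari.server.orm.helpers.SQLConstants;
    import org.apache.ambari.server.orm.helpers.SQLOperations;

    public class BatchedDeleteSketch {
        /** Deletes rows by id in chunks; returns the total number of affected rows. */
        static int deleteByIds(EntityManager em, Set<Long> ids) {
            // "SomeEntity.removeByIds" stands in for a named query such as
            // "DELETE FROM SomeEntity e WHERE e.id IN :ids".
            final TypedQuery<Object> query = em.createNamedQuery("SomeEntity.removeByIds", Object.class);

            return SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE,
                (chunk, currentBatch, totalBatches, totalSize) -> {
                    query.setParameter("ids", chunk); // re-bind the IN list for this chunk
                    return query.executeUpdate();     // summed across chunks by batch()
                });
        }
    }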
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index 70fdc77fdf..7b375c6d4e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.orm.dao;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -26,6 +27,8 @@ import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@@ -57,13 +60,17 @@ public class TopologyHostTaskDAO {
@RequiresSession
public Set<Long> findHostRequestIdsByHostTaskIds(Set<Long> hostTaskIds) {
+ final Set<Long> result = new HashSet<>();
EntityManager entityManager = entityManagerProvider.get();
- TypedQuery<Long> topologyHostTaskQuery =
- entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
-
- topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds);
-
- return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
+ final TypedQuery<Long> topologyHostTaskQuery =
+ entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
+
+ SQLOperations.batch(hostTaskIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+ topologyHostTaskQuery.setParameter("hostTaskIds", chunk);
+ result.addAll(daoUtils.selectList(topologyHostTaskQuery));
+ return 0;
+ });
+ return Sets.newHashSet(result);
}
@RequiresSession
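This is the read-side counterpart of the same pattern, repeated in TopologyLogicalRequestDAO and TopologyLogicalTaskDAO below: run the SELECT once per chunk, union the per-chunk results, and return 0 from the callback, since batch()'s summed return value carries no meaning for reads. A generic sketch (the named query is hypothetical):

    import java.util.HashSet;
    import java.util.Set;

    import javax.persistence.EntityManager;
    import javax.persistence.TypedQuery;

    import org.apache.ambari.server.orm.helpers.SQLConstants;
    import org.apache.ambari.server.orm.helpers.SQLOperations;

    public class BatchedSelectSketch {
        /** Collects the ids matched by a chunked IN query into one result set. */
        static Set<Long> selectByIds(EntityManager em, Set<Long> ids) {
            final Set<Long> result = new HashSet<>();
            // Hypothetical named query: "SELECT e.otherId FROM SomeEntity e WHERE e.id IN :ids".
            final TypedQuery<Long> query = em.createNamedQuery("SomeEntity.findOtherIds", Long.class);

            SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE,
                (chunk, currentBatch, totalBatches, totalSize) -> {
                    query.setParameter("ids", chunk);
                    result.addAll(query.getResultList()); // union the per-chunk results
                    return 0; // row count is meaningless for a SELECT
                });
            return result;
        }
    }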
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index f23cc9f346..7d7981c92c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -17,6 +17,7 @@
*/
package org.apache.ambari.server.orm.dao;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -25,6 +26,8 @@ import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@@ -68,11 +71,15 @@ public class TopologyLogicalRequestDAO {
@RequiresSession
public Set<Long> findRequestIdsByIds(Set<Long> ids) {
EntityManager entityManager = entityManagerProvider.get();
- TypedQuery<Long> topologyLogicalRequestQuery =
- entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
+ final Set<Long> result = new HashSet<>();
+ final TypedQuery<Long> topologyLogicalRequestQuery =
+ entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
- topologyLogicalRequestQuery.setParameter("ids", ids);
-
- return Sets.newHashSet(daoUtils.selectList(topologyLogicalRequestQuery));
+ SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+ topologyLogicalRequestQuery.setParameter("ids", chunk);
+ result.addAll(daoUtils.selectList(topologyLogicalRequestQuery));
+ return 0;
+ });
+ return Sets.newHashSet(result);
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index abde5ec085..5844e6c38f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -17,6 +17,7 @@
*/
package org.apache.ambari.server.orm.dao;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -25,6 +26,8 @@ import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@@ -48,12 +51,16 @@ public class TopologyLogicalTaskDAO {
@RequiresSession
public Set<Long> findHostTaskIdsByPhysicalTaskIds(Set<Long> physicalTaskIds) {
EntityManager entityManager = entityManagerProvider.get();
- TypedQuery<Long> topologyHostTaskQuery =
- entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
+ final Set<Long> result = new HashSet<>();
+ final TypedQuery<Long> topologyHostTaskQuery =
+ entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
- topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds);
-
- return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
+ SQLOperations.batch(physicalTaskIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+ topologyHostTaskQuery.setParameter("physicalTaskIds", chunk);
+ result.addAll(daoUtils.selectList(topologyHostTaskQuery));
+ return 0;
+ });
+ return Sets.newHashSet(result);
}
@RequiresSession
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLConstants.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLConstants.java
new file mode 100644
index 0000000000..8de5a19b13
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.helpers;
+
+public interface SQLConstants {
+ /**
+ * Defines the maximum number of elements allowed in an IN clause.
+ *
+ * Example: SELECT * FROM tbl WHERE tbl.id IN (n1, n2, ... n_IN_ARGUMENT_MAX_SIZE)
+ */
+ int IN_ARGUMENT_MAX_SIZE = 999;
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLOperations.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLOperations.java
new file mode 100644
index 0000000000..f308fbf70a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLOperations.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.helpers;
+
+import java.util.Collection;
+
+import com.google.common.collect.Iterables;
+
+public class SQLOperations {
+ public interface BatchOperation<T> {
+ /**
+ * @param chunk the current portion (chunk) of the source collection
+ * @param currentBatch the current batch number, starting at 1
+ * @param totalBatches the total number of batches
+ * @param totalSize the total number of elements in the source collection
+ * @return the number of items processed in this chunk
+ */
+ int chunk(Collection<T> chunk, int currentBatch, int totalBatches, int totalSize);
+ }
+
+ /**
+ * Splits the collection into batches and invokes the callback on each one.
+ *
+ * @param collection the collection with the values to be split
+ * @param batchSize the size of one batch
+ * @param callback the logic used to process one batch
+ * @return the total number of processed items in the collection
+ */
+ public static <T> int batch(Collection<T> collection, int batchSize, BatchOperation<T> callback) {
+ if (collection == null || collection.isEmpty() || batchSize == 0) {
+ return 0;
+ }
+ int totalSize = collection.size();
+ int totalChunks = (int) Math.ceil((float) totalSize / batchSize);
+ int currentChunk = 0;
+ int resultSum = 0;
+
+ for (Collection<T> chunk : Iterables.partition(collection, batchSize)) {
+ currentChunk += 1;
+ resultSum += callback.chunk(chunk, currentChunk, totalChunks, totalSize);
+ }
+ return resultSum;
+ }
+}
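A self-contained usage example for the helper above (the host names and batch size are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.ambari.server.orm.helpers.SQLOperations;

    public class SQLOperationsDemo {
        public static void main(String[] args) {
            List<String> hosts = Arrays.asList("h1", "h2", "h3", "h4", "h5");

            // Two chunks of 2 and one chunk of 1; batch() sums the callback results.
            int processed = SQLOperations.batch(hosts, 2,
                (chunk, currentBatch, totalBatches, totalSize) -> {
                    System.out.printf("batch %d/%d: %s (of %d total)%n",
                        currentBatch, totalBatches, chunk, totalSize);
                    return chunk.size();
                });

            System.out.println("processed = " + processed); // prints 5
        }
    }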
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index 26111412f2..860d144e74 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -32,93 +32,120 @@ import logging
logger = logging.getLogger(__name__)
-DB_CLEANUP_CMD = "{0} -cp {1} org.apache.ambari.server.cleanup.CleanupDriver --cluster-name {2} --from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
+DEBUG_MODE = "n"
+DEBUG_PORT = "5005"
+DEBUG_SUSPEND_AT_START = "n"
-#
-# Run the db cleanup process
-#
-def run_db_purge(options):
-
- if validate_args(options):
- return 1
- status, stateDesc = is_server_runing()
+DB_CLEANUP_CMD = "{0} " \
+ "-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \
+ "--cluster-name {2} " \
+ "--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
- if not options.silent:
- db_title = get_db_type(get_ambari_properties()).title
+DB_DEBUG_CLEANUP_CMD = "{0} -agentlib:jdwp=transport=dt_socket,server=y,suspend={4},address={5} " \
+ "-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \
+ "--cluster-name {2} " \
+ "--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
- confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
- db_title), True)
- if not confirmBackup:
- print_info_msg("Ambari Server Database purge aborted")
- return 0
- if status:
- print_error_msg("The database purge historical data cannot proceed while Ambari Server is running. Please shut down Ambari first.")
- return 1
+def run_db_purge(options):
+ """
+ Run the db cleanup process
+ """
+ if validate_args(options):
+ return 1
- confirm = get_YN_input(
- "Ambari server is using db type {0}. Cleanable database entries older than {1} will be purged. Proceed [y/n]".format(
- db_title, options.purge_from_date), True)
- if not confirm:
- print_info_msg("Ambari Server Database purge aborted")
- return 0
+ status, state_desc = is_server_runing()
+ if not options.silent:
+ db_title = get_db_type(get_ambari_properties()).title
+ confirm_backup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
+ db_title), True)
+ if not confirm_backup:
+ print_info_msg("Ambari Server Database purge aborted")
+ return 0
- jdk_path = get_java_exe_path()
- if jdk_path is None:
- print_error_msg("No JDK found, please run the \"setup\" command to install a JDK automatically or install any "
- "JDK manually to {0}".format(configDefaults.JDK_INSTALL_DIR));
+ if status:
+ print_error_msg("The database purge historical data cannot proceed while Ambari Server is running. Please shut down Ambari first.")
return 1
- ensure_jdbc_driver_is_installed(options, get_ambari_properties())
-
- serverClassPath = ServerClassPath(get_ambari_properties(), options)
- class_path = serverClassPath.get_full_ambari_classpath_escaped_for_shell()
-
- ambari_user = read_ambari_user()
- current_user = ensure_can_start_under_current_user(ambari_user)
- environ = generate_env(options, ambari_user, current_user)
-
- print "Purging historical data from the database ..."
- command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.purge_from_date)
- (retcode, stdout, stderr) = run_os_command(command, env=environ)
-
- print_info_msg("Return code from database cleanup command, retcode = " + str(retcode))
-
- if stdout:
- print "Console output from database purge-history command:"
- print stdout
- print
- if stderr:
- print "Error output from database purge-history command:"
- print stderr
- print
- if retcode > 0:
- print_error_msg("Error encountered while purging the Ambari Server Database. Check the ambari-server.log for details.")
- else:
- print "Purging historical data completed. Check the ambari-server.log for details."
- return retcode
-
-#
-# Database purge
-#
-def db_purge(options):
- return run_db_purge(options)
+ confirm = get_YN_input(
+ "Ambari server is using db type {0}. Cleanable database entries older than {1} will be purged. Proceed [y/n]".format(
+ db_title, options.purge_from_date), True)
+ if not confirm:
+ print_info_msg("Ambari Server Database purge aborted")
+ return 0
+
+ jdk_path = get_java_exe_path()
+ if jdk_path is None:
+ print_error_msg("No JDK found, please run the \"setup\" command to install a JDK automatically or install any "
+ "JDK manually to {0}".format(configDefaults.JDK_INSTALL_DIR))
+ return 1
+
+ ensure_jdbc_driver_is_installed(options, get_ambari_properties())
+
+ server_class_path = ServerClassPath(get_ambari_properties(), options)
+ class_path = server_class_path.get_full_ambari_classpath_escaped_for_shell()
+
+ ambari_user = read_ambari_user()
+ current_user = ensure_can_start_under_current_user(ambari_user)
+ environ = generate_env(options, ambari_user, current_user)
+
+ print "Purging historical data from the database ..."
+ if DEBUG_MODE == "y":
+ command = DB_DEBUG_CLEANUP_CMD.format(
+ jdk_path,
+ class_path,
+ options.cluster_name,
+ options.purge_from_date,
+ DEBUG_SUSPEND_AT_START,
+ DEBUG_PORT
+ )
+ else:
+ command = DB_CLEANUP_CMD.format(
+ jdk_path,
+ class_path,
+ options.cluster_name,
+ options.purge_from_date
+ )
+
+ retcode, stdout, stderr = run_os_command(command, env=environ)
+
+ print_info_msg("Return code from database cleanup command, retcode = " + str(retcode))
+ if stdout:
+ print "Console output from database purge-history command:"
+ print stdout
+ print
+ if stderr:
+ print "Error output from database purge-history command:"
+ print stderr
+ print
+ if retcode > 0:
+ print_error_msg("Error encountered while purging the Ambari Server Database. Check the ambari-server.log for details.")
+ else:
+ print "Purging historical data completed. Check the ambari-server.log for details."
+ return retcode
-def validate_args(options):
- if not options.cluster_name:
- print_error_msg("Please provide the --cluster-name argument.")
- return 1
+def db_purge(options):
+ """
+ Database purge
+ """
+ return run_db_purge(options)
- if not options.purge_from_date:
- print_error_msg("Please provide the --from-date argument.")
- return 1
- try:
- datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d")
- except ValueError as e:
- print_error_msg("The --from-date argument has an invalid format. {0}".format(e.args[0]))
- return 1;
+def validate_args(options):
+ if not options.cluster_name:
+ print_error_msg("Please provide the --cluster-name argument.")
+ return 1
+
+ if not options.purge_from_date:
+ print_error_msg("Please provide the --from-date argument.")
+ return 1
+
+ try:
+ datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d")
+ except ValueError as e:
+ print_error_msg("The --from-date argument has an invalid format. {0}".format(e.args[0]))
+ return 1
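The rewritten script also gains an opt-in remote-debugging variant of the cleanup command: with DEBUG_MODE set to "y" (it is hard-coded to "n" as committed), the forked JVM starts with a JDWP agent listening on port 5005, suspending at startup only if DEBUG_SUSPEND_AT_START is "y". With hypothetical values filled in, DB_DEBUG_CLEANUP_CMD expands to roughly:

    java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 \
      -cp <ambari-classpath> org.apache.ambari.server.cleanup.CleanupDriver \
      --cluster-name mycluster --from-date 2022-01-01> <server-out-file> 2>&1

where mycluster and the date come from the --cluster-name and --from-date arguments, and <server-out-file> stands for configDefaults.SERVER_OUT_FILE.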
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/helpers/SQLOperationsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/helpers/SQLOperationsTest.java
new file mode 100644
index 0000000000..06ecc7f89c
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/helpers/SQLOperationsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.helpers;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class SQLOperationsTest {
+ @Before
+ public void setup() throws Exception {
+
+ }
+
+ @Test
+ public void testBatchOperation() {
+ final List<Integer> testCollection = new ArrayList<>();
+ final int collectionSize = 150;
+ final int batchSize = 10;
+ final int totalExpectedBatches = (int) Math.ceil((float) collectionSize / batchSize);
+
+ for (int i = 0; i < collectionSize; i++) {
+ testCollection.add(i);
+ }
+
+ int processedItems = SQLOperations.batch(testCollection, batchSize, (chunk, currentBatch, totalBatches, totalSize) -> {
+ Assert.assertTrue(chunk.size() <= batchSize);
+ Assert.assertEquals(totalExpectedBatches, totalBatches);
+ return chunk.size();
+ });
+ Assert.assertEquals(collectionSize, processedItems);
+ }
+}
+
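A possible companion test, not part of this commit, exercising the guard clause at the top of batch() (null or empty collections and a zero batch size all short-circuit to 0); it would need an additional java.util.Arrays import:

    @Test
    public void testBatchEdgeCases() {
        // The callback never runs for degenerate inputs; batch() returns 0 immediately.
        Assert.assertEquals(0, SQLOperations.batch(null, 10, (chunk, cur, total, size) -> chunk.size()));
        Assert.assertEquals(0, SQLOperations.batch(new ArrayList<Integer>(), 10, (chunk, cur, total, size) -> chunk.size()));
        Assert.assertEquals(0, SQLOperations.batch(Arrays.asList(1, 2, 3), 0, (chunk, cur, total, size) -> chunk.size()));
    }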