Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/24 07:59:09 UTC

[ambari] branch trunk updated: AMBARI-22827: DB Cleanup scripts are using IN clauses (#3565)

This is an automated email from the ASF dual-hosted git repository.

wuzhiguo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da0091b9ce AMBARI-22827: DB Cleanup scripts are using IN clauses (#3565)
da0091b9ce is described below

commit da0091b9ce856f28ff8722b4028ed8c26a828b84
Author: Yu Hou <52...@qq.com>
AuthorDate: Thu Nov 24 15:59:02 2022 +0800

    AMBARI-22827: DB Cleanup scripts are using IN clauses (#3565)
---
 .../apache/ambari/server/orm/dao/RequestDAO.java   | 172 +++++++-------------
 .../ambari/server/orm/dao/TopologyHostTaskDAO.java |  19 ++-
 .../server/orm/dao/TopologyLogicalRequestDAO.java  |  17 +-
 .../server/orm/dao/TopologyLogicalTaskDAO.java     |  17 +-
 .../ambari/server/orm/helpers/SQLConstants.java    |  28 ++++
 .../ambari/server/orm/helpers/SQLOperations.java   |  60 +++++++
 .../src/main/python/ambari_server/dbCleanup.py     | 177 ++++++++++++---------
 .../server/orm/helpers/SQLOperationsTest.java      |  54 +++++++
 8 files changed, 336 insertions(+), 208 deletions(-)
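
In short: the per-DAO batching loops, each hard-coding its own 999-element
BATCH_SIZE, are replaced by a shared SQLOperations.batch helper and a single
SQLConstants.IN_ARGUMENT_MAX_SIZE constant, so every generated IN (...) clause
is capped at 999 parameters (presumably to stay under limits such as Oracle's
1,000-expression cap). The Python dbCleanup.py driver is reindented to two
spaces and gains a hard-coded JDWP debug mode.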

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 3c3d7bfb87..24e2231008 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -19,7 +19,6 @@
 package org.apache.ambari.server.orm.dao;
 
 import java.text.MessageFormat;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -46,6 +45,8 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
 import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
 import org.apache.ambari.server.state.Clusters;
 import org.eclipse.persistence.config.HintValues;
 import org.eclipse.persistence.config.QueryHints;
@@ -63,9 +64,6 @@ public class RequestDAO implements Cleanable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RequestDAO.class);
 
-
-  private static final int BATCH_SIZE = 999;
-
   /**
    * SQL template to retrieve all request IDs, sorted by the ID.
    */
@@ -82,8 +80,6 @@ public class RequestDAO implements Cleanable {
   private final static String REQUESTS_WITH_NO_CLUSTER_SQL =
       "SELECT request.requestId FROM RequestEntity request WHERE request.clusterId = -1 OR request.clusterId IS NULL ORDER BY request.requestId %s";
 
-
-
   @Inject
   Provider<EntityManager> entityManagerProvider;
 
@@ -311,23 +307,21 @@ public class RequestDAO implements Cleanable {
   @Transactional
   protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String entityName, Long beforeDateMillis,
                                   String entityQuery, Class<T> type) {
-    LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
-    int affectedRows = 0;
-    if (ids != null && !ids.isEmpty()) {
-      EntityManager entityManager = entityManagerProvider.get();
-      // Batch delete
-      TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
-      for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
-        int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
-        List<Long> idsSubList = new ArrayList<>(ids).subList(i, endRow);
-        LOG.info("Deleting " + entityName + " entity batch with task ids: " +
-                idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1));
-        query.setParameter(paramName, idsSubList);
-        affectedRows += query.executeUpdate();
-      }
+    LOG.info(String.format("Purging %s entity records before date %s", entityName, new Date(beforeDateMillis)));
+
+    if (ids == null || ids.isEmpty()) {
+      return 0;
     }
 
-    return affectedRows;
+    final EntityManager entityManager = entityManagerProvider.get();
+    final TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+
+    // Batch delete
+    return SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+      LOG.info(String.format("Purging %s entity, batch %s/%s.", entityName, currentBatch, totalBatches));
+      query.setParameter(paramName, chunk);
+      return query.executeUpdate();
+    });
   }
 
   /**
@@ -346,33 +340,34 @@ public class RequestDAO implements Cleanable {
   @Transactional
   protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> paramNames, String entityName, Long beforeDateMillis,
                                   String entityQuery, Class<T> type) {
-    LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
-    int affectedRows = 0;
-    if (ids != null && !ids.isEmpty()) {
-      EntityManager entityManager = entityManagerProvider.get();
-      // Batch delete
-      TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
-      for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
-        int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
-        List<StageEntityPK> idsSubList = new ArrayList<>(ids).subList(i, endRow);
-        LOG.info("Deleting " + entityName + " entity batch with task ids: " +
-                idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1));
-        for (StageEntityPK requestIds : idsSubList) {
-          query.setParameter(paramNames.get(0), requestIds.getStageId());
-          query.setParameter(paramNames.get(1), requestIds.getRequestId());
-          affectedRows += query.executeUpdate();
-        }
-      }
+    LOG.info(String.format("Purging %s entity records before date %s", entityName, new Date(beforeDateMillis)));
+
+    if (ids == null || ids.isEmpty()) {
+      return 0;
     }
 
-    return affectedRows;
+    final EntityManager entityManager = entityManagerProvider.get();
+    final TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
+
+    // Batch delete
+    return SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+      int affectedRows = 0;
+
+      for (StageEntityPK requestIds : chunk) {
+        query.setParameter(paramNames.get(0), requestIds.getStageId());
+        query.setParameter(paramNames.get(1), requestIds.getRequestId());
+        affectedRows += query.executeUpdate();
+      }
+      return affectedRows;
+    });
   }
 
   @Transactional
   @Override
   public long cleanup(TimeBasedCleanupPolicy policy) {
+    long affectedRows = 0;
     try {
-      final Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+      Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
       // find request and stage ids that were created before date populated by user.
       List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
 
@@ -380,116 +375,59 @@ public class RequestDAO implements Cleanable {
       // request ids set that we already have. We don't want to make any changes for upgrade
       Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
       Iterator<StageEntityPK> requestStageIdsIterator =  requestStageIds.iterator();
+      Set<Long> requestIds = new HashSet<>();
       while (requestStageIdsIterator.hasNext()) {
         StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
         if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) {
           requestStageIdsIterator.remove();
+          continue;
         }
-      }
-
-      Set<Long> requestIds = new HashSet<>();
-      for (StageEntityPK ids : requestStageIds) {
-        requestIds.add(ids.getRequestId());
+        requestIds.add(nextRequestStageIds.getRequestId());
       }
 
       // find task ids using request stage ids
       Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
+      LinkedList<String> params = new LinkedList<>();
+      params.add("stageId");
+      params.add("requestId");
 
       // find host task ids, to find related host requests and also to remove needed host tasks
-      final Set<Long> hostTaskIds = findHostTaskIds(taskIds);
+      Set<Long> hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
 
       // find host request ids by host task ids to remove later needed host requests
-      final Set<Long> hostRequestIds = findHostRequestIds(hostTaskIds);
+      Set<Long> hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
+      Set<Long> topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
 
-      final Set<Long> topologyRequestIds = findTopologyRequestIds(hostRequestIds);
-
-      final LinkedList<String> params = new LinkedList<>();
-      params.add("stageId");
-      params.add("requestId");
-      long affectedRows = 0;
       //removing all entities one by one according to their relations using stage, task and request ids
       affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
-              "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
+        "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
       affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
-              "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
+        "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
       affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
-              "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
+        "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
       affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
-              "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
+        "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
       for (Long topologyRequestId : topologyRequestIds) {
         topologyRequestDAO.removeByPK(topologyRequestId);
       }
       affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(),
-              "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
+        "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class);
       affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(),
-              "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
+        "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class);
       affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(),
-              "StageEntity.removeByRequestStageIds", StageEntity.class);
+        "StageEntity.removeByRequestStageIds", StageEntity.class);
       affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(),
-              "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
+        "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class);
       affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(),
-              "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
+        "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class);
       affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
-              "RequestEntity.removeByRequestIds", RequestEntity.class);
+        "RequestEntity.removeByRequestIds", RequestEntity.class);
 
-      return affectedRows;
     } catch (AmbariException e) {
       LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e);
       throw new IllegalStateException(e);
     }
-  }
 
-  private Set<Long> findHostTaskIds(Set<Long> taskIds) {
-    final Set<Long> hostTaskIds = new HashSet<>();
-    final Set<Long> partialTaskIds = new HashSet<>();
-    taskIds.forEach(taskId -> {
-      partialTaskIds.add(taskId);
-      if (partialTaskIds.size() == BATCH_SIZE) {
-        hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
-        partialTaskIds.clear();
-      }
-    });
-
-    if (!partialTaskIds.isEmpty()) {
-      hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
-    }
-    return hostTaskIds;
-  }
-
-  private Set<Long> findHostRequestIds(Set<Long> hostTaskIds) {
-    final Set<Long> hostRequestIds = new HashSet<>();
-    final Set<Long> partialHostTaskIds = new HashSet<>();
-
-    hostTaskIds.forEach(taskId -> {
-      partialHostTaskIds.add(taskId);
-      if (partialHostTaskIds.size() == BATCH_SIZE) {
-        hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
-        partialHostTaskIds.clear();
-      }
-    });
-
-    if (!partialHostTaskIds.isEmpty()) {
-      hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
-    }
-    return hostRequestIds;
-  }
-
-  private Set<Long> findTopologyRequestIds(final Set<Long> hostRequestIds) {
-    final Set<Long> topologyRequestIds = new HashSet<>();
-    final Set<Long> partialHostRequestIds = new HashSet<>();
-
-    hostRequestIds.forEach(requestId -> {
-      partialHostRequestIds.add(requestId);
-      if (partialHostRequestIds.size() == BATCH_SIZE) {
-        topologyRequestIds.addAll(topologyLogicalRequestDAO.findRequestIdsByIds(partialHostRequestIds));
-        partialHostRequestIds.clear();
-      }
-    });
-
-    if (!partialHostRequestIds.isEmpty()) {
-      topologyRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostRequestIds));
-    }
-    return topologyRequestIds;
+    return affectedRows;
   }
-
 }
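
Worth noting: the three removed helpers (findHostTaskIds, findHostRequestIds,
findTopologyRequestIds) each duplicated the same partial-set batching loop, and
the last one flushed its tail through the wrong DAO
(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds instead of
topologyLogicalRequestDAO.findRequestIdsByIds). Moving the chunking into the
DAO query methods removes both the duplication and that bug.
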
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index 70fdc77fdf..7b375c6d4e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.orm.dao;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -26,6 +27,8 @@ import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
 
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
@@ -57,13 +60,17 @@ public class TopologyHostTaskDAO {
 
   @RequiresSession
   public Set<Long> findHostRequestIdsByHostTaskIds(Set<Long> hostTaskIds) {
+    final Set<Long> result = new HashSet<>();
     EntityManager entityManager = entityManagerProvider.get();
-    TypedQuery<Long> topologyHostTaskQuery =
-            entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
-
-    topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds);
-
-    return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
+    final TypedQuery<Long> topologyHostTaskQuery =
+      entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
+
+    SQLOperations.batch(hostTaskIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+      topologyHostTaskQuery.setParameter("hostTaskIds", chunk);
+      result.addAll(daoUtils.selectList(topologyHostTaskQuery));
+      return 0;
+    });
+    return Sets.newHashSet(result);
   }
 
   @RequiresSession
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index f23cc9f346..7d7981c92c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.orm.dao;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -25,6 +26,8 @@ import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
 
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
@@ -68,11 +71,15 @@ public class TopologyLogicalRequestDAO {
   @RequiresSession
   public Set<Long> findRequestIdsByIds(Set<Long> ids) {
     EntityManager entityManager = entityManagerProvider.get();
-    TypedQuery<Long> topologyLogicalRequestQuery =
-            entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
+    final Set<Long> result = new HashSet<>();
+    final TypedQuery<Long> topologyLogicalRequestQuery =
+      entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
 
-    topologyLogicalRequestQuery.setParameter("ids", ids);
-
-    return Sets.newHashSet(daoUtils.selectList(topologyLogicalRequestQuery));
+    SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+      topologyLogicalRequestQuery.setParameter("ids", chunk);
+      result.addAll(daoUtils.selectList(topologyLogicalRequestQuery));
+      return 0;
+    });
+    return Sets.newHashSet(result);
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index abde5ec085..5844e6c38f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.orm.dao;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -25,6 +26,8 @@ import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.orm.helpers.SQLConstants;
+import org.apache.ambari.server.orm.helpers.SQLOperations;
 
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
@@ -48,12 +51,16 @@ public class TopologyLogicalTaskDAO {
   @RequiresSession
   public Set<Long> findHostTaskIdsByPhysicalTaskIds(Set<Long> physicalTaskIds) {
     EntityManager entityManager = entityManagerProvider.get();
-    TypedQuery<Long> topologyHostTaskQuery =
-            entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
+    final Set<Long> result = new HashSet<>();
+    final TypedQuery<Long> topologyHostTaskQuery =
+      entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
 
-    topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds);
-
-    return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
+    SQLOperations.batch(physicalTaskIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
+      topologyHostTaskQuery.setParameter("physicalTaskIds", chunk);
+      result.addAll(daoUtils.selectList(topologyHostTaskQuery));
+      return 0;
+    });
+    return Sets.newHashSet(result);
   }
 
   @RequiresSession
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLConstants.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLConstants.java
new file mode 100644
index 0000000000..8de5a19b13
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.helpers;
+
+public interface SQLConstants {
+  /**
+   * Defines the maximum number of elements in an IN clause.
+   *
+   * Example: SELECT * FROM tbl WHERE tbl.id IN (n1, n2, ... n_IN_ARGUMENT_MAX_SIZE)
+   */
+  int IN_ARGUMENT_MAX_SIZE = 999;
+}
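
For context: Oracle rejects IN lists longer than 1,000 expressions with
ORA-01795, and other engines can degrade on very long parameter lists, so a
999-element cap keeps every chunk safely under that bound.
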
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLOperations.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLOperations.java
new file mode 100644
index 0000000000..f308fbf70a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/SQLOperations.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.helpers;
+
+import java.util.Collection;
+
+import com.google.common.collect.Iterables;
+
+public class SQLOperations {
+  public interface BatchOperation<T> {
+    /**
+     * @param chunk        the portion of the collection handled in this call
+     * @param currentBatch current batch number, starting at 1
+     * @param totalBatches total number of batches
+     * @param totalSize    total number of items in the whole collection
+     * @return number of processed items
+     */
+    int chunk(Collection<T> chunk, int currentBatch, int totalBatches, int totalSize);
+  }
+
+  /**
+   * Splits the collection into batches and runs the callback on each one.
+   *
+   * @param collection collection with the values to be split
+   * @param batchSize  maximum size of one batch
+   * @param callback   logic to process one batch
+   * @return Total number of processed items in collection
+   */
+  public static <T> int batch(Collection<T> collection, int batchSize, BatchOperation<T> callback) {
+    if (collection == null || collection.isEmpty() || batchSize == 0) {
+      return 0;
+    }
+    int totalSize = collection.size();
+    int totalChunks = (int) Math.ceil((float) totalSize / batchSize);
+    int currentChunk = 0;
+    int resultSum = 0;
+
+    for (Collection<T> chunk : Iterables.partition(collection, batchSize)) {
+      currentChunk += 1;
+      resultSum += callback.chunk(chunk, currentChunk, totalChunks, totalSize);
+    }
+    return resultSum;
+  }
+}
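
For illustration, here is a minimal, self-contained sketch of the new helper in
use; the BatchDemo class and the plain-long payload are invented for this
example (a real caller binds each chunk to a JPQL IN parameter, as the DAO
changes above do):

    package org.apache.ambari.server.orm.helpers;

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    // Hypothetical demo class, not part of the commit.
    public final class BatchDemo {
      public static void main(String[] args) {
        // 2500 ids force three chunks of 999, 999 and 502 elements.
        List<Long> ids = LongStream.rangeClosed(1, 2500).boxed().collect(Collectors.toList());

        int processed = SQLOperations.batch(ids, SQLConstants.IN_ARGUMENT_MAX_SIZE,
            (chunk, currentBatch, totalBatches, totalSize) -> {
              // A real caller would do query.setParameter("ids", chunk) and
              // return query.executeUpdate() here.
              System.out.printf("batch %d/%d: %d ids%n", currentBatch, totalBatches, chunk.size());
              return chunk.size();
            });

        System.out.println("processed = " + processed); // 2500
      }
    }

Because batch() sums whatever the callback returns, the same helper serves both
the delete paths (returning executeUpdate() row counts) and the lookup paths
above (which accumulate results in a set and return 0).
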
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index 26111412f2..860d144e74 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -32,93 +32,120 @@ import logging
 
 logger = logging.getLogger(__name__)
 
-DB_CLEANUP_CMD = "{0} -cp {1} org.apache.ambari.server.cleanup.CleanupDriver --cluster-name {2} --from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
+DEBUG_MODE = "n"
+DEBUG_PORT = "5005"
+DEBUG_SUSPEND_AT_START = "n"
 
-#
-# Run the db cleanup process
-#
-def run_db_purge(options):
-
-    if validate_args(options):
-        return 1
 
-    status, stateDesc = is_server_runing()
+DB_CLEANUP_CMD = "{0} " \
+                 "-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \
+                 "--cluster-name {2} " \
+                 "--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
 
-    if not options.silent:
-      db_title = get_db_type(get_ambari_properties()).title
+DB_DEBUG_CLEANUP_CMD = "{0} -agentlib:jdwp=transport=dt_socket,server=y,suspend={4},address={5} " \
+                       "-cp {1} org.apache.ambari.server.cleanup.CleanupDriver " \
+                       "--cluster-name {2} " \
+                       "--from-date {3}> " + configDefaults.SERVER_OUT_FILE + " 2>&1"
 
-      confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
-              db_title), True)
-      if not confirmBackup:
-          print_info_msg("Ambari Server Database purge aborted")
-          return 0
 
-      if status:
-          print_error_msg("The database purge historical data cannot proceed while Ambari Server is running. Please shut down Ambari first.")
-          return 1
+def run_db_purge(options):
+  """
+  Run the db cleanup process
+  """
+  if validate_args(options):
+      return 1
 
-      confirm = get_YN_input(
-          "Ambari server is using db type {0}. Cleanable database entries older than {1} will be purged. Proceed [y/n]".format(
-              db_title, options.purge_from_date), True)
-      if not confirm:
-          print_info_msg("Ambari Server Database purge aborted")
-          return 0
+  status, state_desc = is_server_runing()
 
+  if not options.silent:
+    db_title = get_db_type(get_ambari_properties()).title
 
+    confirm_backup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
+      db_title), True)
+    if not confirm_backup:
+      print_info_msg("Ambari Server Database purge aborted")
+      return 0
 
-    jdk_path = get_java_exe_path()
-    if jdk_path is None:
-        print_error_msg("No JDK found, please run the \"setup\" command to install a JDK automatically or install any "
-                        "JDK manually to {0}".format(configDefaults.JDK_INSTALL_DIR));
+    if status:
+        print_error_msg("The database purge historical data cannot proceed while Ambari Server is running. Please shut down Ambari first.")
         return 1
 
-    ensure_jdbc_driver_is_installed(options, get_ambari_properties())
-
-    serverClassPath = ServerClassPath(get_ambari_properties(), options)
-    class_path = serverClassPath.get_full_ambari_classpath_escaped_for_shell()
-
-    ambari_user = read_ambari_user()
-    current_user = ensure_can_start_under_current_user(ambari_user)
-    environ = generate_env(options, ambari_user, current_user)
-
-    print "Purging historical data from the database ..."
-    command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.purge_from_date)
-    (retcode, stdout, stderr) = run_os_command(command, env=environ)
-
-    print_info_msg("Return code from database cleanup command, retcode = " + str(retcode))
-
-    if stdout:
-        print "Console output from database purge-history command:"
-        print stdout
-        print
-    if stderr:
-        print "Error output from database purge-history command:"
-        print stderr
-        print
-    if retcode > 0:
-        print_error_msg("Error encountered while purging the Ambari Server Database. Check the ambari-server.log for details.")
-    else:
-        print "Purging historical data completed. Check the ambari-server.log for details."
-    return retcode
-
-#
-# Database purge
-#
-def db_purge(options):
-    return run_db_purge(options)
+    confirm = get_YN_input(
+      "Ambari server is using db type {0}. Cleanable database entries older than {1} will be purged. Proceed [y/n]".format(
+        db_title, options.purge_from_date), True)
+    if not confirm:
+      print_info_msg("Ambari Server Database purge aborted")
+      return 0
+
+  jdk_path = get_java_exe_path()
+  if jdk_path is None:
+    print_error_msg("No JDK found, please run the \"setup\" command to install a JDK automatically or install any "
+                    "JDK manually to {0}".format(configDefaults.JDK_INSTALL_DIR))
+    return 1
+
+  ensure_jdbc_driver_is_installed(options, get_ambari_properties())
+
+  server_class_path = ServerClassPath(get_ambari_properties(), options)
+  class_path = server_class_path.get_full_ambari_classpath_escaped_for_shell()
+
+  ambari_user = read_ambari_user()
+  current_user = ensure_can_start_under_current_user(ambari_user)
+  environ = generate_env(options, ambari_user, current_user)
+
+  print "Purging historical data from the database ..."
+  if DEBUG_MODE and DEBUG_MODE == "y":
+    command = DB_DEBUG_CLEANUP_CMD.format(
+      jdk_path,
+      class_path,
+      options.cluster_name,
+      options.purge_from_date,
+      DEBUG_SUSPEND_AT_START,
+      DEBUG_PORT
+    )
+  else:
+    command = DB_CLEANUP_CMD.format(
+      jdk_path,
+      class_path,
+      options.cluster_name,
+      options.purge_from_date
+    )
+
+  retcode, stdout, stderr = run_os_command(command, env=environ)
+
+  print_info_msg("Return code from database cleanup command, retcode = " + str(retcode))
+  if stdout:
+    print "Console output from database purge-history command:"
+    print stdout
+    print
+  if stderr:
+    print "Error output from database purge-history command:"
+    print stderr
+    print
+  if retcode > 0:
+    print_error_msg("Error encountered while purging the Ambari Server Database. Check the ambari-server.log for details.")
+  else:
+    print "Purging historical data completed. Check the ambari-server.log for details."
+  return retcode
 
 
-def validate_args(options):
-    if not options.cluster_name:
-        print_error_msg("Please provide the --cluster-name argument.")
-        return 1
+def db_purge(options):
+  """
+  Database purge
+  """
+  return run_db_purge(options)
 
-    if not options.purge_from_date:
-        print_error_msg("Please provide the --from-date argument.")
-        return 1
 
-    try:
-        datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d")
-    except ValueError as e:
-        print_error_msg("The --from-date argument has an invalid format. {0}".format(e.args[0]))
-        return 1;
+def validate_args(options):
+  if not options.cluster_name:
+    print_error_msg("Please provide the --cluster-name argument.")
+    return 1
+
+  if not options.purge_from_date:
+    print_error_msg("Please provide the --from-date argument.")
+    return 1
+
+  try:
+    datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d")
+  except ValueError as e:
+    print_error_msg("The --from-date argument has an invalid format. {0}".format(e.args[0]))
+    return 1
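
One caveat on the new debug support: DEBUG_MODE, DEBUG_PORT and
DEBUG_SUSPEND_AT_START are hard-coded module constants rather than command-line
options, so enabling the JDWP agent means editing the script. With
DEBUG_MODE = "y", the cleanup JVM starts with -agentlib:jdwp=...,address=5005
and, if DEBUG_SUSPEND_AT_START were also flipped to "y", would block until a
debugger attaches.
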
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/helpers/SQLOperationsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/helpers/SQLOperationsTest.java
new file mode 100644
index 0000000000..06ecc7f89c
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/helpers/SQLOperationsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.helpers;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class SQLOperationsTest {
+  @Before
+  public void setup() throws Exception {
+
+  }
+
+  @Test
+  public void testBatchOperation() {
+    final List<Integer> testCollection = new ArrayList<>();
+    final int collectionSize = 150;
+    final int batchSize = 10;
+    final int totalExpectedBatches = (int) Math.ceil((float) collectionSize / batchSize);
+
+    for (int i = 0; i < collectionSize; i++) {
+      testCollection.add(i);
+    }
+
+    int processedItems = SQLOperations.batch(testCollection, batchSize, (chunk, currentBatch, totalBatches, totalSize) -> {
+      Assert.assertTrue(chunk.size() <= batchSize);
+      Assert.assertEquals(totalExpectedBatches, totalBatches);
+      return chunk.size();
+    });
+    Assert.assertEquals(collectionSize, processedItems);
+  }
+}
+

