You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2018/05/07 07:51:00 UTC

[ambari] branch trunk updated: [AMBARI-22827] Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size (#1179)

This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 004fd24  [AMBARI-22827] Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size (#1179)
004fd24 is described below

commit 004fd249aaa8e52f5659e7cffece61261238ebf3
Author: Sandor Molnar <sm...@apache.org>
AuthorDate: Mon May 7 09:50:57 2018 +0200

    [AMBARI-22827] Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size (#1179)
    
    * AMBARI-22827. LdapModule is needed for the application to start properly
    
    * AMBARI-22827. Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size
    
    * AMBARI-22827. Make sure we execute the load functions for all batches including the last one
---
 .../ambari/server/cleanup/CleanupDriver.java       |  3 +-
 .../apache/ambari/server/orm/dao/RequestDAO.java   | 98 +++++++++++++++-------
 2 files changed, 70 insertions(+), 31 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
index b4c8369..27bdc90 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
@@ -23,6 +23,7 @@ import java.util.Date;
 
 import org.apache.ambari.server.audit.AuditLoggerModule;
 import org.apache.ambari.server.controller.ControllerModule;
+import org.apache.ambari.server.ldap.LdapModule;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -80,7 +81,7 @@ public class CleanupDriver {
     CleanupContext cleanupContext = processArguments(args);
 
     // set up the guice context
-    Injector injector = Guice.createInjector(new ControllerModule(), new AuditLoggerModule(), new CleanupModule());
+    Injector injector = Guice.createInjector(new ControllerModule(), new AuditLoggerModule(), new CleanupModule(), new LdapModule());
 
     // explicitly starting the persist service
     injector.getInstance(AmbariJpaPersistService.class).start();
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index d80ddd8..3c3d7bf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -97,9 +97,6 @@ public class RequestDAO implements Cleanable {
   private HostRoleCommandDAO hostRoleCommandDAO;
 
   @Inject
-  private StageDAO stageDAO;
-
-  @Inject
   private TopologyLogicalTaskDAO topologyLogicalTaskDAO;
 
   @Inject
@@ -315,11 +312,11 @@ public class RequestDAO implements Cleanable {
   protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String entityName, Long beforeDateMillis,
                                   String entityQuery, Class<T> type) {
     LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
-    EntityManager entityManager = entityManagerProvider.get();
     int affectedRows = 0;
-    // Batch delete
-    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
     if (ids != null && !ids.isEmpty()) {
+      EntityManager entityManager = entityManagerProvider.get();
+      // Batch delete
+      TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
       for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
         int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
         List<Long> idsSubList = new ArrayList<>(ids).subList(i, endRow);
@@ -350,11 +347,11 @@ public class RequestDAO implements Cleanable {
   protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> paramNames, String entityName, Long beforeDateMillis,
                                   String entityQuery, Class<T> type) {
     LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
-    EntityManager entityManager = entityManagerProvider.get();
     int affectedRows = 0;
-    // Batch delete
-    TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
     if (ids != null && !ids.isEmpty()) {
+      EntityManager entityManager = entityManagerProvider.get();
+      // Batch delete
+      TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
       for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
         int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
         List<StageEntityPK> idsSubList = new ArrayList<>(ids).subList(i, endRow);
@@ -374,10 +371,8 @@ public class RequestDAO implements Cleanable {
   @Transactional
   @Override
   public long cleanup(TimeBasedCleanupPolicy policy) {
-    long affectedRows = 0;
-    Long clusterId = null;
     try {
-      clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+      final Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
       // find request and stage ids that were created before date populated by user.
       List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
 
@@ -392,7 +387,6 @@ public class RequestDAO implements Cleanable {
         }
       }
 
-
       Set<Long> requestIds = new HashSet<>();
       for (StageEntityPK ids : requestStageIds) {
         requestIds.add(ids.getRequestId());
@@ -400,28 +394,19 @@ public class RequestDAO implements Cleanable {
 
       // find task ids using request stage ids
       Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
-      LinkedList<String> params = new LinkedList<>();
-      params.add("stageId");
-      params.add("requestId");
 
       // find host task ids, to find related host requests and also to remove needed host tasks
-      Set<Long> hostTaskIds = new HashSet<>();
-      if (taskIds != null && !taskIds.isEmpty()) {
-        hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
-      }
+      final Set<Long> hostTaskIds = findHostTaskIds(taskIds);
 
       // find host request ids by host task ids to remove later needed host requests
-      Set<Long> hostRequestIds = new HashSet<>();
-      if (!hostTaskIds.isEmpty()) {
-        hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
-      }
-
-      Set<Long> topologyRequestIds = new HashSet<>();
-      if (!hostRequestIds.isEmpty()) {
-        topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
-      }
+      final Set<Long> hostRequestIds = findHostRequestIds(hostTaskIds);
 
+      final Set<Long> topologyRequestIds = findTopologyRequestIds(hostRequestIds);
 
+      final LinkedList<String> params = new LinkedList<>();
+      params.add("stageId");
+      params.add("requestId");
+      long affectedRows = 0;
       //removing all entities one by one according to their relations using stage, task and request ids
       affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
               "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
@@ -447,11 +432,64 @@ public class RequestDAO implements Cleanable {
       affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
               "RequestEntity.removeByRequestIds", RequestEntity.class);
 
+      return affectedRows;
     } catch (AmbariException e) {
       LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e);
       throw new IllegalStateException(e);
     }
+  }
 
-    return affectedRows;
+  private Set<Long> findHostTaskIds(Set<Long> taskIds) { // Resolves physical task ids to host task ids in batches of BATCH_SIZE to avoid JDBC prepared-statement parameter-list limits
+    final Set<Long> hostTaskIds = new HashSet<>(); // accumulated result across all batches
+    final Set<Long> partialTaskIds = new HashSet<>(); // current batch, flushed once it reaches BATCH_SIZE
+    taskIds.forEach(taskId -> {
+      partialTaskIds.add(taskId);
+      if (partialTaskIds.size() == BATCH_SIZE) { // full batch: run the query and start a new batch
+        hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
+        partialTaskIds.clear();
+      }
+    });
+
+    if (!partialTaskIds.isEmpty()) { // flush the final, partial batch (size < BATCH_SIZE)
+      hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
+    }
+    return hostTaskIds;
+  }
+
+  private Set<Long> findHostRequestIds(Set<Long> hostTaskIds) { // Resolves host task ids to host request ids in batches of BATCH_SIZE to avoid JDBC prepared-statement parameter-list limits
+    final Set<Long> hostRequestIds = new HashSet<>(); // accumulated result across all batches
+    final Set<Long> partialHostTaskIds = new HashSet<>(); // current batch, flushed once it reaches BATCH_SIZE
+
+    hostTaskIds.forEach(taskId -> {
+      partialHostTaskIds.add(taskId);
+      if (partialHostTaskIds.size() == BATCH_SIZE) { // full batch: run the query and start a new batch
+        hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
+        partialHostTaskIds.clear();
+      }
+    });
+
+    if (!partialHostTaskIds.isEmpty()) { // flush the final, partial batch (size < BATCH_SIZE)
+      hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
+    }
+    return hostRequestIds;
   }
+
+  private Set<Long> findTopologyRequestIds(final Set<Long> hostRequestIds) { // Resolves host request ids to topology request ids in batches of BATCH_SIZE to avoid JDBC prepared-statement parameter-list limits
+    final Set<Long> topologyRequestIds = new HashSet<>(); // accumulated result across all batches
+    final Set<Long> partialHostRequestIds = new HashSet<>(); // current batch, flushed once it reaches BATCH_SIZE
+
+    hostRequestIds.forEach(requestId -> {
+      partialHostRequestIds.add(requestId);
+      if (partialHostRequestIds.size() == BATCH_SIZE) { // full batch: run the query and start a new batch
+        topologyRequestIds.addAll(topologyLogicalRequestDAO.findRequestIdsByIds(partialHostRequestIds));
+        partialHostRequestIds.clear();
+      }
+    });
+
+    if (!partialHostRequestIds.isEmpty()) { // flush the final, partial batch (size < BATCH_SIZE)
+      topologyRequestIds.addAll(topologyLogicalRequestDAO.findRequestIdsByIds(partialHostRequestIds)); // BUGFIX: was topologyHostTaskDAO.findHostRequestIdsByHostTaskIds — the remainder batch queried the wrong DAO, adding host-request ids instead of topology-request ids
+    }
+    return topologyRequestIds;
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
smolnar@apache.org.