Posted to commits@ambari.apache.org by sm...@apache.org on 2018/05/07 07:51:00 UTC
[ambari] branch trunk updated: [AMBARI-22827] Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size (#1179)
This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 004fd24 [AMBARI-22827] Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size (#1179)
004fd24 is described below
commit 004fd249aaa8e52f5659e7cffece61261238ebf3
Author: Sandor Molnar <sm...@apache.org>
AuthorDate: Mon May 7 09:50:57 2018 +0200
[AMBARI-22827] Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size (#1179)
* AMBARI-22827. LdapModule is needed for the application to start properly
* AMBARI-22827. Loading hostTaskIds, hostRequestIds and topologyRequestIds in batches to avoid issues triggered by JDBC limitations on prepared statement parameter list size
* AMBARI-22827. Make sure we execute the load functions for all batches including the last one
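For context on the technique the commit message describes: JDBC drivers and backing databases cap how many parameters a single prepared statement may carry (Oracle, for instance, rejects IN lists with more than 1000 expressions, ORA-01795), so a large id set has to be queried in fixed-size slices. The standalone sketch below illustrates the pattern; BatchedLookup, findInBatches and the chosen BATCH_SIZE are hypothetical and not part of this patch.

import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper (not from the patch) showing the batching pattern:
// run a lookup over fixed-size slices of a large id set so that no single
// prepared statement exceeds the driver's parameter-list limit.
public final class BatchedLookup {

  // Assumed limit; Oracle, for example, allows at most 1000 IN-list expressions.
  private static final int BATCH_SIZE = 999;

  public static Set<Long> findInBatches(Set<Long> ids, Function<Set<Long>, Set<Long>> lookup) {
    final Set<Long> results = new HashSet<>();
    final Set<Long> partial = new HashSet<>();
    for (Long id : ids) {
      partial.add(id);
      if (partial.size() == BATCH_SIZE) {
        results.addAll(lookup.apply(partial));
        partial.clear();
      }
    }
    // Run the last, possibly smaller batch too; skipping it would silently
    // drop up to BATCH_SIZE - 1 ids (the point of the third bullet above).
    if (!partial.isEmpty()) {
      results.addAll(lookup.apply(partial));
    }
    return results;
  }
}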
---
.../ambari/server/cleanup/CleanupDriver.java | 3 +-
.../apache/ambari/server/orm/dao/RequestDAO.java | 98 +++++++++++++++-------
2 files changed, 70 insertions(+), 31 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
index b4c8369..27bdc90 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
@@ -23,6 +23,7 @@ import java.util.Date;
import org.apache.ambari.server.audit.AuditLoggerModule;
import org.apache.ambari.server.controller.ControllerModule;
+import org.apache.ambari.server.ldap.LdapModule;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@@ -80,7 +81,7 @@ public class CleanupDriver {
CleanupContext cleanupContext = processArguments(args);
// set up the guice context
- Injector injector = Guice.createInjector(new ControllerModule(), new AuditLoggerModule(), new CleanupModule());
+ Injector injector = Guice.createInjector(new ControllerModule(), new AuditLoggerModule(), new CleanupModule(), new LdapModule());
// explicitly starting the persist service
injector.getInstance(AmbariJpaPersistService.class).start();
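Regarding the CleanupDriver hunk above: a Guice injector can only satisfy @Inject points for types bound by one of its installed modules, so omitting a module whose bindings are required makes instantiation fail at runtime. A minimal, self-contained illustration follows; every name in it is hypothetical and only mirrors the LdapModule situation.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;

// Hypothetical demo (not Ambari code): a type bound only in one module
// cannot be instantiated by an injector that was created without it.
public class MissingModuleDemo {

  interface Directory { String lookup(String user); }

  static class LdapDirectory implements Directory {
    @Override public String lookup(String user) { return "uid=" + user; }
  }

  // Plays the role of LdapModule: the only place Directory is bound.
  static class DirectoryModule extends AbstractModule {
    @Override protected void configure() {
      bind(Directory.class).to(LdapDirectory.class);
    }
  }

  static class App {
    final Directory directory;
    @Inject App(Directory directory) { this.directory = directory; }
  }

  public static void main(String[] args) {
    // Succeeds: the module that binds Directory is installed.
    Injector injector = Guice.createInjector(new DirectoryModule());
    System.out.println(injector.getInstance(App.class).directory.lookup("smolnar"));

    // Guice.createInjector() without DirectoryModule would throw a
    // ConfigurationException on getInstance(App.class), since Directory is
    // an interface with no binding -- the failure mode the first bullet of
    // the commit message addresses by installing LdapModule.
  }
}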
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index d80ddd8..3c3d7bf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -97,9 +97,6 @@ public class RequestDAO implements Cleanable {
private HostRoleCommandDAO hostRoleCommandDAO;
@Inject
- private StageDAO stageDAO;
-
- @Inject
private TopologyLogicalTaskDAO topologyLogicalTaskDAO;
@Inject
@@ -315,11 +312,11 @@ public class RequestDAO implements Cleanable {
protected <T> int cleanTableByIds(Set<Long> ids, String paramName, String entityName, Long beforeDateMillis,
String entityQuery, Class<T> type) {
LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
- EntityManager entityManager = entityManagerProvider.get();
int affectedRows = 0;
- // Batch delete
- TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
if (ids != null && !ids.isEmpty()) {
+ EntityManager entityManager = entityManagerProvider.get();
+ // Batch delete
+ TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
List<Long> idsSubList = new ArrayList<>(ids).subList(i, endRow);
@@ -350,11 +347,11 @@ public class RequestDAO implements Cleanable {
protected <T> int cleanTableByStageEntityPK(List<StageEntityPK> ids, LinkedList<String> paramNames, String entityName, Long beforeDateMillis,
String entityQuery, Class<T> type) {
LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis)));
- EntityManager entityManager = entityManagerProvider.get();
int affectedRows = 0;
- // Batch delete
- TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
if (ids != null && !ids.isEmpty()) {
+ EntityManager entityManager = entityManagerProvider.get();
+ // Batch delete
+ TypedQuery<T> query = entityManager.createNamedQuery(entityQuery, type);
for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE);
List<StageEntityPK> idsSubList = new ArrayList<>(ids).subList(i, endRow);
@@ -374,10 +371,8 @@ public class RequestDAO implements Cleanable {
@Transactional
@Override
public long cleanup(TimeBasedCleanupPolicy policy) {
- long affectedRows = 0;
- Long clusterId = null;
try {
- clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
+ final Long clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId();
// find request and stage ids that were created before date populated by user.
List<StageEntityPK> requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis());
@@ -392,7 +387,6 @@ public class RequestDAO implements Cleanable {
}
}
-
Set<Long> requestIds = new HashSet<>();
for (StageEntityPK ids : requestStageIds) {
requestIds.add(ids.getRequestId());
@@ -400,28 +394,19 @@ public class RequestDAO implements Cleanable {
// find task ids using request stage ids
Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
- LinkedList<String> params = new LinkedList<>();
- params.add("stageId");
- params.add("requestId");
// find host task ids, to find related host requests and also to remove needed host tasks
- Set<Long> hostTaskIds = new HashSet<>();
- if (taskIds != null && !taskIds.isEmpty()) {
- hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
- }
+ final Set<Long> hostTaskIds = findHostTaskIds(taskIds);
// find host request ids by host task ids to remove later needed host requests
- Set<Long> hostRequestIds = new HashSet<>();
- if (!hostTaskIds.isEmpty()) {
- hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
- }
-
- Set<Long> topologyRequestIds = new HashSet<>();
- if (!hostRequestIds.isEmpty()) {
- topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
- }
+ final Set<Long> hostRequestIds = findHostRequestIds(hostTaskIds);
+ final Set<Long> topologyRequestIds = findTopologyRequestIds(hostRequestIds);
+ final LinkedList<String> params = new LinkedList<>();
+ params.add("stageId");
+ params.add("requestId");
+ long affectedRows = 0;
//removing all entities one by one according to their relations using stage, task and request ids
affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(),
"ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
@@ -447,11 +432,64 @@ public class RequestDAO implements Cleanable {
affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(),
"RequestEntity.removeByRequestIds", RequestEntity.class);
+ return affectedRows;
} catch (AmbariException e) {
LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e);
throw new IllegalStateException(e);
}
+ }
- return affectedRows;
+ private Set<Long> findHostTaskIds(Set<Long> taskIds) {
+ final Set<Long> hostTaskIds = new HashSet<>();
+ final Set<Long> partialTaskIds = new HashSet<>();
+ taskIds.forEach(taskId -> {
+ partialTaskIds.add(taskId);
+ if (partialTaskIds.size() == BATCH_SIZE) {
+ hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
+ partialTaskIds.clear();
+ }
+ });
+
+ if (!partialTaskIds.isEmpty()) {
+ hostTaskIds.addAll(topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partialTaskIds));
+ }
+ return hostTaskIds;
+ }
+
+ private Set<Long> findHostRequestIds(Set<Long> hostTaskIds) {
+ final Set<Long> hostRequestIds = new HashSet<>();
+ final Set<Long> partialHostTaskIds = new HashSet<>();
+
+ hostTaskIds.forEach(taskId -> {
+ partialHostTaskIds.add(taskId);
+ if (partialHostTaskIds.size() == BATCH_SIZE) {
+ hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
+ partialHostTaskIds.clear();
+ }
+ });
+
+ if (!partialHostTaskIds.isEmpty()) {
+ hostRequestIds.addAll(topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(partialHostTaskIds));
+ }
+ return hostRequestIds;
}
+
+ private Set<Long> findTopologyRequestIds(final Set<Long> hostRequestIds) {
+ final Set<Long> topologyRequestIds = new HashSet<>();
+ final Set<Long> partialHostRequestIds = new HashSet<>();
+
+ hostRequestIds.forEach(requestId -> {
+ partialHostRequestIds.add(requestId);
+ if (partialHostRequestIds.size() == BATCH_SIZE) {
+ topologyRequestIds.addAll(topologyLogicalRequestDAO.findRequestIdsByIds(partialHostRequestIds));
+ partialHostRequestIds.clear();
+ }
+ });
+
+ if (!partialHostRequestIds.isEmpty()) {
+ topologyRequestIds.addAll(topologyLogicalRequestDAO.findRequestIdsByIds(partialHostRequestIds));
+ }
+ return topologyRequestIds;
+ }
+
}
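A closing observation on the RequestDAO change: findHostTaskIds, findHostRequestIds and findTopologyRequestIds differ only in the DAO call they batch. A possible follow-up (a sketch only, not part of this commit) would be to delegate all three to a shared helper such as the findInBatches sketch shown before the diff:

private Set<Long> findHostTaskIds(Set<Long> taskIds) {
  // Hypothetical delegation to a shared batching helper.
  return BatchedLookup.findInBatches(taskIds,
      partial -> topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(partial));
}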
--
To stop receiving notification emails like this one, please contact
smolnar@apache.org.