You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/06/19 20:16:24 UTC
[02/25] ambari git commit: AMBARI-20749. Ambari data purging. (stoader)
AMBARI-20749. Ambari data purging. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/834cb665
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/834cb665
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/834cb665
Branch: refs/heads/branch-feature-AMBARI-20859
Commit: 834cb6655b4b56846f41141c5c55aa70c1eb5cf4
Parents: 7302da4
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Wed Jun 14 16:39:07 2017 +0200
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Thu Jun 15 18:07:47 2017 +0200
----------------------------------------------------------------------
ambari-server/sbin/ambari-server | 6 +--
.../checks/DatabaseConsistencyCheckHelper.java | 4 +-
.../ambari/server/cleanup/CleanupDriver.java | 15 ++++---
.../ambari/server/cleanup/CleanupService.java | 19 ++++++++-
.../server/cleanup/CleanupServiceImpl.java | 33 +++++++++++++--
.../server/orm/dao/HostRoleCommandDAO.java | 7 ++--
.../ambari/server/orm/dao/RequestDAO.java | 21 +++++-----
.../server/orm/dao/TopologyHostTaskDAO.java | 8 ++--
.../orm/dao/TopologyLogicalRequestDAO.java | 8 ++--
.../server/orm/dao/TopologyLogicalTaskDAO.java | 8 ++--
.../orm/entities/TopologyHostTaskEntity.java | 2 +-
.../entities/TopologyLogicalRequestEntity.java | 2 +-
.../orm/entities/TopologyLogicalTaskEntity.java | 2 +-
ambari-server/src/main/python/ambari-server.py | 18 ++++----
.../src/main/python/ambari_server/dbCleanup.py | 34 ++++++++--------
.../main/python/ambari_server/setupActions.py | 2 +-
.../server/cleanup/CleanupServiceImplTest.java | 43 ++++++++++++++++++--
17 files changed, 162 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/sbin/ambari-server
----------------------------------------------------------------------
diff --git a/ambari-server/sbin/ambari-server b/ambari-server/sbin/ambari-server
index 67e8aea..24ec43a 100755
--- a/ambari-server/sbin/ambari-server
+++ b/ambari-server/sbin/ambari-server
@@ -181,8 +181,8 @@ case "${1:-}" in
echo -e "Setting up SSO authentication properties..."
$PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@
;;
- db-cleanup)
- echo -e "Cleanup database..."
+ db-purge-history)
+ echo -e "Purge database history..."
$PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@
;;
install-mpack)
@@ -203,7 +203,7 @@ case "${1:-}" in
;;
*)
echo "Usage: $AMBARI_EXECUTABLE
- {start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-cleanup|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos} [options]
+ {start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-purge-history|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos} [options]
Use $AMBARI_PYTHON_EXECUTABLE <action> --help to get details on options available.
Or, simply invoke ambari-server.py --help to print the options."
exit 1
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index 4f93102..d1566d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -289,7 +289,7 @@ public class DatabaseConsistencyCheckHelper {
if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) {
warning("The database table {} is currently {} MB (limit is {}) and may impact performance. It is recommended " +
- "that you reduce its size by executing \"ambari-server db-cleanup\".",
+ "that you reduce its size by executing \"ambari-server db-purge-history\".",
tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB);
} else if (tableSizeInMB != null && tableSizeInMB < TABLE_SIZE_LIMIT_MB) {
LOG.info(String.format("The database table %s is currently %.3f MB and is within normal limits (%.3f)",
@@ -309,7 +309,7 @@ public class DatabaseConsistencyCheckHelper {
if (tableRowCount > TABLE_ROW_COUNT_LIMIT) {
warning("The database table {} currently has {} rows (limit is {}) and may impact performance. It is " +
- "recommended that you reduce its size by executing \"ambari-server db-cleanup\".",
+ "recommended that you reduce its size by executing \"ambari-server db-purge-history\".",
tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT);
} else if (tableRowCount != -1 && tableRowCount < TABLE_ROW_COUNT_LIMIT) {
LOG.info(String.format("The database table %s currently has %d rows and is within normal limits (%d)", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT));
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
index 788290b..b4c8369 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
@@ -49,7 +49,7 @@ public class CleanupDriver {
private static Options getOptions() {
Options options = new Options();
options.addOption(Option.builder().longOpt(CLUSTER_NAME_ARG).desc("The cluster name").required().type(String.class).hasArg().valueSeparator(' ').build());
- options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("The day from which the cleanup runs").required().type(String.class).hasArg().valueSeparator(' ').build());
+ options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("Date up until data will be purged.").required().type(String.class).hasArg().valueSeparator(' ').build());
return options;
}
@@ -67,7 +67,7 @@ public class CleanupDriver {
} catch (Exception exp) {
System.err.println("Parsing failed. Reason: " + exp.getMessage());
LOGGER.error("Parsing failed. Reason: ", exp);
- formatter.printHelp("cleanup", getOptions());
+ formatter.printHelp("db-purge-history", getOptions());
System.exit(1);
}
return ctx;
@@ -75,7 +75,7 @@ public class CleanupDriver {
public static void main(String... args) throws Exception {
- LOGGER.info("DB-CLEANUP - Starting the cleanup process ...");
+ LOGGER.info("DB-PURGE - Starting the database purge process ...");
CleanupContext cleanupContext = processArguments(args);
@@ -86,12 +86,17 @@ public class CleanupDriver {
injector.getInstance(AmbariJpaPersistService.class).start();
CleanupServiceImpl cleanupService = injector.getInstance(CleanupServiceImpl.class);
- long affected = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp()));
+ CleanupService.CleanupResult result = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp()));
// explicitly stopping the persist service
injector.getInstance(AmbariJpaPersistService.class).stop();
- LOGGER.info("DB-CLEANUP - completed. Number of affected records [{}]", affected);
+ if (result.getErrorCount() > 0) {
+ LOGGER.warn("DB-PURGE - completed with error, check Ambari Server log for details ! Number of affected records [{}]", result.getAffectedRows());
+ System.exit(2);
+ }
+
+ LOGGER.info("DB-PURGE - completed. Number of affected records [{}]", result.getAffectedRows());
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
index 880207c..8ac0b92 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
@@ -23,11 +23,28 @@ package org.apache.ambari.server.cleanup;
*/
public interface CleanupService<T> {
+ interface CleanupResult {
+ /**
+ * Returns the number of rows deleted by the cleanup
+ * @return The total number of rows deleted by the cleanup
+ */
+ long getAffectedRows();
+
+ /**
+ * The cleanup process executes the specific cleanup operations via
+ * {@link org.apache.ambari.server.orm.dao.Cleanable} implementations.
+ * Some of these may fail during the cleanup process. This method returns
+ * the number of failed clean ups.
+ * @return The number of failed cleanups.
+ */
+ int getErrorCount();
+ }
+
/**
* Triggers the cleanup for the given cleanup policy.
*
* @param cleanupPolicy the cleanup policy based on which the cleanup is executed.
* @return the affected "rows"
*/
- long cleanup(T cleanupPolicy);
+ CleanupResult cleanup(T cleanupPolicy);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
index 29a9041..0436c92 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
@@ -34,6 +34,25 @@ import com.google.inject.Singleton;
public class CleanupServiceImpl implements CleanupService<TimeBasedCleanupPolicy> {
private static final Logger LOGGER = LoggerFactory.getLogger(CleanupServiceImpl.class);
+ class Result implements CleanupResult {
+ private final long affectedRows;
+ private final int errorCount;
+
+ public Result(long affectedRows, int errorCount) {
+ this.affectedRows = affectedRows;
+ this.errorCount = errorCount;
+ }
+
+ @Override
+ public long getAffectedRows() {
+ return affectedRows;
+ }
+
+ @Override
+ public int getErrorCount() {
+ return errorCount;
+ }
+ }
// this Set is automatically populated by the guice framework (based on the cleanup interface)
private Set<Cleanable> cleanables;
@@ -54,13 +73,21 @@ public class CleanupServiceImpl implements CleanupService<TimeBasedCleanupPolicy
* @param cleanupPolicy the policy based on which the cleanup is done
* @return the number of affected rows
*/
- public long cleanup(TimeBasedCleanupPolicy cleanupPolicy) {
+ public CleanupResult cleanup(TimeBasedCleanupPolicy cleanupPolicy) {
long affectedRows = 0;
+ int errorCount = 0;
for (Cleanable cleanable : cleanables) {
LOGGER.info("Running the purge process for DAO: [{}] with cleanup policy: [{}]", cleanable, cleanupPolicy);
- affectedRows += cleanable.cleanup(cleanupPolicy);
+ try {
+ affectedRows += cleanable.cleanup(cleanupPolicy);
+ }
+ catch (Exception ex) {
+ LOGGER.error("Running the purge process for DAO: [{}] failed with: {}", cleanable, ex);
+ errorCount++;
+ }
}
- return affectedRows;
+
+ return new Result(affectedRows, errorCount);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 58a4180..7ac4b71 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -70,6 +70,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@@ -994,9 +995,9 @@ public class HostRoleCommandDAO {
}
}
- public List<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
+ public Set<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
EntityManager entityManager = entityManagerProvider.get();
- List<Long> taskIds = new ArrayList<Long>();
+ List<Long> taskIds = new ArrayList<>();
for (RequestDAO.StageEntityPK requestIds : requestStageIds) {
TypedQuery<Long> hostRoleCommandQuery =
entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class);
@@ -1007,6 +1008,6 @@ public class HostRoleCommandDAO {
taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery));
}
- return taskIds;
+ return Sets.newHashSet(taskIds);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 8f16cb2..0d6682e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -52,7 +52,6 @@ import org.eclipse.persistence.config.QueryHints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -276,12 +275,12 @@ public class RequestDAO implements Cleanable {
* Search for all request ids in Upgrade table
* @return the list of request ids
*/
- private List<Long> findAllRequestIdsFromUpgrade() {
+ private Set<Long> findAllRequestIdsFromUpgrade() {
EntityManager entityManager = entityManagerProvider.get();
TypedQuery<Long> upgradeQuery =
entityManager.createNamedQuery("UpgradeEntity.findAllRequestIds", Long.class);
- return daoUtils.selectList(upgradeQuery);
+ return Sets.newHashSet(daoUtils.selectList(upgradeQuery));
}
/**
@@ -384,7 +383,7 @@ public class RequestDAO implements Cleanable {
// find request ids from Upgrade table and exclude these ids from
// request ids set that we already have. We don't want to make any changes for upgrade
- Set<Long> requestIdsFromUpgrade = Sets.newHashSet(findAllRequestIdsFromUpgrade());
+ Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
Iterator<StageEntityPK> requestStageIdsIterator = requestStageIds.iterator();
while (requestStageIdsIterator.hasNext()) {
StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
@@ -400,24 +399,24 @@ public class RequestDAO implements Cleanable {
}
// find task ids using request stage ids
- Set<Long> taskIds = Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds));
+ Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
LinkedList<String> params = new LinkedList<>();
params.add("stageId");
params.add("requestId");
// find host task ids, to find related host requests and also to remove needed host tasks
- List<Long> hostTaskIds = new ArrayList<>();
+ Set<Long> hostTaskIds = new HashSet<>();
if (taskIds != null && !taskIds.isEmpty()) {
- hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds));
+ hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
}
// find host request ids by host task ids to remove later needed host requests
- List<Long> hostRequestIds = new ArrayList<>();
+ Set<Long> hostRequestIds = new HashSet<>();
if (!hostTaskIds.isEmpty()) {
hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
}
- List<Long> topologyRequestIds = new ArrayList<>();
+ Set<Long> topologyRequestIds = new HashSet<>();
if (!hostRequestIds.isEmpty()) {
topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
}
@@ -428,9 +427,9 @@ public class RequestDAO implements Cleanable {
"ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
"TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
- affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
+ affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
"TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
- affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
+ affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
"TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
for (Long topologyRequestId : topologyRequestIds) {
topologyRequestDAO.removeByPK(topologyRequestId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index 05a4274..70fdc77 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.ambari.server.orm.dao;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
@@ -26,6 +27,7 @@ import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@@ -54,14 +56,14 @@ public class TopologyHostTaskDAO {
}
@RequiresSession
- public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) {
+ public Set<Long> findHostRequestIdsByHostTaskIds(Set<Long> hostTaskIds) {
EntityManager entityManager = entityManagerProvider.get();
TypedQuery<Long> topologyHostTaskQuery =
entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds);
- return daoUtils.selectList(topologyHostTaskQuery);
+ return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
}
@RequiresSession
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index 29a15d6..f23cc9f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,7 @@
package org.apache.ambari.server.orm.dao;
import java.util.List;
+import java.util.Set;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
@@ -25,6 +26,7 @@ import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
+import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@@ -64,13 +66,13 @@ public class TopologyLogicalRequestDAO {
}
@RequiresSession
- public List<Long> findRequestIdsByIds(List<Long> ids) {
+ public Set<Long> findRequestIdsByIds(Set<Long> ids) {
EntityManager entityManager = entityManagerProvider.get();
TypedQuery<Long> topologyLogicalRequestQuery =
entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
topologyLogicalRequestQuery.setParameter("ids", ids);
- return daoUtils.selectList(topologyLogicalRequestQuery);
+ return Sets.newHashSet(daoUtils.selectList(topologyLogicalRequestQuery));
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index 9142473..abde5ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,7 @@
package org.apache.ambari.server.orm.dao;
import java.util.List;
+import java.util.Set;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
@@ -25,6 +26,7 @@ import javax.persistence.TypedQuery;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@@ -44,14 +46,14 @@ public class TopologyLogicalTaskDAO {
}
@RequiresSession
- public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> physicalTaskIds) {
+ public Set<Long> findHostTaskIdsByPhysicalTaskIds(Set<Long> physicalTaskIds) {
EntityManager entityManager = entityManagerProvider.get();
TypedQuery<Long> topologyHostTaskQuery =
entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds);
- return daoUtils.selectList(topologyHostTaskQuery);
+ return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
}
@RequiresSession
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
index 50462e4..5a27ce6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
@@ -42,7 +42,7 @@ import javax.persistence.TableGenerator;
@NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest",
query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId"),
@NamedQuery(name = "TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
- query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"),
+ query = "SELECT DISTINCT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"),
@NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds",
query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds")
})
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
index 9182334..b243f4f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
@@ -33,7 +33,7 @@ import javax.persistence.Table;
@Entity
@Table(name = "topology_logical_request")
@NamedQueries({
- @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity logicalrequest WHERE logicalrequest.id IN :ids")
+ @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT DISTINCT t.topologyLogicalRequestEntity.topologyRequestId from TopologyHostRequestEntity t WHERE t.id IN :ids")
})
public class TopologyLogicalRequestEntity {
@Id
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
index c955074..04971c7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
@@ -36,7 +36,7 @@ import javax.persistence.TableGenerator;
pkColumnName = "sequence_name", valueColumnName = "sequence_value",
pkColumnValue = "topology_logical_task_id_seq", initialValue = 0)
@NamedQueries({
- @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"),
+ @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT DISTINCT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"),
@NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :taskIds")
})
public class TopologyLogicalTaskEntity {
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py
index d84e833..c7bdcf9 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -42,7 +42,7 @@ from ambari_server.setupHttps import setup_https, setup_truststore
from ambari_server.setupMpacks import install_mpack, uninstall_mpack, upgrade_mpack, STACK_DEFINITIONS_RESOURCE_NAME, \
SERVICE_DEFINITIONS_RESOURCE_NAME, MPACKS_RESOURCE_NAME
from ambari_server.setupSso import setup_sso
-from ambari_server.dbCleanup import db_cleanup
+from ambari_server.dbCleanup import db_purge
from ambari_server.hostUpdate import update_host_names
from ambari_server.checkDatabase import check_database
from ambari_server.enableStack import enable_stack_version
@@ -52,7 +52,7 @@ from ambari_server.setupActions import BACKUP_ACTION, LDAP_SETUP_ACTION, LDAP_SY
SETUP_ACTION, SETUP_SECURITY_ACTION,START_ACTION, STATUS_ACTION, STOP_ACTION, RESTART_ACTION, UPGRADE_ACTION, \
SETUP_JCE_ACTION, SET_CURRENT_ACTION, START_ACTION, STATUS_ACTION, STOP_ACTION, UPGRADE_ACTION, \
SETUP_JCE_ACTION, SET_CURRENT_ACTION, ENABLE_STACK_ACTION, SETUP_SSO_ACTION, \
- DB_CLEANUP_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION
+ DB_PURGE_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION
from ambari_server.setupSecurity import setup_ldap, sync_ldap, setup_master_key, setup_ambari_krb5_jaas, setup_pam
from ambari_server.userInput import get_validated_string_input
from ambari_server.kerberos_setup import setup_kerberos
@@ -200,11 +200,11 @@ def restart(args):
@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
-def database_cleanup(args):
- logger.info("Database cleanup.")
+def database_purge(args):
+ logger.info("Purging historical data from database.")
if args.silent:
stop(args)
- db_cleanup(args)
+ db_purge(args)
#
# The Ambari Server status.
@@ -586,9 +586,9 @@ def init_enable_stack_parser_options(parser):
help="Specify stack name for the stack versions that needs to be enabled")
@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
-def init_db_cleanup_parser_options(parser):
+def init_db_purge_parser_options(parser):
parser.add_option('--cluster-name', default=None, help="Cluster name", dest="cluster_name")
- parser.add_option("-d", "--from-date", dest="cleanup_from_date", default=None, type="string", help="Specify date for the cleanup process in 'yyyy-MM-dd' format")
+ parser.add_option("-d", "--from-date", dest="purge_from_date", default=None, type="string", help="Specify date for the database purge process in 'yyyy-MM-dd' format")
@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
def init_install_mpack_parser_options(parser):
@@ -777,7 +777,7 @@ def create_user_action_map(args, options):
CHECK_DATABASE_ACTION: UserAction(check_database, options),
ENABLE_STACK_ACTION: UserAction(enable_stack, options, args),
SETUP_SSO_ACTION: UserActionRestart(setup_sso, options),
- DB_CLEANUP_ACTION: UserAction(database_cleanup, options),
+ DB_PURGE_ACTION: UserAction(database_purge, options),
INSTALL_MPACK_ACTION: UserAction(install_mpack, options),
UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options),
UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options),
@@ -808,7 +808,7 @@ def init_action_parser(action, parser):
CHECK_DATABASE_ACTION: init_empty_parser_options,
ENABLE_STACK_ACTION: init_enable_stack_parser_options,
SETUP_SSO_ACTION: init_empty_parser_options,
- DB_CLEANUP_ACTION: init_db_cleanup_parser_options,
+ DB_PURGE_ACTION: init_db_purge_parser_options,
INSTALL_MPACK_ACTION: init_install_mpack_parser_options,
UNINSTALL_MPACK_ACTION: init_uninstall_mpack_parser_options,
UPGRADE_MPACK_ACTION: init_upgrade_mpack_parser_options,
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/python/ambari_server/dbCleanup.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index 6e16bc5..2611141 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -37,7 +37,7 @@ DB_CLEANUP_CMD = "{0} -cp {1} org.apache.ambari.server.cleanup.CleanupDriver --c
#
# Run the db cleanup process
#
-def run_db_cleanup(options):
+def run_db_purge(options):
if validate_args(options):
return 1
@@ -50,18 +50,18 @@ def run_db_cleanup(options):
confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
db_title), True)
if not confirmBackup:
- print_info_msg("Ambari Server Database cleanup aborted")
+ print_info_msg("Ambari Server Database purge aborted")
return 0
if status:
- print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
+ print_error_msg("The database purge historical data cannot proceed while Ambari Server is running. Please shut down Ambari first.")
return 1
confirm = get_YN_input(
- "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
- db_title, options.cleanup_from_date), True)
+ "Ambari server is using db type {0}. Cleanable database entries older than {1} will be purged. Proceed [y/n]".format(
+ db_title, options.purge_from_date), True)
if not confirm:
- print_info_msg("Ambari Server Database cleanup aborted")
+ print_info_msg("Ambari Server Database purge aborted")
return 0
@@ -81,31 +81,31 @@ def run_db_cleanup(options):
current_user = ensure_can_start_under_current_user(ambari_user)
environ = generate_env(options, ambari_user, current_user)
- print "Cleaning up the database ..."
- command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.cleanup_from_date)
+ print "Purging historical data from the database ..."
+ command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.purge_from_date)
(retcode, stdout, stderr) = run_os_command(command, env=environ)
print_info_msg("Return code from database cleanup command, retcode = " + str(retcode))
if stdout:
- print "Console output from database cleanup command:"
+ print "Console output from database purge-history command:"
print stdout
print
if stderr:
- print "Error output from database cleanup command:"
+ print "Error output from database purge-history command:"
print stderr
print
if retcode > 0:
- print_error_msg("Error wncountered while cleaning up the Ambari Server Database. Check the ambari-server.log for details.")
+ print_error_msg("Error encountered while purging the Ambari Server Database. Check the ambari-server.log for details.")
else:
- print "Cleanup completed. Check the ambari-server.log for details."
+ print "Purging historical data completed. Check the ambari-server.log for details."
return retcode
#
-# Database cleanup
+# Database purge
#
-def db_cleanup(options):
- return run_db_cleanup(options)
+def db_purge(options):
+ return run_db_purge(options)
def validate_args(options):
@@ -113,12 +113,12 @@ def validate_args(options):
print_error_msg("Please provide the --cluster-name argument.")
return 1
- if not options.cleanup_from_date:
+ if not options.purge_from_date:
print_error_msg("Please provide the --from-date argument.")
return 1
try:
- datetime.datetime.strptime(options.cleanup_from_date, "%Y-%m-%d")
+ datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d")
except ValueError as e:
print_error_msg("The --from-date argument has an invalid format. {0}".format(e.args[0]))
return 1;
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/python/ambari_server/setupActions.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/setupActions.py b/ambari-server/src/main/python/ambari_server/setupActions.py
index 758e42f..707cb84 100644
--- a/ambari-server/src/main/python/ambari_server/setupActions.py
+++ b/ambari-server/src/main/python/ambari_server/setupActions.py
@@ -42,7 +42,7 @@ BACKUP_ACTION = "backup"
RESTORE_ACTION = "restore"
SETUP_JCE_ACTION = "setup-jce"
ENABLE_STACK_ACTION = "enable-stack"
-DB_CLEANUP_ACTION = "db-cleanup"
+DB_PURGE_ACTION = "db-purge-history"
INSTALL_MPACK_ACTION = "install-mpack"
UNINSTALL_MPACK_ACTION = "uninstall-mpack"
UPGRADE_MPACK_ACTION = "upgrade-mpack"
http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
index 580ac0d..eca6f13 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -71,7 +71,7 @@ public class CleanupServiceImplTest {
cleanupServiceImpl = new CleanupServiceImpl(cleanables);
// WHEN
- long rows = cleanupServiceImpl.cleanup(cleanupPolicy);
+ cleanupServiceImpl.cleanup(cleanupPolicy);
// THEN
Assert.assertNotNull("The argument is null", timeBasedCleanupPolicyCapture.getValue());
@@ -79,4 +79,41 @@ public class CleanupServiceImplTest {
Assert.assertEquals("The to date is wrong!", timeBasedCleanupPolicyCapture.getValue().getToDateInMillis(), FROM_DATE_TIMESTAMP);
}
-}
+ @Test
+ public void testAffectedRowsNoError() throws Exception {
+ // GIVEN
+ cleanables = new HashSet<>();
+ cleanables.add(cleanableDao);
+ expect(cleanableDao.cleanup(cleanupPolicy)).andReturn(2L);
+
+ replay(cleanableDao);
+ cleanupServiceImpl = new CleanupServiceImpl(cleanables);
+
+ // WHEN
+ CleanupService.CleanupResult res = cleanupServiceImpl.cleanup(cleanupPolicy);
+
+ // THEN
+ Assert.assertEquals("The affected rows count is wrong", 2L, res.getAffectedRows());
+ Assert.assertEquals("The error count is wrong", 0L, res.getErrorCount());
+ }
+
+ @Test
+ public void testAffectedRowsWithErrors() throws Exception {
+ // GIVEN
+ cleanables = new HashSet<>();
+ cleanables.add(cleanableDao);
+ expect(cleanableDao.cleanup(cleanupPolicy)).andThrow(new RuntimeException());
+
+
+ replay(cleanableDao);
+ cleanupServiceImpl = new CleanupServiceImpl(cleanables);
+
+ // WHEN
+ CleanupService.CleanupResult res = cleanupServiceImpl.cleanup(cleanupPolicy);
+
+ // THEN
+ Assert.assertEquals("The affected rows count is wrong", 0L, res.getAffectedRows());
+ Assert.assertEquals("The error count is wrong", 1L, res.getErrorCount());
+ }
+
+}
\ No newline at end of file