You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/06/19 20:16:24 UTC

[02/25] ambari git commit: AMBARI-20749. Ambari data purging. (stoader)

AMBARI-20749. Ambari data purging. (stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/834cb665
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/834cb665
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/834cb665

Branch: refs/heads/branch-feature-AMBARI-20859
Commit: 834cb6655b4b56846f41141c5c55aa70c1eb5cf4
Parents: 7302da4
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Wed Jun 14 16:39:07 2017 +0200
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Thu Jun 15 18:07:47 2017 +0200

----------------------------------------------------------------------
 ambari-server/sbin/ambari-server                |  6 +--
 .../checks/DatabaseConsistencyCheckHelper.java  |  4 +-
 .../ambari/server/cleanup/CleanupDriver.java    | 15 ++++---
 .../ambari/server/cleanup/CleanupService.java   | 19 ++++++++-
 .../server/cleanup/CleanupServiceImpl.java      | 33 +++++++++++++--
 .../server/orm/dao/HostRoleCommandDAO.java      |  7 ++--
 .../ambari/server/orm/dao/RequestDAO.java       | 21 +++++-----
 .../server/orm/dao/TopologyHostTaskDAO.java     |  8 ++--
 .../orm/dao/TopologyLogicalRequestDAO.java      |  8 ++--
 .../server/orm/dao/TopologyLogicalTaskDAO.java  |  8 ++--
 .../orm/entities/TopologyHostTaskEntity.java    |  2 +-
 .../entities/TopologyLogicalRequestEntity.java  |  2 +-
 .../orm/entities/TopologyLogicalTaskEntity.java |  2 +-
 ambari-server/src/main/python/ambari-server.py  | 18 ++++----
 .../src/main/python/ambari_server/dbCleanup.py  | 34 ++++++++--------
 .../main/python/ambari_server/setupActions.py   |  2 +-
 .../server/cleanup/CleanupServiceImplTest.java  | 43 ++++++++++++++++++--
 17 files changed, 162 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/sbin/ambari-server
----------------------------------------------------------------------
diff --git a/ambari-server/sbin/ambari-server b/ambari-server/sbin/ambari-server
index 67e8aea..24ec43a 100755
--- a/ambari-server/sbin/ambari-server
+++ b/ambari-server/sbin/ambari-server
@@ -181,8 +181,8 @@ case "${1:-}" in
         echo -e "Setting up SSO authentication properties..."
         $PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@
         ;;
-  db-cleanup)
-        echo -e "Cleanup database..."
+  db-purge-history)
+        echo -e "Purge database history..."
         $PYTHON "$AMBARI_PYTHON_EXECUTABLE" $@
         ;;
   install-mpack)
@@ -203,7 +203,7 @@ case "${1:-}" in
         ;;
   *)
         echo "Usage: $AMBARI_EXECUTABLE
-        {start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-cleanup|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos} [options]
+        {start|stop|reset|restart|upgrade|status|upgradestack|setup|setup-jce|setup-ldap|sync-ldap|set-current|setup-security|refresh-stack-hash|backup|restore|update-host-names|check-database|enable-stack|setup-sso|db-purge-history|install-mpack|uninstall-mpack|upgrade-mpack|setup-kerberos} [options]
         Use $AMBARI_PYTHON_EXECUTABLE <action> --help to get details on options available.
         Or, simply invoke ambari-server.py --help to print the options."
         exit 1

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index 4f93102..d1566d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -289,7 +289,7 @@ public class DatabaseConsistencyCheckHelper {
 
         if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) {
           warning("The database table {} is currently {} MB (limit is {}) and may impact performance. It is recommended " +
-                  "that you reduce its size by executing \"ambari-server db-cleanup\".",
+                  "that you reduce its size by executing \"ambari-server db-purge-history\".",
                   tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB);
         } else if (tableSizeInMB != null && tableSizeInMB < TABLE_SIZE_LIMIT_MB) {
           LOG.info(String.format("The database table %s is currently %.3f MB and is within normal limits (%.3f)",
@@ -309,7 +309,7 @@ public class DatabaseConsistencyCheckHelper {
 
           if (tableRowCount > TABLE_ROW_COUNT_LIMIT) {
             warning("The database table {} currently has {} rows (limit is {}) and may impact performance. It is " +
-                    "recommended that you reduce its size by executing \"ambari-server db-cleanup\".",
+                    "recommended that you reduce its size by executing \"ambari-server db-purge-history\".",
                     tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT);
           } else if (tableRowCount != -1 && tableRowCount < TABLE_ROW_COUNT_LIMIT) {
             LOG.info(String.format("The database table %s currently has %d rows and is within normal limits (%d)", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT));

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
index 788290b..b4c8369 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupDriver.java
@@ -49,7 +49,7 @@ public class CleanupDriver {
   private static Options getOptions() {
     Options options = new Options();
     options.addOption(Option.builder().longOpt(CLUSTER_NAME_ARG).desc("The cluster name").required().type(String.class).hasArg().valueSeparator(' ').build());
-    options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("The day from which the cleanup runs").required().type(String.class).hasArg().valueSeparator(' ').build());
+    options.addOption(Option.builder().longOpt(FROM_DATE_ARG).desc("Date up until data will be purged.").required().type(String.class).hasArg().valueSeparator(' ').build());
     return options;
   }
 
@@ -67,7 +67,7 @@ public class CleanupDriver {
     } catch (Exception exp) {
       System.err.println("Parsing failed.  Reason: " + exp.getMessage());
       LOGGER.error("Parsing failed.  Reason: ", exp);
-      formatter.printHelp("cleanup", getOptions());
+      formatter.printHelp("db-purge-history", getOptions());
       System.exit(1);
     }
     return ctx;
@@ -75,7 +75,7 @@ public class CleanupDriver {
 
 
   public static void main(String... args) throws Exception {
-    LOGGER.info("DB-CLEANUP - Starting the cleanup process ...");
+    LOGGER.info("DB-PURGE - Starting the database purge process ...");
 
     CleanupContext cleanupContext = processArguments(args);
 
@@ -86,12 +86,17 @@ public class CleanupDriver {
     injector.getInstance(AmbariJpaPersistService.class).start();
 
     CleanupServiceImpl cleanupService = injector.getInstance(CleanupServiceImpl.class);
-    long affected = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp()));
+    CleanupService.CleanupResult result = cleanupService.cleanup(new TimeBasedCleanupPolicy(cleanupContext.getClusterName(), cleanupContext.getFromDayTimestamp()));
 
     // explicitly stopping the persist service
     injector.getInstance(AmbariJpaPersistService.class).stop();
 
-    LOGGER.info("DB-CLEANUP - completed. Number of affected records [{}]", affected);
+    if (result.getErrorCount() > 0) {
+      LOGGER.warn("DB-PURGE - completed with error, check Ambari Server log for details ! Number of affected records [{}]", result.getAffectedRows());
+      System.exit(2);
+    }
+
+    LOGGER.info("DB-PURGE - completed. Number of affected records [{}]", result.getAffectedRows());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
index 880207c..8ac0b92 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupService.java
@@ -23,11 +23,28 @@ package org.apache.ambari.server.cleanup;
  */
 public interface CleanupService<T> {
 
+  interface CleanupResult {
+    /**
+     * Returns the number of rows deleted by the cleanup
+     * @return The total number of rows deleted by the cleanup
+     */
+    long getAffectedRows();
+
+    /**
+     * The cleanup process executes the specific cleanup operations via
+     * {@link org.apache.ambari.server.orm.dao.Cleanable} implementations.
+     * Some of these may fail during the cleanup process. This method returns
+     * the number of failed clean ups.
+     * @return The number of failed cleanups.
+     */
+    int getErrorCount();
+  }
+
   /**
    * Triggers the cleanup for the given cleanup policy.
    *
    * @param cleanupPolicy the cleanup policy based on which the cleanup is executed.
    * @return the affected "rows"
    */
-  long cleanup(T cleanupPolicy);
+  CleanupResult cleanup(T cleanupPolicy);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
index 29a9041..0436c92 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/cleanup/CleanupServiceImpl.java
@@ -34,6 +34,25 @@ import com.google.inject.Singleton;
 public class CleanupServiceImpl implements CleanupService<TimeBasedCleanupPolicy> {
   private static final Logger LOGGER = LoggerFactory.getLogger(CleanupServiceImpl.class);
 
+  class Result implements CleanupResult {
+    private final long affectedRows;
+    private final int errorCount;
+
+    public Result(long affectedRows, int errorCount) {
+      this.affectedRows = affectedRows;
+      this.errorCount = errorCount;
+    }
+
+    @Override
+    public long getAffectedRows() {
+      return affectedRows;
+    }
+
+    @Override
+    public int getErrorCount() {
+      return errorCount;
+    }
+  }
 
   // this Set is automatically populated by the guice framework (based on the cleanup interface)
   private Set<Cleanable> cleanables;
@@ -54,13 +73,21 @@ public class CleanupServiceImpl implements CleanupService<TimeBasedCleanupPolicy
    * @param cleanupPolicy the policy based on which the cleanup is done
    * @return the number of affected rows
    */
-  public long cleanup(TimeBasedCleanupPolicy cleanupPolicy) {
+  public CleanupResult cleanup(TimeBasedCleanupPolicy cleanupPolicy) {
     long affectedRows = 0;
+    int errorCount = 0;
     for (Cleanable cleanable : cleanables) {
       LOGGER.info("Running the purge process for DAO: [{}] with cleanup policy: [{}]", cleanable, cleanupPolicy);
-      affectedRows += cleanable.cleanup(cleanupPolicy);
+      try {
+        affectedRows += cleanable.cleanup(cleanupPolicy);
+      }
+      catch (Exception ex) {
+        LOGGER.error("Running the purge process for DAO: [{}] failed with: {}", cleanable, ex);
+        errorCount++;
+      }
     }
-    return affectedRows;
+
+    return new Result(affectedRows, errorCount);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 58a4180..7ac4b71 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -70,6 +70,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -994,9 +995,9 @@ public class HostRoleCommandDAO {
     }
   }
 
-  public List<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
+  public Set<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
     EntityManager entityManager = entityManagerProvider.get();
-    List<Long> taskIds = new ArrayList<Long>();
+    List<Long> taskIds = new ArrayList<>();
     for (RequestDAO.StageEntityPK requestIds : requestStageIds) {
       TypedQuery<Long> hostRoleCommandQuery =
               entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class);
@@ -1007,6 +1008,6 @@ public class HostRoleCommandDAO {
       taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery));
     }
 
-    return taskIds;
+    return Sets.newHashSet(taskIds);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
index 8f16cb2..0d6682e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -52,7 +52,6 @@ import org.eclipse.persistence.config.QueryHints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -276,12 +275,12 @@ public class RequestDAO implements Cleanable {
    * Search for all request ids in Upgrade table
    * @return the list of request ids
    */
-  private List<Long> findAllRequestIdsFromUpgrade() {
+  private Set<Long> findAllRequestIdsFromUpgrade() {
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> upgradeQuery =
             entityManager.createNamedQuery("UpgradeEntity.findAllRequestIds", Long.class);
 
-    return daoUtils.selectList(upgradeQuery);
+    return Sets.newHashSet(daoUtils.selectList(upgradeQuery));
   }
 
   /**
@@ -384,7 +383,7 @@ public class RequestDAO implements Cleanable {
 
       // find request ids from Upgrade table and exclude these ids from
       // request ids set that we already have. We don't want to make any changes for upgrade
-      Set<Long> requestIdsFromUpgrade = Sets.newHashSet(findAllRequestIdsFromUpgrade());
+      Set<Long> requestIdsFromUpgrade = findAllRequestIdsFromUpgrade();
       Iterator<StageEntityPK> requestStageIdsIterator =  requestStageIds.iterator();
       while (requestStageIdsIterator.hasNext()) {
         StageEntityPK nextRequestStageIds = requestStageIdsIterator.next();
@@ -400,24 +399,24 @@ public class RequestDAO implements Cleanable {
       }
 
       // find task ids using request stage ids
-      Set<Long> taskIds = Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds));
+      Set<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds);
       LinkedList<String> params = new LinkedList<>();
       params.add("stageId");
       params.add("requestId");
 
       // find host task ids, to find related host requests and also to remove needed host tasks
-      List<Long> hostTaskIds = new ArrayList<>();
+      Set<Long> hostTaskIds = new HashSet<>();
       if (taskIds != null && !taskIds.isEmpty()) {
-        hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds));
+        hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(taskIds);
       }
 
       // find host request ids by host task ids to remove later needed host requests
-      List<Long> hostRequestIds = new ArrayList<>();
+      Set<Long> hostRequestIds = new HashSet<>();
       if (!hostTaskIds.isEmpty()) {
         hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds);
       }
 
-      List<Long> topologyRequestIds = new ArrayList<>();
+      Set<Long> topologyRequestIds = new HashSet<>();
       if (!hostRequestIds.isEmpty()) {
         topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds);
       }
@@ -428,9 +427,9 @@ public class RequestDAO implements Cleanable {
               "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class);
       affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(),
               "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class);
-      affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
+      affectedRows += cleanTableByIds(hostTaskIds, "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(),
               "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class);
-      affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
+      affectedRows += cleanTableByIds(hostRequestIds, "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(),
               "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class);
       for (Long topologyRequestId : topologyRequestIds) {
         topologyRequestDAO.removeByPK(topologyRequestId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
index 05a4274..70fdc77 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.ambari.server.orm.dao;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
@@ -26,6 +27,7 @@ import javax.persistence.TypedQuery;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
 
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -54,14 +56,14 @@ public class TopologyHostTaskDAO {
   }
 
   @RequiresSession
-  public List<Long> findHostRequestIdsByHostTaskIds(List<Long> hostTaskIds) {
+  public Set<Long> findHostRequestIdsByHostTaskIds(Set<Long> hostTaskIds) {
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> topologyHostTaskQuery =
             entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class);
 
     topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds);
 
-    return daoUtils.selectList(topologyHostTaskQuery);
+    return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
   }
 
   @RequiresSession

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
index 29a15d6..f23cc9f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.orm.dao;
 
 import java.util.List;
+import java.util.Set;
 
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
@@ -25,6 +26,7 @@ import javax.persistence.TypedQuery;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -64,13 +66,13 @@ public class TopologyLogicalRequestDAO {
   }
 
   @RequiresSession
-  public List<Long> findRequestIdsByIds(List<Long> ids) {
+  public Set<Long> findRequestIdsByIds(Set<Long> ids) {
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> topologyLogicalRequestQuery =
             entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class);
 
     topologyLogicalRequestQuery.setParameter("ids", ids);
 
-    return daoUtils.selectList(topologyLogicalRequestQuery);
+    return Sets.newHashSet(daoUtils.selectList(topologyLogicalRequestQuery));
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
index 9142473..abde5ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.orm.dao;
 
 import java.util.List;
+import java.util.Set;
 
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
@@ -25,6 +26,7 @@ import javax.persistence.TypedQuery;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
 
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -44,14 +46,14 @@ public class TopologyLogicalTaskDAO {
   }
 
   @RequiresSession
-  public List<Long> findHostTaskIdsByPhysicalTaskIds(List<Long> physicalTaskIds) {
+  public Set<Long> findHostTaskIdsByPhysicalTaskIds(Set<Long> physicalTaskIds) {
     EntityManager entityManager = entityManagerProvider.get();
     TypedQuery<Long> topologyHostTaskQuery =
             entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class);
 
     topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds);
 
-    return daoUtils.selectList(topologyHostTaskQuery);
+    return Sets.newHashSet(daoUtils.selectList(topologyHostTaskQuery));
   }
 
   @RequiresSession

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
index 50462e4..5a27ce6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java
@@ -42,7 +42,7 @@ import javax.persistence.TableGenerator;
   @NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest",
       query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId"),
   @NamedQuery(name = "TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds",
-      query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"),
+      query = "SELECT DISTINCT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"),
   @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds",
       query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds")
 })

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
index 9182334..b243f4f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java
@@ -33,7 +33,7 @@ import javax.persistence.Table;
 @Entity
 @Table(name = "topology_logical_request")
 @NamedQueries({
-  @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity logicalrequest WHERE logicalrequest.id IN :ids")
+  @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT DISTINCT t.topologyLogicalRequestEntity.topologyRequestId from TopologyHostRequestEntity t WHERE t.id IN :ids")
 })
 public class TopologyLogicalRequestEntity {
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
index c955074..04971c7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java
@@ -36,7 +36,7 @@ import javax.persistence.TableGenerator;
   pkColumnName = "sequence_name", valueColumnName = "sequence_value",
   pkColumnValue = "topology_logical_task_id_seq", initialValue = 0)
 @NamedQueries({
-  @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"),
+  @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT DISTINCT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"),
   @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :taskIds")
 })
 public class TopologyLogicalTaskEntity {

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/python/ambari-server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py
index d84e833..c7bdcf9 100755
--- a/ambari-server/src/main/python/ambari-server.py
+++ b/ambari-server/src/main/python/ambari-server.py
@@ -42,7 +42,7 @@ from ambari_server.setupHttps import setup_https, setup_truststore
 from ambari_server.setupMpacks import install_mpack, uninstall_mpack, upgrade_mpack, STACK_DEFINITIONS_RESOURCE_NAME, \
   SERVICE_DEFINITIONS_RESOURCE_NAME, MPACKS_RESOURCE_NAME
 from ambari_server.setupSso import setup_sso
-from ambari_server.dbCleanup import db_cleanup
+from ambari_server.dbCleanup import db_purge
 from ambari_server.hostUpdate import update_host_names
 from ambari_server.checkDatabase import check_database
 from ambari_server.enableStack import enable_stack_version
@@ -52,7 +52,7 @@ from ambari_server.setupActions import BACKUP_ACTION, LDAP_SETUP_ACTION, LDAP_SY
   SETUP_ACTION, SETUP_SECURITY_ACTION,START_ACTION, STATUS_ACTION, STOP_ACTION, RESTART_ACTION, UPGRADE_ACTION, \
   SETUP_JCE_ACTION, SET_CURRENT_ACTION, START_ACTION, STATUS_ACTION, STOP_ACTION, UPGRADE_ACTION, \
   SETUP_JCE_ACTION, SET_CURRENT_ACTION, ENABLE_STACK_ACTION, SETUP_SSO_ACTION, \
-  DB_CLEANUP_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION
+  DB_PURGE_ACTION, INSTALL_MPACK_ACTION, UNINSTALL_MPACK_ACTION, UPGRADE_MPACK_ACTION, PAM_SETUP_ACTION, KERBEROS_SETUP_ACTION
 from ambari_server.setupSecurity import setup_ldap, sync_ldap, setup_master_key, setup_ambari_krb5_jaas, setup_pam
 from ambari_server.userInput import get_validated_string_input
 from ambari_server.kerberos_setup import setup_kerberos
@@ -200,11 +200,11 @@ def restart(args):
 
 
 @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
-def database_cleanup(args):
-  logger.info("Database cleanup.")
+def database_purge(args):
+  logger.info("Purging historical data from database.")
   if args.silent:
     stop(args)
-  db_cleanup(args)
+  db_purge(args)
 
 #
 # The Ambari Server status.
@@ -586,9 +586,9 @@ def init_enable_stack_parser_options(parser):
                     help="Specify stack name for the stack versions that needs to be enabled")
 
 @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
-def init_db_cleanup_parser_options(parser):
+def init_db_purge_parser_options(parser):
   parser.add_option('--cluster-name', default=None, help="Cluster name", dest="cluster_name")
-  parser.add_option("-d", "--from-date", dest="cleanup_from_date", default=None, type="string", help="Specify date for the cleanup process in 'yyyy-MM-dd' format")
+  parser.add_option("-d", "--from-date", dest="purge_from_date", default=None, type="string", help="Specify date for the database purge process in 'yyyy-MM-dd' format")
 
 @OsFamilyFuncImpl(OsFamilyImpl.DEFAULT)
 def init_install_mpack_parser_options(parser):
@@ -777,7 +777,7 @@ def create_user_action_map(args, options):
         CHECK_DATABASE_ACTION: UserAction(check_database, options),
         ENABLE_STACK_ACTION: UserAction(enable_stack, options, args),
         SETUP_SSO_ACTION: UserActionRestart(setup_sso, options),
-        DB_CLEANUP_ACTION: UserAction(database_cleanup, options),
+        DB_PURGE_ACTION: UserAction(database_purge, options),
         INSTALL_MPACK_ACTION: UserAction(install_mpack, options),
         UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options),
         UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options),
@@ -808,7 +808,7 @@ def init_action_parser(action, parser):
     CHECK_DATABASE_ACTION: init_empty_parser_options,
     ENABLE_STACK_ACTION: init_enable_stack_parser_options,
     SETUP_SSO_ACTION: init_empty_parser_options,
-    DB_CLEANUP_ACTION: init_db_cleanup_parser_options,
+    DB_PURGE_ACTION: init_db_purge_parser_options,
     INSTALL_MPACK_ACTION: init_install_mpack_parser_options,
     UNINSTALL_MPACK_ACTION: init_uninstall_mpack_parser_options,
     UPGRADE_MPACK_ACTION: init_upgrade_mpack_parser_options,

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/python/ambari_server/dbCleanup.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py
index 6e16bc5..2611141 100644
--- a/ambari-server/src/main/python/ambari_server/dbCleanup.py
+++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py
@@ -37,7 +37,7 @@ DB_CLEANUP_CMD = "{0} -cp {1} org.apache.ambari.server.cleanup.CleanupDriver --c
 #
 # Run the db cleanup process
 #
-def run_db_cleanup(options):
+def run_db_purge(options):
 
     if validate_args(options):
         return 1
@@ -50,18 +50,18 @@ def run_db_cleanup(options):
       confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format(
               db_title), True)
       if not confirmBackup:
-          print_info_msg("Ambari Server Database cleanup aborted")
+          print_info_msg("Ambari Server Database purge aborted")
           return 0
 
       if status:
-          print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.")
+          print_error_msg("The database purge historical data cannot proceed while Ambari Server is running. Please shut down Ambari first.")
           return 1
 
       confirm = get_YN_input(
-          "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format(
-              db_title, options.cleanup_from_date), True)
+          "Ambari server is using db type {0}. Cleanable database entries older than {1} will be purged. Proceed [y/n]".format(
+              db_title, options.purge_from_date), True)
       if not confirm:
-          print_info_msg("Ambari Server Database cleanup aborted")
+          print_info_msg("Ambari Server Database purge aborted")
           return 0
 
 
@@ -81,31 +81,31 @@ def run_db_cleanup(options):
     current_user = ensure_can_start_under_current_user(ambari_user)
     environ = generate_env(options, ambari_user, current_user)
 
-    print "Cleaning up the database ..."
-    command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.cleanup_from_date)
+    print "Purging historical data from the database ..."
+    command = DB_CLEANUP_CMD.format(jdk_path, class_path, options.cluster_name, options.purge_from_date)
     (retcode, stdout, stderr) = run_os_command(command, env=environ)
 
     print_info_msg("Return code from database cleanup command, retcode = " + str(retcode))
 
     if stdout:
-        print "Console output from database cleanup command:"
+        print "Console output from database purge-history command:"
         print stdout
         print
     if stderr:
-        print "Error output from database cleanup command:"
+        print "Error output from database purge-history command:"
         print stderr
         print
     if retcode > 0:
-        print_error_msg("Error wncountered while cleaning up the Ambari Server Database. Check the ambari-server.log for details.")
+        print_error_msg("Error encountered while purging the Ambari Server Database. Check the ambari-server.log for details.")
     else:
-        print "Cleanup completed. Check the ambari-server.log for details."
+        print "Purging historical data completed. Check the ambari-server.log for details."
     return retcode
 
 #
-# Database cleanup
+# Database purge
 #
-def db_cleanup(options):
-    return run_db_cleanup(options)
+def db_purge(options):
+    return run_db_purge(options)
 
 
 def validate_args(options):
@@ -113,12 +113,12 @@ def validate_args(options):
         print_error_msg("Please provide the --cluster-name argument.")
         return 1
 
-    if not options.cleanup_from_date:
+    if not options.purge_from_date:
         print_error_msg("Please provide the --from-date argument.")
         return 1
 
     try:
-        datetime.datetime.strptime(options.cleanup_from_date, "%Y-%m-%d")
+        datetime.datetime.strptime(options.purge_from_date, "%Y-%m-%d")
     except ValueError as e:
         print_error_msg("The --from-date argument has an invalid format. {0}".format(e.args[0]))
         return 1;

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/main/python/ambari_server/setupActions.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/python/ambari_server/setupActions.py b/ambari-server/src/main/python/ambari_server/setupActions.py
index 758e42f..707cb84 100644
--- a/ambari-server/src/main/python/ambari_server/setupActions.py
+++ b/ambari-server/src/main/python/ambari_server/setupActions.py
@@ -42,7 +42,7 @@ BACKUP_ACTION = "backup"
 RESTORE_ACTION = "restore"
 SETUP_JCE_ACTION = "setup-jce"
 ENABLE_STACK_ACTION = "enable-stack"
-DB_CLEANUP_ACTION = "db-cleanup"
+DB_PURGE_ACTION = "db-purge-history"
 INSTALL_MPACK_ACTION = "install-mpack"
 UNINSTALL_MPACK_ACTION = "uninstall-mpack"
 UPGRADE_MPACK_ACTION = "upgrade-mpack"

http://git-wip-us.apache.org/repos/asf/ambari/blob/834cb665/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
index 580ac0d..eca6f13 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/cleanup/CleanupServiceImplTest.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -71,7 +71,7 @@ public class CleanupServiceImplTest {
     cleanupServiceImpl = new CleanupServiceImpl(cleanables);
 
     // WHEN
-    long rows = cleanupServiceImpl.cleanup(cleanupPolicy);
+    cleanupServiceImpl.cleanup(cleanupPolicy);
 
     // THEN
     Assert.assertNotNull("The argument is null", timeBasedCleanupPolicyCapture.getValue());
@@ -79,4 +79,41 @@ public class CleanupServiceImplTest {
     Assert.assertEquals("The to date is wrong!", timeBasedCleanupPolicyCapture.getValue().getToDateInMillis(), FROM_DATE_TIMESTAMP);
   }
 
-}
+  @Test
+  public void testAffectedRowsNoError() throws Exception {
+    // GIVEN
+    cleanables = new HashSet<>();
+    cleanables.add(cleanableDao);
+    expect(cleanableDao.cleanup(cleanupPolicy)).andReturn(2L);
+
+    replay(cleanableDao);
+    cleanupServiceImpl = new CleanupServiceImpl(cleanables);
+
+    // WHEN
+    CleanupService.CleanupResult res = cleanupServiceImpl.cleanup(cleanupPolicy);
+
+    // THEN
+    Assert.assertEquals("The affected rows count is wrong", 2L, res.getAffectedRows());
+    Assert.assertEquals("The error count is wrong", 0L, res.getErrorCount());
+  }
+
+  @Test
+  public void testAffectedRowsWithErrors() throws Exception {
+    // GIVEN
+    cleanables = new HashSet<>();
+    cleanables.add(cleanableDao);
+    expect(cleanableDao.cleanup(cleanupPolicy)).andThrow(new RuntimeException());
+
+
+    replay(cleanableDao);
+    cleanupServiceImpl = new CleanupServiceImpl(cleanables);
+
+    // WHEN
+    CleanupService.CleanupResult res = cleanupServiceImpl.cleanup(cleanupPolicy);
+
+    // THEN
+    Assert.assertEquals("The affected rows count is wrong", 0L, res.getAffectedRows());
+    Assert.assertEquals("The error count is wrong", 1L, res.getErrorCount());
+  }
+
+}
\ No newline at end of file