You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2018/09/18 15:24:06 UTC

[ambari] branch trunk updated: [AMBARI-24616] Disable Kerberos from Ambari UI didn't clean up keytab directories

This is an automated email from the ASF dual-hosted git repository.

rlevas pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 620539f  [AMBARI-24616] Disable Kerberos from Ambari UI didn't clean up keytab directories
620539f is described below

commit 620539fa490b05ce0b4e93911423fd10e1267053
Author: Robert Levas <rl...@users.noreply.github.com>
AuthorDate: Tue Sep 18 11:24:00 2018 -0400

    [AMBARI-24616] Disable Kerberos from Ambari UI didn't clean up keytab directories
    
    * [AMBARI-24616] Disable Kerberos from Ambari UI didn't clean up keytab directories
    
    * [AMBARI-24616] Disable Kerberos from Ambari UI didn't clean up keytab directories
    
    * [AMBARI-24616] Disable Kerberos from Ambari UI didn't clean up keytab directories
---
 .../server/controller/DeleteIdentityHandler.java   | 106 +++++--
 .../server/controller/KerberosHelperImpl.java      |  48 +--
 .../utilities/KerberosIdentityCleaner.java         |  43 +--
 .../controller/utilities/RemovableIdentities.java  |  15 +-
 .../events/publishers/AgentCommandsPublisher.java  | 337 +++++++++++++++++----
 .../ambari/server/orm/dao/KerberosKeytabDAO.java   |  43 ++-
 .../server/orm/dao/KerberosKeytabPrincipalDAO.java |  63 +++-
 .../server/orm/dao/KerberosPrincipalDAO.java       |  46 ++-
 .../server/orm/entities/KerberosKeytabEntity.java  |   2 +-
 .../entities/KerberosKeytabPrincipalEntity.java    |  37 ++-
 .../orm/entities/KerberosPrincipalEntity.java      |  22 ++
 .../serveraction/kerberos/CleanupServerAction.java |  17 +-
 .../ConfigureAmbariIdentitiesServerAction.java     |  15 +-
 .../kerberos/DestroyPrincipalsServerAction.java    | 287 +++++++++++++-----
 .../kerberos/KerberosServerAction.java             |  17 +-
 .../PrepareDisableKerberosServerAction.java        |   5 +
 .../stageutils/KerberosKeytabController.java       |  35 ++-
 .../ambari/server/agent/TestHeartbeatHandler.java  |  37 ++-
 .../server/controller/KerberosHelperTest.java      |  14 +-
 .../utilities/KerberosIdentityCleanerTest.java     |  26 +-
 .../kerberos/KerberosServerActionTest.java         |   3 +-
 21 files changed, 907 insertions(+), 311 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/DeleteIdentityHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/DeleteIdentityHandler.java
index 927e2d5..b9ea37d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/DeleteIdentityHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/DeleteIdentityHandler.java
@@ -18,12 +18,16 @@
 package org.apache.ambari.server.controller;
 
 import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ambari.server.controller.KerberosHelperImpl.BASE_LOG_DIR;
+import static org.apache.ambari.server.controller.KerberosHelperImpl.REMOVE_KEYTAB;
 
 import java.io.File;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +56,7 @@ import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.collections.CollectionUtils;
 
 import com.google.gson.reflect.TypeToken;
 
@@ -60,6 +65,8 @@ import com.google.gson.reflect.TypeToken;
  * I delete kerberos identities (principals and keytabs) of a given component.
  */
 class DeleteIdentityHandler {
+  public static final String COMPONENT_FILTER = "component_filter";
+
   private final AmbariCustomCommandExecutionHelper customCommandExecutionHelper;
   private final Integer taskTimeout;
   private final StageFactory stageFactory;
@@ -83,8 +90,8 @@ class DeleteIdentityHandler {
     String hostParamsJson = StageUtils.getGson().toJson(customCommandExecutionHelper.createDefaultHostParams(cluster, cluster.getDesiredStackVersion()));
     if (manageIdentities) {
       addPrepareDeleteIdentity(cluster, hostParamsJson, event, commandParameters, stageContainer);
-      addDestroyPrincipals(cluster, hostParamsJson, event, commandParameters, stageContainer);
       addDeleteKeytab(cluster, commandParameters.getAffectedHostNames(), hostParamsJson, commandParameters, stageContainer);
+      addDestroyPrincipals(cluster, hostParamsJson, event, commandParameters, stageContainer);
     }
     addFinalize(cluster, hostParamsJson, event, stageContainer, commandParameters);
   }
@@ -134,27 +141,36 @@ class DeleteIdentityHandler {
                                String hostParamsJson,
                                CommandParams commandParameters,
                                OrderedRequestStageContainer stageContainer)
-    throws AmbariException
-  {
-    Stage stage = createNewStage(stageContainer.getLastStageId(),
-      cluster,
-      stageContainer.getId(),
-      "Delete Keytabs",
-      commandParameters.asJson(),
-      hostParamsJson);
+      throws AmbariException {
 
-    Map<String, String> requestParams = new HashMap<>();
-    List<RequestResourceFilter> requestResourceFilters = new ArrayList<>();
-    RequestResourceFilter reqResFilter = new RequestResourceFilter("KERBEROS", "KERBEROS_CLIENT", new ArrayList<>(hostFilter));
-    requestResourceFilters.add(reqResFilter);
+    // Filter out any hosts that have been removed
+    Set<String> hostNames = (CollectionUtils.isEmpty(hostFilter))
+        ? null
+        : hostFilter.stream()
+        .filter(hostname -> ambariManagementController.getClusters().hostExists(hostname))
+        .collect(toSet());
 
-    ActionExecutionContext actionExecContext = new ActionExecutionContext(
-      cluster.getClusterName(),
-      "REMOVE_KEYTAB",
-      requestResourceFilters,
-      requestParams);
-    customCommandExecutionHelper.addExecutionCommandsToStage(actionExecContext, stage, requestParams, null);
-    stageContainer.addStage(stage);
+    if(CollectionUtils.isNotEmpty(hostNames)) {
+      Stage stage = createNewStage(stageContainer.getLastStageId(),
+          cluster,
+          stageContainer.getId(),
+          "Delete Keytabs",
+          commandParameters.asJson(),
+          hostParamsJson);
+
+      Map<String, String> requestParams = new HashMap<>();
+      List<RequestResourceFilter> requestResourceFilters = new ArrayList<>();
+      RequestResourceFilter reqResFilter = new RequestResourceFilter("KERBEROS", "KERBEROS_CLIENT", new ArrayList<>(hostNames));
+      requestResourceFilters.add(reqResFilter);
+
+      ActionExecutionContext actionExecContext = new ActionExecutionContext(
+          cluster.getClusterName(),
+          REMOVE_KEYTAB,
+          requestResourceFilters,
+          requestParams);
+      customCommandExecutionHelper.addExecutionCommandsToStage(actionExecContext, stage, requestParams, null);
+      stageContainer.addStage(stage);
+    }
   }
 
   private void addFinalize(Cluster cluster,
@@ -200,7 +216,9 @@ class DeleteIdentityHandler {
       commandParameters.put(KerberosServerAction.DEFAULT_REALM, defaultRealm);
       commandParameters.put(KerberosServerAction.KDC_TYPE, kdcType.name());
       commandParameters.put(KerberosServerAction.IDENTITY_FILTER, StageUtils.getGson().toJson(identities));
-      commandParameters.put(KerberosServerAction.COMPONENT_FILTER, StageUtils.getGson().toJson(components));
+      commandParameters.put(COMPONENT_FILTER, StageUtils.getGson().toJson(components));
+      commandParameters.put(KerberosServerAction.SERVICE_COMPONENT_FILTER, StageUtils.getGson().toJson(toServiceComponentFilter(components)));
+      commandParameters.put(KerberosServerAction.HOST_FILTER, StageUtils.getGson().toJson(toHostFilter(components)));
       commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
       return commandParameters;
     }
@@ -212,9 +230,37 @@ class DeleteIdentityHandler {
     public String asJson() {
       return StageUtils.getGson().toJson(asMap());
     }
+
+    /**
+     * Convert a collection of {@link Component}s to a service/component filter.
+     *
+     * @param components the collection of relevant {@link Component}s
+     * @return a map of service names to component names to include in an operation
+     */
+    private Map<String, ? extends Collection<String>> toServiceComponentFilter(List<Component> components) {
+      if (components == null) {
+        return null;
+      }
+
+      return components.stream().collect(groupingBy(Component::getServiceName, mapping(Component::getServiceComponentName, toSet())));
+    }
+
+    /**
+     * Convert a collection of {@link Component}s to a host filter.
+     *
+     * @param components the collection of relevant {@link Component}s
+     * @return a set of hostnames of hosts to include in an operation
+     */
+    private Set<String> toHostFilter(List<Component> components) {
+      if (components == null) {
+        return null;
+      }
+
+      return components.stream().map(Component::getHostName).collect(toSet());
+    }
   }
 
-  private static class PrepareDeleteIdentityServerAction extends AbstractPrepareKerberosServerAction {
+  public static class PrepareDeleteIdentityServerAction extends AbstractPrepareKerberosServerAction {
     @Override
     public CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext) throws AmbariException, InterruptedException {
       KerberosDescriptor kerberosDescriptor = getKerberosDescriptor();
@@ -232,12 +278,12 @@ class DeleteIdentityHandler {
     }
 
     private Set<String> serviceNames() {
-      return componentFilter().stream().map(component -> component.getServiceName()).collect(toSet());
+      return componentFilter().stream().map(Component::getServiceName).collect(toSet());
     }
 
     private List<Component> componentFilter() {
       Type jsonType = new TypeToken<List<Component>>() {}.getType();
-      return StageUtils.getGson().fromJson(getCommandParameterValue(KerberosServerAction.COMPONENT_FILTER), jsonType);
+      return StageUtils.getGson().fromJson(getCommandParameterValue(COMPONENT_FILTER), jsonType);
     }
 
     /**
@@ -281,6 +327,12 @@ class DeleteIdentityHandler {
     private KerberosDescriptor getKerberosDescriptor() throws AmbariException {
       return getKerberosHelper().getKerberosDescriptor(getCluster(), false);
     }
+
+    @Override
+    protected boolean pruneServiceFilter() {
+      // Do not prune off services that have been previously removed.
+      return false;
+    }
   }
 
   private Stage createNewStage(long id, Cluster cluster, long requestId, String requestContext, String commandParams, String hostParams) {
@@ -321,6 +373,12 @@ class DeleteIdentityHandler {
     }
 
     @Override
+    protected boolean pruneServiceFilter() {
+      // Do not prune off services that have been previously removed.
+      return false;
+    }
+
+    @Override
     protected CommandReport processIdentity(ResolvedKerberosPrincipal resolvedPrincipal, KerberosOperationHandler operationHandler, Map<String, String> kerberosConfiguration, boolean includedInFilter, Map<String, Object> requestSharedDataContext) throws AmbariException {
       return null;
     }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
index dfeb61c..fd8f9dc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
@@ -68,6 +68,7 @@ import org.apache.ambari.server.orm.entities.ArtifactEntity;
 import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.KerberosKeytabEntity;
 import org.apache.ambari.server.orm.entities.KerberosKeytabPrincipalEntity;
+import org.apache.ambari.server.orm.entities.KerberosPrincipalEntity;
 import org.apache.ambari.server.security.credential.Credential;
 import org.apache.ambari.server.security.credential.PrincipalKeyCredential;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
@@ -407,9 +408,6 @@ public class KerberosHelperImpl implements KerberosHelper {
    */
   @Override
   public void deleteIdentities(Cluster cluster, List<Component> components, Set<String> identities) throws AmbariException, KerberosOperationException {
-    if (identities.isEmpty()) {
-      return;
-    }
     LOG.info("Deleting identities: ", identities);
     KerberosDetails kerberosDetails = getKerberosDetails(cluster, null);
     validateKDCCredentials(kerberosDetails, cluster);
@@ -1976,12 +1974,14 @@ public class KerberosHelperImpl implements KerberosHelper {
         String serviceName = mappingEntry.getKey();
         HostEntity hostEntity = principal.getHostId() != null ? hostDAO.findById(principal.getHostId()) : null;
         KerberosKeytabEntity kke = kerberosKeytabDAO.find(resolvedKerberosKeytab.getFile());
+        KerberosPrincipalEntity kpe = kerberosPrincipalDAO.find(principal.getPrincipal());
 
-        KerberosKeytabPrincipalEntity kkp = kerberosKeytabPrincipalDAO.findOrCreate(kke, hostEntity, kerberosPrincipalDAO.find(principal.getPrincipal()));
-        if(kkp.putServiceMapping(serviceName, mappingEntry.getValue())) {
+        KerberosKeytabPrincipalEntity kkp = kerberosKeytabPrincipalDAO.findOrCreate(kke, hostEntity, kpe);
+        if (kkp.putServiceMapping(serviceName, mappingEntry.getValue())) {
           kerberosKeytabPrincipalDAO.merge(kkp);
         }
         kerberosKeytabDAO.merge(kke);
+        kerberosPrincipalDAO.merge(kpe);
       }
     }
   }
@@ -2382,15 +2382,20 @@ public class KerberosHelperImpl implements KerberosHelper {
                 kke.setGroupAccess(keytabFileGroupAccess);
                 kerberosKeytabDAO.create(kke);
               }
+
               // create principals
-              if (!kerberosPrincipalDAO.exists(principal)) {
-                kerberosPrincipalDAO.create(principal, false);
+              KerberosPrincipalEntity kpe = kerberosPrincipalDAO.find(principal);
+              if (kpe == null) {
+                kpe = new KerberosPrincipalEntity(principal, false, null);
+                kerberosPrincipalDAO.create(kpe);
               }
-              KerberosKeytabPrincipalEntity kkp = kerberosKeytabPrincipalDAO.findOrCreate(kke, hostDAO.findById(sch.getHost().getHostId()), kerberosPrincipalDAO.find(principal));
-              if(kkp.putServiceMapping(sch.getServiceName(), sch.getServiceComponentName())) {
+
+              KerberosKeytabPrincipalEntity kkp = kerberosKeytabPrincipalDAO.findOrCreate(kke, hostDAO.findById(sch.getHost().getHostId()), kpe);
+              if (kkp.putServiceMapping(sch.getServiceName(), sch.getServiceComponentName())) {
                 kerberosKeytabPrincipalDAO.merge(kkp);
               }
               kerberosKeytabDAO.merge(kke);
+              kerberosPrincipalDAO.merge(kpe);
               hostsWithValidKerberosClient.add(hostname);
               serviceComponentHostsToProcess.add(sch);
             }
@@ -3992,14 +3997,16 @@ public class KerberosHelperImpl implements KerberosHelper {
         commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
 
         // *****************************************************************
+        // Create stage to delete keytabs
+        addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
+            hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+
+        // *****************************************************************
         // Create stage to remove principals
+        // - this should be the last operation that deals with principals and keytab files since the
+        //   relevant database records are expected to be removed.
         addDestroyPrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
           roleCommandOrder, requestStageContainer);
-
-        // *****************************************************************
-        // Create stage to delete keytabs
-        addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
-          hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
       }
 
       // *****************************************************************
@@ -4331,14 +4338,21 @@ public class KerberosHelperImpl implements KerberosHelper {
           commandParameters, roleCommandOrder, requestStageContainer);
 
         // *****************************************************************
+        // Create stage to delete keytabs
+        addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
+            hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+
+        // *****************************************************************
         // Create stage to delete principals
+        // - this should be the last operation that deals with principals and keytab files since the
+        //   relevant database records are expected to be removed.
         addDestroyPrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event,
           commandParameters, roleCommandOrder, requestStageContainer);
 
         // *****************************************************************
-        // Create stage to delete keytabs
-        addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
-          hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+        // Create stage to perform data cleanup (e.g. kerberos descriptor artifact database leftovers)
+        addCleanupStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
+            roleCommandOrder, requestStageContainer);
       }
 
       return requestStageContainer.getLastStageId();
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleaner.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleaner.java
index 985a2b3..37e9882 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleaner.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleaner.java
@@ -26,6 +26,7 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.serveraction.kerberos.KerberosMissingAdminCredentialsException;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.SecurityType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,17 +63,20 @@ public class KerberosIdentityCleaner {
   public void componentRemoved(ServiceComponentUninstalledEvent event) throws KerberosMissingAdminCredentialsException {
     try {
       Cluster cluster = clusters.getCluster(event.getClusterId());
-      if (null != cluster.getUpgradeInProgress()) {
-        LOG.info("Skipping removal of identities for {} since there is an upgrade in progress",
-            event.getComponentName());
 
-        return;
-      }
+      if (cluster.getSecurityType() == SecurityType.KERBEROS) {
+        if (null != cluster.getUpgradeInProgress()) {
+          LOG.info("Skipping removal of identities for {} since there is an upgrade in progress",
+              event.getComponentName());
+
+          return;
+        }
 
-      LOG.info("Removing identities after {}", event);
-      RemovableIdentities
-        .ofComponent(clusters.getCluster(event.getClusterId()), event, kerberosHelper)
-        .remove(kerberosHelper);
+        LOG.info("Removing identities after {}", event);
+        RemovableIdentities
+            .ofComponent(clusters.getCluster(event.getClusterId()), event, kerberosHelper)
+            .remove(kerberosHelper);
+      }
     } catch (Exception e) {
       LOG.error("Error while deleting kerberos identity after an event: " + event, e);
     }
@@ -86,17 +90,20 @@ public class KerberosIdentityCleaner {
   public void serviceRemoved(ServiceRemovedEvent event) {
     try {
       Cluster cluster = clusters.getCluster(event.getClusterId());
-      if (null != cluster.getUpgradeInProgress()) {
-        LOG.info("Skipping removal of identities for {} since there is an upgrade in progress",
-            event.getServiceName());
 
-        return;
-      }
+      if (cluster.getSecurityType() == SecurityType.KERBEROS) {
+        if (null != cluster.getUpgradeInProgress()) {
+          LOG.info("Skipping removal of identities for {} since there is an upgrade in progress",
+              event.getServiceName());
+
+          return;
+        }
 
-      LOG.info("Removing identities after {}", event);
-      RemovableIdentities
-        .ofService(clusters.getCluster(event.getClusterId()), event, kerberosHelper)
-        .remove(kerberosHelper);
+        LOG.info("Removing identities after {}", event);
+        RemovableIdentities
+            .ofService(clusters.getCluster(event.getClusterId()), event, kerberosHelper)
+            .remove(kerberosHelper);
+      }
     } catch (Exception e) {
       LOG.error("Error while deleting kerberos identity after an event: " + event, e);
     }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/RemovableIdentities.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/RemovableIdentities.java
index c778781..6cec210 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/RemovableIdentities.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/RemovableIdentities.java
@@ -20,12 +20,10 @@ package org.apache.ambari.server.controller.utilities;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 
 import org.apache.ambari.annotations.Experimental;
 import org.apache.ambari.annotations.ExperimentalFeature;
@@ -133,13 +131,16 @@ public class RemovableIdentities {
   }
 
   /**
-   * Remove all identities which are not used by other services or components
+   * Remove all identities which are related to the specified set of components and not used by
+   * other services or components
    */
   public void remove(KerberosHelper kerberosHelper) throws AmbariException, KerberosOperationException {
-    Set<String> identitiesToRemove = skipUsed().stream().map(KerberosIdentityDescriptor::getPath).collect(toSet());
-    if (!identitiesToRemove.isEmpty()) {
-      kerberosHelper.deleteIdentities(cluster, components, identitiesToRemove);
-    }
+    // TODO: Fix the implementation identifying specific identities in the event we need to pinpoint
+    // TODO: certain identities.  This is not currently needed here since we are only handling the removal of
+    // TODO: identities tied to a specific service, component, and/or host. The logic to determine
+    // TODO: whether an identity should be removed is handled elsewhere - unfortunately in different
+    // TODO: places
+    kerberosHelper.deleteIdentities(cluster, components, null);
   }
 
   private List<KerberosIdentityDescriptor> skipUsed() throws AmbariException {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
index 5144a7f..0043f09 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
@@ -26,10 +26,11 @@ import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +53,7 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -176,84 +178,293 @@ public class AgentCommandsPublisher {
    * @throws AmbariException
    */
   private void injectKeytab(ExecutionCommand ec, String command, String targetHost) throws AmbariException {
-    String dataDir = ec.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY);
-    KerberosServerAction.KerberosCommandParameters kerberosCommandParameters = new KerberosServerAction.KerberosCommandParameters(ec);
-    if(dataDir != null) {
-      List<Map<String, String>> kcp = ec.getKerberosCommandParams();
+    KerberosCommandParameterProcessor processor = KerberosCommandParameterProcessor.getInstance(command, clusters, ec, kerberosKeytabController);
+    if (processor != null) {
+      ec.setKerberosCommandParams(processor.process(targetHost));
+    }
+  }
+
+  /**
+   * KerberosCommandParameterProcessor is an abstract class providing common implementations for processing
+   * the Kerberos command parameters.
+   *
+   * The Kerberos command parameters are processed differently depending on the operation
+   * - set, check, or remove keytab files.
+   */
+  private static abstract class KerberosCommandParameterProcessor {
+    protected final Clusters clusters;
+
+    protected final ExecutionCommand executionCommand;
+
+    protected final KerberosKeytabController kerberosKeytabController;
+
+    protected List<Map<String, String>> kcp;
+
+    protected KerberosCommandParameterProcessor(Clusters clusters, ExecutionCommand executionCommand, KerberosKeytabController kerberosKeytabController) {
+      this.clusters = clusters;
+      this.executionCommand = executionCommand;
+      this.kerberosKeytabController = kerberosKeytabController;
+      kcp = executionCommand.getKerberosCommandParams();
+    }
+
+    /**
+     * Factory method to return the appropriate KerberosCommandParameterProcessor instance based
+     * on the command being executed.
+     *
+     * @param command the command being executed
+     * @param clusters the clusters helper class
+     * @param executionCommand the execution command structure
+     * @param kerberosKeytabController the keytab controller helper class
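+     * @return the processor matching the command, or <code>null</code> if the command is not a set, check, or remove keytab command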
+     */
+    public static KerberosCommandParameterProcessor getInstance(String command, Clusters clusters, ExecutionCommand executionCommand, KerberosKeytabController kerberosKeytabController) {
+      if (SET_KEYTAB.equalsIgnoreCase(command)) {
+        return new SetKeytabCommandParameterProcessor(clusters, executionCommand, kerberosKeytabController);
+      }
+      if (CHECK_KEYTABS.equalsIgnoreCase(command)) {
+        return new CheckKeytabsCommandParameterProcessor(clusters, executionCommand, kerberosKeytabController);
+      }
+
+      if (REMOVE_KEYTAB.equalsIgnoreCase(command)) {
+        return new RemoveKeytabCommandParameterProcessor(clusters, executionCommand, kerberosKeytabController);
+      }
+
+      return null;
+    }
+
+    /**
+     * Performs the default behavior for processing the relevant Kerberos identities and generating the
+     * Kerberos-specific command details to send to the agent.
+     *
+     * @param targetHost the hostname of the target host
+     * @return a list of property maps to set as the Kerberos command parameters
+     * @throws AmbariException
+     */
+    public List<Map<String, String>> process(String targetHost) throws AmbariException {
+      KerberosServerAction.KerberosCommandParameters kerberosCommandParameters = new KerberosServerAction.KerberosCommandParameters(executionCommand);
 
       try {
-        Map<String, Collection<String>> serviceComponentFilter = kerberosKeytabController.adjustServiceComponentFilter(clusters.getCluster(ec.getClusterName()), kerberosCommandParameters.getServiceComponentFilter());
-        serviceComponentFilter.put("AMBARI", Collections.singletonList("*"));
+        Map<String, ? extends Collection<String>> serviceComponentFilter = getServiceComponentFilter(kerberosCommandParameters.getServiceComponentFilter());
+
         Set<ResolvedKerberosKeytab> keytabsToInject = kerberosKeytabController.getFilteredKeytabs(serviceComponentFilter, kerberosCommandParameters.getHostFilter(), kerberosCommandParameters.getIdentityFilter());
         for (ResolvedKerberosKeytab resolvedKeytab : keytabsToInject) {
-          for(ResolvedKerberosPrincipal resolvedPrincipal: resolvedKeytab.getPrincipals()) {
+          for (ResolvedKerberosPrincipal resolvedPrincipal : resolvedKeytab.getPrincipals()) {
             String hostName = resolvedPrincipal.getHostName();
 
             if (targetHost.equalsIgnoreCase(hostName)) {
+              process(targetHost, resolvedKeytab, resolvedPrincipal, serviceComponentFilter);
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new AmbariException("Could not inject keytabs to enable kerberos");
+      }
+
+      return kcp;
+    }
+
+    /**
+     * Performs the default behavior for processing the details of a particular Kerberos identity to
+     * be added to the Kerberos command parameters.
+     *
+     * Implementations will override this method to perform specified tasks.
+     *
+     * @param hostName               the target hostname
+     * @param resolvedKeytab         the relevant keytab file details
+     * @param resolvedPrincipal      the relevant principal details
+     * @param serviceComponentFilter the filter used to determine if the current Kerberos identity
+     *                               should be processed
+     * @throws IOException
+     */
+    protected void process(String hostName, ResolvedKerberosKeytab resolvedKeytab, ResolvedKerberosPrincipal resolvedPrincipal, Map<String, ? extends Collection<String>> serviceComponentFilter) throws IOException {
+      Map<String, String> keytabMap = new HashMap<>();
+      keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
+      keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, resolvedPrincipal.getPrincipal());
+      keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, resolvedKeytab.getFile());
+      kcp.add(keytabMap);
+    }
+
+    /**
+     * Given a service/component filter, processes it as needed.
+     * <p>
+     * See overridden methods for more details.
+     *
+     * @param serviceComponentFilter a map of service to components indicating the services and
+     *                               components to include in an operation
+     * @return a map of service to components indicating the services and components to include in
+     * the operation
+     * @throws AmbariException
+     */
+    protected Map<String, ? extends Collection<String>> getServiceComponentFilter(Map<String, ? extends Collection<String>> serviceComponentFilter) throws AmbariException {
+      return serviceComponentFilter;
+    }
+  }
+
+  /**
+   * SetKeytabCommandParameterProcessor is an implementation of {@link KerberosCommandParameterProcessor}
+   * that handles the case for setting keytab files.
+   * <p>
+   * Specifically, this implementation adds the keytab file details and its contents to the
+   * command parameters. It also performs operations only for services and components that are
+   * known to be installed; therefore, the service/component filter may be altered to enforce this.
+   */
+  private static class SetKeytabCommandParameterProcessor extends KerberosCommandParameterProcessor {
+
+    private final String dataDir;
+
+    private SetKeytabCommandParameterProcessor(Clusters clusters, ExecutionCommand executionCommand, KerberosKeytabController kerberosKeytabController) {
+      super(clusters, executionCommand, kerberosKeytabController);
+      dataDir = executionCommand.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY);
+    }
+
+    @Override
+    protected void process(String hostName, ResolvedKerberosKeytab resolvedKeytab, ResolvedKerberosPrincipal resolvedPrincipal, Map<String, ? extends Collection<String>> serviceComponentFilter) throws IOException {
+      if (dataDir != null) {
+        String principal = resolvedPrincipal.getPrincipal();
+        String keytabFilePath = resolvedKeytab.getFile();
+        LOG.info("Processing principal {} for host {} and keytab file path {}", principal, hostName, keytabFilePath);
+
+        if (keytabFilePath != null) {
+          String sha1Keytab = DigestUtils.sha256Hex(keytabFilePath);
+          File keytabFile = Paths.get(dataDir, hostName, sha1Keytab).toFile();
+
+          if (keytabFile.canRead()) {
+            Map<String, String> keytabMap = new HashMap<>();
+
+            keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
+            keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal);
+            keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath);
+            keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME, resolvedKeytab.getOwnerName());
+            keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS, resolvedKeytab.getOwnerAccess());
+            keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME, resolvedKeytab.getGroupName());
+            keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS, resolvedKeytab.getGroupAccess());
 
-              if (SET_KEYTAB.equalsIgnoreCase(command)) {
-                String principal = resolvedPrincipal.getPrincipal();
-                String keytabFilePath = resolvedKeytab.getFile();
-                LOG.info("Processing principal {} for host {} and keytab file path {}", principal, hostName, keytabFilePath);
-
-                if (keytabFilePath != null) {
-
-                  String sha1Keytab = DigestUtils.sha256Hex(keytabFilePath);
-                  File keytabFile = new File(dataDir + File.separator + hostName + File.separator + sha1Keytab);
-
-                  if (keytabFile.canRead()) {
-                    Map<String, String> keytabMap = new HashMap<>();
-
-                    keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
-                    keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal);
-                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath);
-                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME, resolvedKeytab.getOwnerName());
-                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS, resolvedKeytab.getOwnerAccess());
-                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME, resolvedKeytab.getGroupName());
-                    keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS, resolvedKeytab.getGroupAccess());
-
-                    BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile));
-                    byte[] keytabContent;
-                    try {
-                      keytabContent = IOUtils.toByteArray(bufferedIn);
-                    } finally {
-                      bufferedIn.close();
-                    }
-                    String keytabContentBase64 = Base64.encodeBase64String(keytabContent);
-                    keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64);
-
-                    kcp.add(keytabMap);
+            BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile));
+            byte[] keytabContent;
+            try {
+              keytabContent = IOUtils.toByteArray(bufferedIn);
+            } finally {
+              bufferedIn.close();
+            }
+            String keytabContentBase64 = Base64.encodeBase64String(keytabContent);
+            keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64);
+
+            kcp.add(keytabMap);
+          } else {
+            LOG.warn("Keytab file for principal {} and host {} can not to be read at path {}",
+                principal, hostName, keytabFile.getAbsolutePath());
+          }
+        }
+      }
+    }
+
+    @Override
+    protected Map<String, ? extends Collection<String>> getServiceComponentFilter(Map<String, ? extends Collection<String>> serviceComponentFilter)
+        throws AmbariException {
+      return kerberosKeytabController.adjustServiceComponentFilter(clusters.getCluster(executionCommand.getClusterName()), false, serviceComponentFilter);
+    }
+  }
+
+  /**
+   * CheckKeytabsCommandParameterProcessor is an implementation of {@link KerberosCommandParameterProcessor}
+   * that handles the case for checking the keytab files on the hosts of the cluster.
+   */
+  private static class CheckKeytabsCommandParameterProcessor extends KerberosCommandParameterProcessor {
+
+    private CheckKeytabsCommandParameterProcessor(Clusters clusters, ExecutionCommand executionCommand, KerberosKeytabController kerberosKeytabController) {
+      super(clusters, executionCommand, kerberosKeytabController);
+    }
+  }
+
+  /**
+   * RemoveKeytabCommandParameterProcessor is an implementation of {@link KerberosCommandParameterProcessor}
+   * that handles the case for removing keytab files.
+   * <p>
+   * Specifically, this implementation performs operations for any services and components; however,
+   * only keytab files found to no longer be needed are specified for removal.
+   */
+  private static class RemoveKeytabCommandParameterProcessor extends KerberosCommandParameterProcessor {
+
+    private RemoveKeytabCommandParameterProcessor(Clusters clusters, ExecutionCommand executionCommand, KerberosKeytabController kerberosKeytabController) {
+      super(clusters, executionCommand, kerberosKeytabController);
+    }
+
+    @Override
+    protected void process(String hostName, ResolvedKerberosKeytab resolvedKeytab, ResolvedKerberosPrincipal resolvedPrincipal, Map<String, ? extends Collection<String>> serviceComponentFilter) throws IOException {
+      if (shouldRemove(hostName, resolvedKeytab, resolvedPrincipal, serviceComponentFilter)) {
+        super.process(hostName, resolvedKeytab, resolvedPrincipal, serviceComponentFilter);
+      }
+    }
+
+    /**
+     * Determines if the keytab file for a given Kerberos identity should be removed from the target
+     * host.
+     * <p>
+     * This is determined by comparing the service/component filter with the metadata about the relevant
+     * Kerberos identity. If it is determined that more components than the ones specified in the filter
+     * are linked to the identity, then the keytab file will not be flagged for removal.
+     *
+     * @param hostname               the target hostname
+     * @param resolvedKerberosKeytab the relevant keytab file details
+     * @param resolvedPrincipal      the relevant principal details
+     * @param serviceComponentFilter the filter used to determine if the current Kerberos identity
+     *                               should be processed
+     * @return <code>true</code>, if this keytab file should be removed; <code>false</code>, otherwise
+     */
+    private boolean shouldRemove(String hostname,
+                                 ResolvedKerberosKeytab resolvedKerberosKeytab,
+                                 ResolvedKerberosPrincipal resolvedPrincipal,
+                                 Map<String, ? extends Collection<String>> serviceComponentFilter) {
+      ResolvedKerberosKeytab existingResolvedKeytab = kerberosKeytabController.getKeytabByFile(resolvedKerberosKeytab.getFile());
+
+      if (existingResolvedKeytab == null) {
+        return true;
+      }
+
+      Set<ResolvedKerberosPrincipal> principals = existingResolvedKeytab.getPrincipals();
+      for (ResolvedKerberosPrincipal principal : principals) {
+        if (hostname.equals(principal.getHostName()) && principal.getPrincipal().equals(resolvedPrincipal.getPrincipal())) {
+          Multimap<String, String> temp = principal.getServiceMapping();
+
+          // Make a local copy so we do not edit the stored copy, since we do not know how it is stored...
+          Map<String, Collection<String>> serviceMapping = (temp == null) ? new HashMap<>() : new HashMap<>(temp.asMap());
+
+          // Prune off the services in the filter, or all if the filter is null.  If there are no
+          // service mappings left, this keytab file can be removed...
+          if (serviceComponentFilter == null) {
+            serviceMapping.clear();
+          } else {
+            for (Map.Entry<String, ? extends Collection<String>> entry : serviceComponentFilter.entrySet()) {
+              String service = entry.getKey();
+              Collection<String> components = entry.getValue();
+
+              if (serviceMapping.containsKey(service)) {
+
+                if (CollectionUtils.isEmpty(components) || CollectionUtils.isEmpty(serviceMapping.get(service))) {
+                  // Remove all entries for the service...
+                  serviceMapping.remove(service);
+                } else {
+                  Collection<String> leftOver = new HashSet<String>(serviceMapping.get(service));
+                  leftOver.removeAll(components);
+
+                  if (CollectionUtils.isEmpty(leftOver)) {
+                    serviceMapping.remove(service);
                   } else {
-                    LOG.warn("Keytab file for principal {} and host {} can not to be read at path {}",
-                        principal, hostName, keytabFile.getAbsolutePath());
+                    serviceMapping.put(service, leftOver);
                   }
                 }
-              } else if (REMOVE_KEYTAB.equalsIgnoreCase(command) || CHECK_KEYTABS.equalsIgnoreCase(command)) {
-                Map<String, String> keytabMap = new HashMap<>();
-                String keytabFilePath = resolvedKeytab.getFile();
-
-                String principal = resolvedPrincipal.getPrincipal();
-                for (Map.Entry<String, String> mappingEntry: resolvedPrincipal.getServiceMapping().entries()) {
-                  String serviceName = mappingEntry.getKey();
-                  String componentName = mappingEntry.getValue();
-                  keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
-                  keytabMap.put(KerberosIdentityDataFileReader.SERVICE, serviceName);
-                  keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, componentName);
-                  keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal);
-                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath);
-
-                }
-
-                kcp.add(keytabMap);
               }
             }
           }
+
+          // There are still service mappings for this keytab file, so we cannot remove it.
+          if (serviceMapping.size() > 0) {
+            return false;
+          }
         }
-      } catch (IOException e) {
-        throw new AmbariException("Could not inject keytabs to enable kerberos");
       }
-      ec.setKerberosCommandParams(kcp);
+
+      return true;
     }
   }
-
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabDAO.java
index ca7d23c..60ec754 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.orm.dao;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -26,6 +27,10 @@ import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.KerberosKeytabEntity;
+import org.apache.ambari.server.orm.entities.KerberosKeytabPrincipalEntity;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -34,6 +39,8 @@ import com.google.inject.persist.Transactional;
 
 @Singleton
 public class KerberosKeytabDAO {
+  private final static Logger LOG = LoggerFactory.getLogger(KerberosKeytabDAO.class);
+
   @Inject
   Provider<EntityManager> entityManagerProvider;
 
@@ -56,7 +63,10 @@ public class KerberosKeytabDAO {
 
   @Transactional
   public void remove(KerberosKeytabEntity kerberosKeytabEntity) {
-    entityManagerProvider.get().remove(merge(kerberosKeytabEntity));
+    if (kerberosKeytabEntity != null) {
+      EntityManager entityManager = entityManagerProvider.get();
+      entityManager.remove(entityManager.merge(kerberosKeytabEntity));
+    }
   }
 
   public void remove(String keytabPath) {
@@ -71,7 +81,6 @@ public class KerberosKeytabDAO {
     entityManagerProvider.get().refresh(kerberosKeytabEntity);
   }
 
-
   @RequiresSession
   public KerberosKeytabEntity find(String keytabPath) {
     return entityManagerProvider.get().find(KerberosKeytabEntity.class, keytabPath);
@@ -133,4 +142,34 @@ public class KerberosKeytabDAO {
       }
     }
   }
+
+  /**
+   * Determines if there are any references to the {@link KerberosKeytabEntity} before attempting
+   * to remove it.  If there are any references to it, the entity will not be removed.
+   *
+   * @param kerberosKeytabEntity the entity
+   * @return <code>true</code>, if the entity was removed; <code>false</code> otherwise
+   */
+  public boolean removeIfNotReferenced(KerberosKeytabEntity kerberosKeytabEntity) {
+    if (kerberosKeytabEntity != null) {
+      if (CollectionUtils.isNotEmpty(kerberosKeytabEntity.getKerberosKeytabPrincipalEntities())) {
+        ArrayList<String> ids = new ArrayList<>();
+        for (KerberosKeytabPrincipalEntity entity : kerberosKeytabEntity.getKerberosKeytabPrincipalEntities()) {
+          Long id = entity.getKkpId();
+
+          if (id != null) {
+            ids.add(String.valueOf(id));
+          }
+        }
+
+        LOG.debug(String.format("keytab entry for %s is still referenced by [%s]", kerberosKeytabEntity.getKeytabPath(), String.join(",", ids)));
+      } else {
+        LOG.debug(String.format("keytab entry for %s is no longer referenced. It will be removed.", kerberosKeytabEntity.getKeytabPath()));
+        remove(kerberosKeytabEntity);
+        return true;
+      }
+    }
+
+    return false;
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabPrincipalDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabPrincipalDAO.java
index bf4b75b..7b1aa45 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabPrincipalDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosKeytabPrincipalDAO.java
@@ -36,6 +36,7 @@ import org.apache.ambari.server.orm.entities.KerberosKeytabEntity;
 import org.apache.ambari.server.orm.entities.KerberosKeytabPrincipalEntity;
 import org.apache.ambari.server.orm.entities.KerberosKeytabServiceMappingEntity;
 import org.apache.ambari.server.orm.entities.KerberosPrincipalEntity;
+import org.apache.commons.collections.CollectionUtils;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -69,22 +70,22 @@ public class KerberosKeytabPrincipalDAO {
    * Find or create {@link KerberosKeytabPrincipalEntity} with specified dependecies.
    *
    * @param kerberosKeytabEntity {@link KerberosKeytabEntity} which owns this principal
-   * @param hostEntity  {@link HostEntity} which owns this principal
-   * @param principalEntity {@link KerberosPrincipalEntity} which related to this principal
+   * @param hostEntity           {@link HostEntity} which owns this principal
+   * @param kerberosPrincipalEntity      {@link KerberosPrincipalEntity} which is related to this principal
    * @return evaluated entity
    */
-  public KerberosKeytabPrincipalEntity findOrCreate(KerberosKeytabEntity kerberosKeytabEntity, HostEntity hostEntity, KerberosPrincipalEntity principalEntity)
-  {
+  public KerberosKeytabPrincipalEntity findOrCreate(KerberosKeytabEntity kerberosKeytabEntity, HostEntity hostEntity, KerberosPrincipalEntity kerberosPrincipalEntity) {
     Long hostId = hostEntity == null ? null : hostEntity.getHostId();
-    KerberosKeytabPrincipalEntity kkp = findByNaturalKey(hostId, kerberosKeytabEntity.getKeytabPath(), principalEntity.getPrincipalName());
+    KerberosKeytabPrincipalEntity kkp = findByNaturalKey(hostId, kerberosKeytabEntity.getKeytabPath(), kerberosPrincipalEntity.getPrincipalName());
     if (kkp == null) {
       kkp = new KerberosKeytabPrincipalEntity(
-        kerberosKeytabEntity,
-        hostEntity,
-        principalEntity
+          kerberosKeytabEntity,
+          hostEntity,
+          kerberosPrincipalEntity
       );
       create(kkp);
       kerberosKeytabEntity.addKerberosKeytabPrincipal(kkp);
+      kerberosPrincipalEntity.addKerberosKeytabPrincipal(kkp);
     }
     return kkp;
   }
@@ -195,25 +196,48 @@ public class KerberosKeytabPrincipalDAO {
     CriteriaQuery<KerberosKeytabPrincipalEntity> cq = cb.createQuery(KerberosKeytabPrincipalEntity.class);
     Root<KerberosKeytabPrincipalEntity> root = cq.from(KerberosKeytabPrincipalEntity.class);
     ArrayList<Predicate> predicates = new ArrayList<>();
-    if (filter.getServiceNames() != null && filter.getServiceNames().size() > 0)
-    {
+
+    if (CollectionUtils.isNotEmpty(filter.getServiceNames())) {
       Join<KerberosKeytabPrincipalEntity, KerberosKeytabServiceMappingEntity> mappingJoin = root.join("serviceMapping");
       predicates.add(mappingJoin.get("serviceName").in(filter.getServiceNames()));
-      if (filter.getComponentNames() != null && filter.getComponentNames().size() > 0) {
+      if (CollectionUtils.isNotEmpty(filter.getComponentNames())) {
         predicates.add(mappingJoin.get("componentName").in(filter.getComponentNames()));
       }
     }
-    if (filter.getHostNames() != null && filter.getHostNames().size() > 0) {
+
+    if (CollectionUtils.isNotEmpty(filter.getHostNames())) {
       List<Long> hostIds = new ArrayList<>();
+      boolean hasNull = false;
+
       for (String hostname : filter.getHostNames()) {
-        hostIds.add(hostDAO.findByName(hostname).getHostId());
+        HostEntity host = hostDAO.findByName(hostname);
+
+        if (host == null) {
+          // host may be null after a delete host operation, if so, add an OR NULL clause
+          hasNull = true;
+        } else {
+          hostIds.add(host.getHostId());
+        }
+      }
+
+      Predicate hostIDPredicate = (hostIds.isEmpty()) ? null : root.get("hostId").in(hostIds);
+      Predicate hostNullIDPredicate = (hasNull) ? root.get("hostId").isNull() : null;
+
+      if (hostIDPredicate != null) {
+        if (hostNullIDPredicate != null) {
+          predicates.add(cb.or(hostIDPredicate, hostNullIDPredicate));
+        } else {
+          predicates.add(hostIDPredicate);
+        }
+      } else if (hostNullIDPredicate != null) {
+        predicates.add(hostNullIDPredicate);
       }
-      predicates.add(root.get("hostId").in(hostIds));
     }
-    if (filter.getPrincipals() != null && filter.getPrincipals().size() > 0) {
+
+    if (CollectionUtils.isNotEmpty(filter.getPrincipals())) {
       predicates.add(root.get("principalName").in(filter.getPrincipals()));
     }
-    cq.where(cb.and(predicates.toArray(new Predicate[predicates.size()])));
+    cq.where(cb.and(predicates.toArray(new Predicate[0])));
 
     TypedQuery<KerberosKeytabPrincipalEntity> query = entityManagerProvider.get().createQuery(cq);
     List<KerberosKeytabPrincipalEntity> result = query.getResultList();
@@ -305,5 +329,12 @@ public class KerberosKeytabPrincipalDAO {
     public void setPrincipals(Collection<String> principals) {
       this.principals = principals;
     }
+
+    public static KerberosKeytabPrincipalFilter createFilter(String serviceName, Collection<String> componentNames, Collection<String> hostNames, Collection<String> principalNames) {
+      return new KerberosKeytabPrincipalFilter(hostNames,
+          (serviceName == null) ? null : Collections.singleton(serviceName),
+          componentNames,
+          principalNames);
+    }
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosPrincipalDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosPrincipalDAO.java
index 5367e9b..8774ca8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosPrincipalDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/KerberosPrincipalDAO.java
@@ -19,13 +19,18 @@
 package org.apache.ambari.server.orm.dao;
 
 
+import java.util.ArrayList;
 import java.util.List;
 
 import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 
 import org.apache.ambari.server.orm.RequiresSession;
+import org.apache.ambari.server.orm.entities.KerberosKeytabPrincipalEntity;
 import org.apache.ambari.server.orm.entities.KerberosPrincipalEntity;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -39,6 +44,7 @@ import com.google.inject.persist.Transactional;
 @Singleton
 public class KerberosPrincipalDAO {
 
+  private final static Logger LOG = LoggerFactory.getLogger(KerberosPrincipalDAO.class);
   /**
    * JPA entity manager
    */
@@ -85,14 +91,9 @@ public class KerberosPrincipalDAO {
    */
   @Transactional
   public void remove(KerberosPrincipalEntity kerberosPrincipalEntity) {
-    if(kerberosPrincipalEntity != null) {
+    if (kerberosPrincipalEntity != null) {
       EntityManager entityManager = entityManagerProvider.get();
-      String principalName = kerberosPrincipalEntity.getPrincipalName();
-
-      kerberosPrincipalEntity = find(principalName);
-      if (kerberosPrincipalEntity != null) {
-        entityManager.remove(kerberosPrincipalEntity);
-      }
+      entityManager.remove(entityManager.merge(kerberosPrincipalEntity));
     }
   }
 
@@ -117,7 +118,6 @@ public class KerberosPrincipalDAO {
     entityManagerProvider.get().refresh(kerberosPrincipalEntity);
   }
 
-
   /**
    * Find a KerberosPrincipalEntity with the given principal name.
    *
@@ -159,4 +159,34 @@ public class KerberosPrincipalDAO {
       }
     }
   }
+
+  /**
+   * Determines if there are any references to the {@link KerberosPrincipalEntity} before attempting
+   * to remove it.  If there are any references to it, the entity will not be removed.
+   *
+   * @param kerberosPrincipalEntity the entity
+   * @return <code>true</code>, if the entity was removed; <code>false</code> otherwise
+   */
+  public boolean removeIfNotReferenced(KerberosPrincipalEntity kerberosPrincipalEntity) {
+    if (kerberosPrincipalEntity != null) {
+      if (CollectionUtils.isNotEmpty(kerberosPrincipalEntity.getKerberosKeytabPrincipalEntities())) {
+        ArrayList<String> ids = new ArrayList<>();
+        for (KerberosKeytabPrincipalEntity entity : kerberosPrincipalEntity.getKerberosKeytabPrincipalEntities()) {
+          Long id = entity.getKkpId();
+
+          if (id != null) {
+            ids.add(String.valueOf(id));
+          }
+        }
+
+        LOG.info(String.format("principal entry for %s is still referenced by [%s]", kerberosPrincipalEntity.getPrincipalName(), String.join(",", ids)));
+      } else {
+        LOG.info(String.format("principal entry for %s is no longer referenced. It will be removed.", kerberosPrincipalEntity.getPrincipalName()));
+        remove(kerberosPrincipalEntity);
+        return true;
+      }
+    }
+
+    return false;
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabEntity.java
index 1757b9f..4fb4f96 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabEntity.java
@@ -62,7 +62,7 @@ public class KerberosKeytabEntity {
   @Column(name = "write_ambari_jaas")
   private Integer writeAmbariJaasFile = 0;
 
-  @OneToMany(mappedBy = "kerberosKeytabEntity", cascade = CascadeType.REMOVE, fetch = FetchType.LAZY)
+  @OneToMany(mappedBy = "kerberosKeytabEntity", cascade = CascadeType.REMOVE, fetch = FetchType.EAGER)
   private Collection<KerberosKeytabPrincipalEntity> kerberosKeytabPrincipalEntities = new ArrayList<>();
 
   public KerberosKeytabEntity() {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabPrincipalEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabPrincipalEntity.java
index 9a55587..676b3e6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabPrincipalEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosKeytabPrincipalEntity.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.orm.entities;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import javax.persistence.CascadeType;
@@ -107,23 +108,19 @@ public class KerberosKeytabPrincipalEntity {
 
   @ManyToOne
   @JoinColumn(name = "principal_name", referencedColumnName = "principal_name", updatable = false, nullable = false, insertable = false)
-  private KerberosPrincipalEntity principalEntity;
+  private KerberosPrincipalEntity kerberosPrincipalEntity;
 
-  @OneToMany(cascade = CascadeType.ALL, mappedBy = "kerberosKeytabPrincipalEntity")
+  @OneToMany(cascade = CascadeType.ALL, mappedBy = "kerberosKeytabPrincipalEntity", orphanRemoval = true)
   private List<KerberosKeytabServiceMappingEntity> serviceMapping = new ArrayList<>();
 
   public KerberosKeytabPrincipalEntity() {
 
   }
 
-  public KerberosKeytabPrincipalEntity(
-    KerberosKeytabEntity kerberosKeytabEntity,
-    HostEntity hostEntity,
-    KerberosPrincipalEntity principalEntity
-  ) {
+  public KerberosKeytabPrincipalEntity(KerberosKeytabEntity kerberosKeytabEntity, HostEntity hostEntity, KerberosPrincipalEntity kerberosPrincipalEntity) {
     setKerberosKeytabEntity(kerberosKeytabEntity);
     setHostEntity(hostEntity);
-    setPrincipalEntity(principalEntity);
+    setKerberosPrincipalEntity(kerberosPrincipalEntity);
   }
 
   public Long getKkpId() {
@@ -164,14 +161,14 @@ public class KerberosKeytabPrincipalEntity {
     }
   }
 
-  public KerberosPrincipalEntity getPrincipalEntity() {
-    return principalEntity;
+  public KerberosPrincipalEntity getKerberosPrincipalEntity() {
+    return kerberosPrincipalEntity;
   }
 
-  public void setPrincipalEntity(KerberosPrincipalEntity principalEntity) {
-    this.principalEntity = principalEntity;
-    if (principalEntity != null) {
-      principalName = principalEntity.getPrincipalName();
+  public void setKerberosPrincipalEntity(KerberosPrincipalEntity kerberosPrincipalEntity) {
+    this.kerberosPrincipalEntity = kerberosPrincipalEntity;
+    if (kerberosPrincipalEntity != null) {
+      principalName = kerberosPrincipalEntity.getPrincipalName();
     }
   }
 
@@ -181,7 +178,7 @@ public class KerberosKeytabPrincipalEntity {
 
 
   public String getPrincipalName() {
-    return principalEntity != null ? principalEntity.getPrincipalName() : null;
+    return kerberosPrincipalEntity != null ? kerberosPrincipalEntity.getPrincipalName() : null;
   }
 
   public Long getHostId() {
@@ -192,6 +189,16 @@ public class KerberosKeytabPrincipalEntity {
     return hostEntity != null ? hostEntity.getHostName() : null;
   }
 
+  public List<KerberosKeytabServiceMappingEntity> getServiceMapping() {
+    return serviceMapping;
+  }
+
+  public void setServiceMapping(List<KerberosKeytabServiceMappingEntity> serviceMapping) {
+    this.serviceMapping = (serviceMapping == null)
+        ? Collections.emptyList()
+        : new ArrayList<>(serviceMapping);
+  }
+
   public boolean putServiceMapping(String service, String component) {
     if (containsMapping(service, component)) {
       return false;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosPrincipalEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosPrincipalEntity.java
index 5f7cc56..b4846d6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosPrincipalEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/KerberosPrincipalEntity.java
@@ -18,11 +18,17 @@
 
 package org.apache.ambari.server.orm.entities;
 
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.persistence.CascadeType;
 import javax.persistence.Column;
 import javax.persistence.Entity;
+import javax.persistence.FetchType;
 import javax.persistence.Id;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
 import javax.persistence.Table;
 
 /**
@@ -49,6 +55,9 @@ public class KerberosPrincipalEntity {
   @Column(name = "cached_keytab_path", insertable = true, updatable = true, nullable = true)
   private String cachedKeytabPath = null;
 
+  @OneToMany(mappedBy = "kerberosPrincipalEntity", cascade = CascadeType.REMOVE, fetch = FetchType.EAGER)
+  private Collection<KerberosKeytabPrincipalEntity> kerberosKeytabPrincipalEntities = new ArrayList<>();
+
   /**
    * Constructs an empty KerberosPrincipalEntity
    */
@@ -122,4 +131,17 @@ public class KerberosPrincipalEntity {
     this.cachedKeytabPath = cachedKeytabPath;
   }
 
+  public Collection<KerberosKeytabPrincipalEntity> getKerberosKeytabPrincipalEntities() {
+    return kerberosKeytabPrincipalEntities;
+  }
+
+  public void setKerberosKeytabPrincipalEntities(Collection<KerberosKeytabPrincipalEntity> kerberosKeytabPrincipalEntities) {
+    this.kerberosKeytabPrincipalEntities = kerberosKeytabPrincipalEntities;
+  }
+
+  public void addKerberosKeytabPrincipal(KerberosKeytabPrincipalEntity kerberosKeytabPrincipalEntity) {
+    if (!kerberosKeytabPrincipalEntities.contains(kerberosKeytabPrincipalEntity)) {
+      kerberosKeytabPrincipalEntities.add(kerberosKeytabPrincipalEntity);
+    }
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CleanupServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CleanupServerAction.java
index c2f82d9..720cdda 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CleanupServerAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CleanupServerAction.java
@@ -32,28 +32,24 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
 import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.orm.dao.KerberosKeytabDAO;
-import org.apache.ambari.server.orm.dao.KerberosPrincipalDAO;
 import org.apache.ambari.server.serveraction.kerberos.stageutils.ResolvedKerberosPrincipal;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.SecurityType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.inject.Inject;
-
 /**
  * Used to perform Kerberos Cleanup Operations as part of the Unkerberization process
  */
 public class CleanupServerAction extends KerberosServerAction {
-  @Inject
-  KerberosKeytabDAO kerberosKeytabDAO;
-
-  @Inject
-  KerberosPrincipalDAO kerberosPrincipalDAO;
 
   private final static Logger LOG = LoggerFactory.getLogger(CleanupServerAction.class);
 
+  @Override
+  protected boolean pruneServiceFilter() {
+    return false;
+  }
+
   /**
    * Processes an identity as necessary.
    * <p/>
@@ -97,7 +93,6 @@ public class CleanupServerAction extends KerberosServerAction {
     }
 
     return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
-
   }
 
   /**
@@ -118,8 +113,6 @@ public class CleanupServerAction extends KerberosServerAction {
 
     try {
       artifactProvider.deleteResources(new RequestImpl(null, null, null, null), predicate);
-      kerberosPrincipalDAO.remove(kerberosPrincipalDAO.findAll());
-      kerberosKeytabDAO.remove(kerberosKeytabDAO.findAll());
       LOG.info("Kerberos descriptor removed successfully.");
       actionLog.writeStdOut("Kerberos descriptor removed successfully.");
     } catch (NoSuchResourceException e) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java
index 87aa3e7..eb95b4e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java
@@ -196,7 +196,7 @@ public class ConfigureAmbariIdentitiesServerAction extends KerberosServerAction
       }
 
       KerberosKeytabEntity kke = kerberosKeytabDAO.find(destKeytabFilePath);
-      if (!kerberosKeytabDAO.exists(destKeytabFilePath)) {
+      if (kke == null) {
         kke = new KerberosKeytabEntity(destKeytabFilePath);
         kke.setOwnerName(ownerName);
         kke.setOwnerAccess(ownerAccess);
@@ -205,16 +205,25 @@ public class ConfigureAmbariIdentitiesServerAction extends KerberosServerAction
         kerberosKeytabDAO.create(kke);
       }
 
+      KerberosPrincipalEntity kpe = kerberosPrincipalDAO.find(principal.getPrincipal());
+      if(kpe == null) {
+        kpe = new KerberosPrincipalEntity(principal.getPrincipal(), principal.isService(), principal.getCacheFile());
+        kerberosPrincipalDAO.create(kpe);
+      }
+
       for(Map.Entry<String, String> mapping : principal.getServiceMapping().entries()) {
         String serviceName = mapping.getKey();
         String componentName = mapping.getValue();
-        KerberosPrincipalEntity principalEntity = kerberosPrincipalDAO.find(principal.getPrincipal());
-        KerberosKeytabPrincipalEntity entity = kerberosKeytabPrincipalDAO.findOrCreate(kke, hostEntity, principalEntity);
+        KerberosKeytabPrincipalEntity entity = kerberosKeytabPrincipalDAO.findOrCreate(kke, hostEntity, kpe);
         entity.setDistributed(true);
         entity.putServiceMapping(serviceName, componentName);
         kerberosKeytabPrincipalDAO.merge(entity);
+
         kke.addKerberosKeytabPrincipal(entity);
         kerberosKeytabDAO.merge(kke);
+
+        kpe.addKerberosKeytabPrincipal(entity);
+        kerberosPrincipalDAO.merge(kpe);
       }
 
       if (actionLog != null) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java
index a57f528..11deac4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java
@@ -19,13 +19,17 @@
 package org.apache.ambari.server.serveraction.kerberos;
 
 import java.io.File;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.audit.event.kerberos.DestroyPrincipalKerberosAuditEvent;
 import org.apache.ambari.server.controller.KerberosHelper;
@@ -33,10 +37,13 @@ import org.apache.ambari.server.orm.dao.KerberosKeytabDAO;
 import org.apache.ambari.server.orm.dao.KerberosKeytabPrincipalDAO;
 import org.apache.ambari.server.orm.dao.KerberosPrincipalDAO;
 import org.apache.ambari.server.orm.entities.KerberosKeytabEntity;
+import org.apache.ambari.server.orm.entities.KerberosKeytabPrincipalEntity;
+import org.apache.ambari.server.orm.entities.KerberosKeytabServiceMappingEntity;
 import org.apache.ambari.server.orm.entities.KerberosPrincipalEntity;
-import org.apache.ambari.server.serveraction.kerberos.stageutils.ResolvedKerberosKeytab;
+import org.apache.ambari.server.security.credential.PrincipalKeyCredential;
 import org.apache.ambari.server.serveraction.kerberos.stageutils.ResolvedKerberosPrincipal;
-import org.apache.ambari.server.utils.ShellCommandUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +61,17 @@ import com.google.inject.Inject;
 public class DestroyPrincipalsServerAction extends KerberosServerAction {
   private final static Logger LOG = LoggerFactory.getLogger(DestroyPrincipalsServerAction.class);
 
+  /**
+   * The KerberosOperationHandlerFactory to use to obtain KerberosOperationHandler instances
+   * <p/>
+   * This is needed to help with test cases to mock a KerberosOperationHandler
+   */
+  @Inject
+  private KerberosOperationHandlerFactory kerberosOperationHandlerFactory;
+
+  @Inject
+  private KerberosHelper kerberosHelper;
+
   @Inject
   private KerberosPrincipalDAO kerberosPrincipalDAO;
 
@@ -85,9 +103,164 @@ public class DestroyPrincipalsServerAction extends KerberosServerAction {
   @Override
   public CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext) throws
       AmbariException, InterruptedException {
-    return processIdentities(requestSharedDataContext);
+
+    Map<String, String> commandParameters = getCommandParameters();
+    KDCType kdcType = getKDCType(commandParameters);
+    PrincipalKeyCredential administratorCredential = kerberosHelper.getKDCAdministratorCredentials(getClusterName());
+    String defaultRealm = getDefaultRealm(commandParameters);
+
+    KerberosOperationHandler operationHandler = kerberosOperationHandlerFactory.getKerberosOperationHandler(kdcType);
+    Map<String, String> kerberosConfiguration = getConfiguration("kerberos-env");
+
+    try {
+      operationHandler.open(administratorCredential, defaultRealm, kerberosConfiguration);
+    } catch (KerberosOperationException e) {
+      String message = String.format("Failed to process the identities, could not properly open the KDC operation handler: %s",
+          e.getMessage());
+      actionLog.writeStdErr(message);
+      LOG.error(message);
+      throw new AmbariException(message, e);
+    }
+
+    actionLog.writeStdOut("Cleaning up Kerberos identities.");
+
+    Map<String, ? extends Collection<String>> serviceComponentFilter = getServiceComponentFilter();
+    Set<String> hostFilter = getHostFilter();
+    Collection<String> principalNameFilter = getIdentityFilter();
+
+    List<KerberosKeytabPrincipalEntity> kerberosKeytabPrincipalEntities;
+
+    if (MapUtils.isEmpty(serviceComponentFilter) && CollectionUtils.isEmpty(hostFilter) && CollectionUtils.isEmpty(principalNameFilter)) {
+      // Clean up all... this is probably a disable Kerberos operation
+      kerberosKeytabPrincipalEntities = kerberosKeytabPrincipalDAO.findAll();
+    } else {
+      // Build the search filters
+      ArrayList<KerberosKeytabPrincipalDAO.KerberosKeytabPrincipalFilter> filters = new ArrayList<>();
+
+      if (MapUtils.isEmpty(serviceComponentFilter)) {
+        filters.add(KerberosKeytabPrincipalDAO.KerberosKeytabPrincipalFilter.createFilter(
+            null,
+            null,
+            hostFilter,
+            principalNameFilter));
+      } else {
+        for (Map.Entry<String, ? extends Collection<String>> entry : serviceComponentFilter.entrySet()) {
+          filters.add(KerberosKeytabPrincipalDAO.KerberosKeytabPrincipalFilter.createFilter(
+              entry.getKey(),
+              entry.getValue(),
+              hostFilter,
+              principalNameFilter));
+        }
+      }
+
+      // Get only the entries we care about...
+      kerberosKeytabPrincipalEntities = kerberosKeytabPrincipalDAO.findByFilters(filters);
+    }
+
+    if (kerberosKeytabPrincipalEntities != null) {
+      try {
+        Set<Long> visitedKKPID = new HashSet<>();
+
+        for (KerberosKeytabPrincipalEntity kerberosKeytabPrincipalEntity : kerberosKeytabPrincipalEntities) {
+          // Do not re-process duplicate entries
+          if (!visitedKKPID.contains(kerberosKeytabPrincipalEntity.getKkpId())) {
+
+            visitedKKPID.add(kerberosKeytabPrincipalEntity.getKkpId());
+
+            KerberosKeytabEntity kerberosKeytabEntity = kerberosKeytabPrincipalEntity.getKerberosKeytabEntity();
+            KerberosPrincipalEntity kerberosPrincipalEntity = kerberosKeytabPrincipalEntity.getKerberosPrincipalEntity();
+
+            if (serviceComponentFilter == null) {
+              // All service and components "match" in this case... thus all mapping records are to be
+              // removed.  The KerberosKeytabServiceMappingEntity has already been selected to be removed
+              // based on the host and identity filters.
+              kerberosKeytabPrincipalEntity.setServiceMapping(null);
+            } else {
+              // It is possible that this KerberosKeytabPrincipalEntity needs to stick around since other
+              // services and components rely on it.  So remove only the relevant service mapping records
+              List<KerberosKeytabServiceMappingEntity> serviceMapping = kerberosKeytabPrincipalEntity.getServiceMapping();
+
+              if (CollectionUtils.isNotEmpty(serviceMapping)) {
+                // Prune off the relevant service mappings...
+                Iterator<KerberosKeytabServiceMappingEntity> iterator = serviceMapping.iterator();
+                while (iterator.hasNext()) {
+                  KerberosKeytabServiceMappingEntity entity = iterator.next();
+
+                  if (serviceComponentFilter.containsKey(entity.getServiceName())) {
+                    Collection<String> components = serviceComponentFilter.get(entity.getServiceName());
+
+                    if ((CollectionUtils.isEmpty(components)) || components.contains(entity.getComponentName())) {
+                      iterator.remove();
+                    }
+                  }
+                }
+
+                kerberosKeytabPrincipalEntity.setServiceMapping(serviceMapping);
+              }
+            }
+
+            // Apply changes indicated above...
+            kerberosKeytabPrincipalEntity = kerberosKeytabPrincipalDAO.merge(kerberosKeytabPrincipalEntity);
+
+            // If there are no services or components relying on this KerberosKeytabPrincipalEntity, it
+            // should be removed...
+            if (CollectionUtils.isEmpty(kerberosKeytabPrincipalEntity.getServiceMapping())) {
+              kerberosKeytabPrincipalDAO.remove(kerberosKeytabPrincipalEntity);
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Cleaning up keytab/principal entry: {}:{}:{}:{}",
+                    kerberosKeytabPrincipalEntity.getKkpId(), kerberosKeytabEntity.getKeytabPath(), kerberosPrincipalEntity.getPrincipalName(), kerberosKeytabPrincipalEntity.getHostName());
+              } else {
+                LOG.info("Cleaning up keytab/principal entry: {}:{}:{}",
+                    kerberosKeytabEntity.getKeytabPath(), kerberosPrincipalEntity.getPrincipalName(), kerberosKeytabPrincipalEntity.getHostName());
+              }
+
+              // Remove the KerberosKeytabPrincipalEntity reference from the relevant KerberosKeytabEntity
+              kerberosKeytabEntity.getKerberosKeytabPrincipalEntities().remove(kerberosKeytabPrincipalEntity);
+              kerberosKeytabEntity = kerberosKeytabDAO.merge(kerberosKeytabEntity);
+
+              // Remove the KerberosKeytabPrincipalEntity reference from the relevant KerberosPrincipalEntity
+              kerberosPrincipalEntity.getKerberosKeytabPrincipalEntities().remove(kerberosKeytabPrincipalEntity);
+              kerberosPrincipalEntity = kerberosPrincipalDAO.merge(kerberosPrincipalEntity);
+            }
+
+            // If there are no more KerberosKeytabPrincipalEntity items that reference this, the keytab
+            // file is no longer needed.
+            if (kerberosKeytabDAO.removeIfNotReferenced(kerberosKeytabEntity)) {
+              String message = String.format("Cleaning up keytab entry: %s", kerberosKeytabEntity.getKeytabPath());
+              LOG.info(message);
+              actionLog.writeStdOut(message);
+            }
+
+            // If there are no more KerberosKeytabPrincipalEntity items that reference this, the principal
+            // is no longer needed.
+            if (kerberosPrincipalDAO.removeIfNotReferenced(kerberosPrincipalEntity)) {
+              String message = String.format("Cleaning up principal entry: %s", kerberosPrincipalEntity.getPrincipalName());
+              LOG.info(message);
+              actionLog.writeStdOut(message);
+
+              destroyIdentity(operationHandler, kerberosPrincipalEntity);
+            }
+          }
+        }
+      } finally {
+        // The KerberosOperationHandler needs to be closed, if it fails to close ignore the
+        // exception since there is little we can or care to do about it now.
+        try {
+          operationHandler.close();
+        } catch (KerberosOperationException e) {
+          // Ignore this...
+        }
+      }
+    }
+
+    return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
   }
 
+  @Override
+  protected boolean pruneServiceFilter() {
+    return false;
+  }
 
   /**
    * For each identity, remove the principal from the configured KDC.
@@ -111,85 +284,51 @@ public class DestroyPrincipalsServerAction extends KerberosServerAction {
                                           boolean includedInFilter,
                                           Map<String, Object> requestSharedDataContext)
       throws AmbariException {
+    throw new UnsupportedOperationException();
+  }
 
-    if(!includedInFilter) {
-      // If this principal is to be filtered out, skip it
-      return null;
-    }
-
-    // Only process this principal if we haven't already processed it
-    if (!seenPrincipals.contains(resolvedPrincipal.getPrincipal())) {
-      seenPrincipals.add(resolvedPrincipal.getPrincipal());
+  private void destroyIdentity(KerberosOperationHandler operationHandler, KerberosPrincipalEntity kerberosPrincipalEntity) {
+    String principalName = kerberosPrincipalEntity.getPrincipalName();
+    String message = String.format("Destroying identity, %s", principalName);
+    LOG.info(message);
+    actionLog.writeStdOut(message);
+    DestroyPrincipalKerberosAuditEvent.DestroyPrincipalKerberosAuditEventBuilder auditEventBuilder = DestroyPrincipalKerberosAuditEvent.builder()
+        .withTimestamp(System.currentTimeMillis())
+        .withRequestId(getHostRoleCommand().getRequestId())
+        .withTaskId(getHostRoleCommand().getTaskId())
+        .withPrincipal(principalName);
 
-      String message = String.format("Destroying identity, %s", resolvedPrincipal.getPrincipal());
-      LOG.info(message);
-      actionLog.writeStdOut(message);
-      DestroyPrincipalKerberosAuditEvent.DestroyPrincipalKerberosAuditEventBuilder auditEventBuilder = DestroyPrincipalKerberosAuditEvent.builder()
-          .withTimestamp(System.currentTimeMillis())
-          .withRequestId(getHostRoleCommand().getRequestId())
-          .withTaskId(getHostRoleCommand().getTaskId())
-          .withPrincipal(resolvedPrincipal.getPrincipal());
+    try {
+      try {
+        operationHandler.removePrincipal(principalName, kerberosPrincipalEntity.isService());
+      } catch (KerberosOperationException e) {
+        message = String.format("Failed to remove identity for %s from the KDC - %s", principalName, e.getMessage());
+        LOG.warn(message, e);
+        actionLog.writeStdErr(message);
+        auditEventBuilder.withReasonOfFailure(message);
+      }
 
       try {
-        try {
-          boolean servicePrincipal = resolvedPrincipal.isService();
-          operationHandler.removePrincipal(resolvedPrincipal.getPrincipal(), servicePrincipal);
-        } catch (KerberosOperationException e) {
-          message = String.format("Failed to remove identity for %s from the KDC - %s", resolvedPrincipal.getPrincipal(), e.getMessage());
-          LOG.warn(message);
-          actionLog.writeStdErr(message);
-          auditEventBuilder.withReasonOfFailure(message);
-        }
+        KerberosPrincipalEntity principalEntity = kerberosPrincipalDAO.find(principalName);
 
-        try {
-          KerberosPrincipalEntity principalEntity = kerberosPrincipalDAO.find(resolvedPrincipal.getPrincipal());
-
-          if (principalEntity != null) {
-            String cachedKeytabPath = principalEntity.getCachedKeytabPath();
-            KerberosKeytabEntity kke = kerberosKeytabDAO.find(resolvedPrincipal.getResolvedKerberosKeytab().getFile());
-            kerberosKeytabPrincipalDAO.remove(kerberosKeytabPrincipalDAO.findByPrincipal(principalEntity.getPrincipalName()));
-            kerberosKeytabDAO.remove(kke);
-            kerberosPrincipalDAO.remove(principalEntity);
-
-            // If a cached  keytabs file exists for this principal, delete it.
-            if (cachedKeytabPath != null) {
-              if (!new File(cachedKeytabPath).delete()) {
-                LOG.debug("Failed to remove cached keytab for {}", resolvedPrincipal.getPrincipal());
-              }
-            }
-          }
+        if (principalEntity != null) {
+          String cachedKeytabPath = principalEntity.getCachedKeytabPath();
 
-          // delete Ambari server keytab
-          String hostName = resolvedPrincipal.getHostName();
-          if (hostName != null && hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME)) {
-            ResolvedKerberosKeytab resolvedKeytab = resolvedPrincipal.getResolvedKerberosKeytab();
-            if (resolvedKeytab != null) {
-              String keytabFilePath = resolvedKeytab.getFile();
-              if (keytabFilePath != null) {
-                try {
-                  ShellCommandUtil.Result result = ShellCommandUtil.delete(keytabFilePath, true, true);
-                  if (!result.isSuccessful()) {
-                    LOG.warn("Failed to remove ambari keytab for {} due to {}", resolvedPrincipal.getPrincipal(), result.getStderr());
-                  }
-                } catch (IOException|InterruptedException e) {
-                  LOG.warn("Failed to remove ambari keytab for " + resolvedPrincipal.getPrincipal(), e);
-                }
-              }
+          // If a cached keytab file exists for this principal, delete it.
+          if (cachedKeytabPath != null) {
+            if (!new File(cachedKeytabPath).delete()) {
+              LOG.debug("Failed to remove cached keytab for {}", principalName);
             }
           }
-        } catch (Throwable t) {
-          message = String.format("Failed to remove identity for %s from the Ambari database - %s", resolvedPrincipal.getPrincipal(), t.getMessage());
-          LOG.warn(message);
-          actionLog.writeStdErr(message);
-          auditEventBuilder.withReasonOfFailure(message);
         }
-      } finally {
-        auditLog(auditEventBuilder.build());
+      } catch (Throwable t) {
+        message = String.format("Failed to remove identity for %s from the Ambari database - %s", principalName, t.getMessage());
+        LOG.warn(message, t);
+        actionLog.writeStdErr(message);
+        auditEventBuilder.withReasonOfFailure(message);
       }
+    } finally {
+      auditLog(auditEventBuilder.build());
     }
-
-    // There is no reason to fail this task if an identity was not removed. The cluster will work
-    // just fine if this cleanup process fails.
-    return null;
   }
-}
+}
\ No newline at end of file
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java
index d9ab7a9..1748648 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java
@@ -94,8 +94,6 @@ public abstract class KerberosServerAction extends AbstractServerAction {
    */
   public static final String IDENTITY_FILTER = "identity_filter";
 
-  public static final String COMPONENT_FILTER = "component_filter";
-
   /**
    * A (command parameter) property name used to hold the relevant KDC type value.  See
    * {@link org.apache.ambari.server.serveraction.kerberos.KDCType} for valid values
@@ -186,10 +184,10 @@ public abstract class KerberosServerAction extends AbstractServerAction {
   private KerberosHelper kerberosHelper;
 
   @Inject
-  HostDAO hostDAO;
+  private HostDAO hostDAO;
 
   @Inject
-  KerberosKeytabController kerberosKeytabController;
+  private KerberosKeytabController kerberosKeytabController;
 
   /**
    * Given a (command parameter) Map and a property name, attempts to safely retrieve the requested
@@ -451,7 +449,9 @@ public abstract class KerberosServerAction extends AbstractServerAction {
       }
 
       try {
-        final Map<String, Collection<String>> serviceComponentFilter =kerberosKeytabController.adjustServiceComponentFilter(clusters.getCluster(getClusterName()), getServiceComponentFilter());
+        final Map<String, ? extends Collection<String>> serviceComponentFilter = (pruneServiceFilter())
+            ? kerberosKeytabController.adjustServiceComponentFilter(clusters.getCluster(getClusterName()), true, getServiceComponentFilter())
+            : getServiceComponentFilter();
         final Collection<KerberosIdentityDescriptor> serviceIdentities = serviceComponentFilter == null ? null : calculateServiceIdentities(getClusterName(), serviceComponentFilter);
         for (ResolvedKerberosKeytab rkk : kerberosKeytabController.getFilteredKeytabs(serviceComponentFilter, getHostFilter(), getIdentityFilter())) {
           for (ResolvedKerberosPrincipal principal : rkk.getPrincipals()) {
@@ -485,6 +485,10 @@ public abstract class KerberosServerAction extends AbstractServerAction {
         : commandReport;
   }
 
+  protected boolean pruneServiceFilter() {
+    return true;
+  }
+
   private boolean isRelevantIdentity(Collection<KerberosIdentityDescriptor> serviceIdentities, ResolvedKerberosPrincipal principal) {
     if (serviceIdentities != null) {
       boolean hasValidIdentity = false;
@@ -500,7 +504,7 @@ public abstract class KerberosServerAction extends AbstractServerAction {
     return true;
   }
 
-  private Collection<KerberosIdentityDescriptor> calculateServiceIdentities(String clusterName, Map<String, Collection<String>> serviceComponentFilter)
+  private Collection<KerberosIdentityDescriptor> calculateServiceIdentities(String clusterName, Map<String, ? extends Collection<String>> serviceComponentFilter)
       throws AmbariException {
     final Collection<KerberosIdentityDescriptor> serviceIdentities = new ArrayList<>();
     for (String service : serviceComponentFilter.keySet()) {
@@ -612,7 +616,6 @@ public abstract class KerberosServerAction extends AbstractServerAction {
         : ambariServerHostEntity.getHostId();
   }
 
-
   public static class KerberosCommandParameters {
     private Map<String, String> params;
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java
index 80b901b..1ed9e40 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java
@@ -208,5 +208,10 @@ public class PrepareDisableKerberosServerAction extends AbstractPrepareKerberosS
 
     return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
   }
+
+  @Override
+  protected boolean pruneServiceFilter() {
+    return false;
+  }
 }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/stageutils/KerberosKeytabController.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/stageutils/KerberosKeytabController.java
index 33fd6fa..fd26058 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/stageutils/KerberosKeytabController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/stageutils/KerberosKeytabController.java
@@ -102,7 +102,7 @@ public class KerberosKeytabController {
    * @param identityFilter identity(principal) filter
    * @return set of keytabs found
    */
-  public Set<ResolvedKerberosKeytab> getFilteredKeytabs(Map<String, Collection<String>> serviceComponentFilter,
+  public Set<ResolvedKerberosKeytab> getFilteredKeytabs(Map<String, ? extends Collection<String>> serviceComponentFilter,
                                                         Set<String> hostFilter, Collection<String> identityFilter) {
     if (serviceComponentFilter == null && hostFilter == null && identityFilter == null) {
       return getAllKeytabs();
@@ -133,14 +133,14 @@ public class KerberosKeytabController {
    * @param serviceComponentFilter
    * @return
    */
-  private List<KerberosKeytabPrincipalDAO.KerberosKeytabPrincipalFilter> splitServiceFilter(Map<String, Collection<String>> serviceComponentFilter) {
+  private List<KerberosKeytabPrincipalDAO.KerberosKeytabPrincipalFilter> splitServiceFilter(Map<String, ? extends Collection<String>> serviceComponentFilter) {
     if (serviceComponentFilter != null && serviceComponentFilter.size() > 0) {
       Set<String> serviceSet = new HashSet<>();
       Set<String> componentSet = new HashSet<>();
       Set<String> serviceOnlySet = new HashSet<>();
 
       // Split the filter into a service/component filter or a service-only filter.
-      for(Map.Entry<String, Collection<String>> entry: serviceComponentFilter.entrySet()) {
+      for (Map.Entry<String, ? extends Collection<String>> entry : serviceComponentFilter.entrySet()) {
         String serviceName = entry.getKey();
         Collection<String> serviceComponents = entry.getValue();
 
@@ -209,16 +209,18 @@ public class KerberosKeytabController {
   private Set<ResolvedKerberosPrincipal> fromPrincipalEntities(Collection<KerberosKeytabPrincipalEntity> principalEntities) {
     ImmutableSet.Builder<ResolvedKerberosPrincipal> builder = ImmutableSet.builder();
     for (KerberosKeytabPrincipalEntity kkpe : principalEntities) {
-      KerberosPrincipalEntity kpe = kkpe.getPrincipalEntity();
-      ResolvedKerberosPrincipal rkp = new ResolvedKerberosPrincipal(
-        kkpe.getHostId(),
-        kkpe.getHostName(),
-        kkpe.getPrincipalName(),
-        kpe.isService(),
-        kpe.getCachedKeytabPath(),
-        kkpe.getKeytabPath(),
-        kkpe.getServiceMappingAsMultimap());
-      builder.add(rkp);
+      KerberosPrincipalEntity kpe = kkpe.getKerberosPrincipalEntity();
+      if(kpe != null) {
+        ResolvedKerberosPrincipal rkp = new ResolvedKerberosPrincipal(
+            kkpe.getHostId(),
+            kkpe.getHostName(),
+            kkpe.getPrincipalName(),
+            kpe.isService(),
+            kpe.getCachedKeytabPath(),
+            kkpe.getKeytabPath(),
+            kkpe.getServiceMappingAsMultimap());
+        builder.add(rkp);
+      }
     }
     return builder.build();
   }
@@ -227,14 +229,19 @@ public class KerberosKeytabController {
    * Adjust service component filter according to installed services
    *
    * @param cluster                cluster
+   * @param includeAmbariAsService true to treat "AMBARI" as an installed service when adjusting the filter
    * @param serviceComponentFilter
    * @return
    * @throws AmbariException
    */
-  public Map<String, Collection<String>> adjustServiceComponentFilter(Cluster cluster, Map<String, ? extends Collection<String>> serviceComponentFilter) throws AmbariException {
+  public Map<String, Collection<String>> adjustServiceComponentFilter(Cluster cluster, boolean includeAmbariAsService, Map<String, ? extends Collection<String>> serviceComponentFilter) throws AmbariException {
     Map<String, Collection<String>> adjustedFilter = new HashMap<>();
 
     Map<String, Service> installedServices = (cluster == null) ? null : cluster.getServices();
+    if(includeAmbariAsService) {
+      installedServices = (installedServices == null) ? new HashMap<>() : new HashMap<>(installedServices);
+      installedServices.put("AMBARI", null);
+    }
 
     if (!MapUtils.isEmpty(installedServices)) {
       if (serviceComponentFilter != null) {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index 6997e47..3a0b2c0 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -34,6 +34,7 @@ import static org.apache.ambari.server.agent.DummyHeartbeatConstants.SECONDARY_N
 import static org.apache.ambari.server.controller.KerberosHelperImpl.SET_KEYTAB;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
@@ -118,6 +119,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -1497,7 +1500,7 @@ public class TestHeartbeatHandler {
     Method injectKeytabMethod = agentCommandsPublisher.getClass().getDeclaredMethod("injectKeytab",
         ExecutionCommand.class, String.class, String.class);
     injectKeytabMethod.setAccessible(true);
-    commandparams.put(KerberosServerAction.DATA_DIRECTORY, createTestKeytabData(agentCommandsPublisher).getAbsolutePath());
+    commandparams.put(KerberosServerAction.DATA_DIRECTORY, createTestKeytabData(agentCommandsPublisher, false).getAbsolutePath());
     injectKeytabMethod.invoke(agentCommandsPublisher, executionCommand, "SET_KEYTAB", targetHost);
 
     return executionCommand.getKerberosCommandParams();
@@ -1530,18 +1533,40 @@ public class TestHeartbeatHandler {
     Method injectKeytabMethod = agentCommandsPublisher.getClass().getDeclaredMethod("injectKeytab",
         ExecutionCommand.class, String.class, String.class);
     injectKeytabMethod.setAccessible(true);
-    commandparams.put(KerberosServerAction.DATA_DIRECTORY, createTestKeytabData(agentCommandsPublisher).getAbsolutePath());
+    commandparams.put(KerberosServerAction.DATA_DIRECTORY, createTestKeytabData(agentCommandsPublisher, true).getAbsolutePath());
     injectKeytabMethod.invoke(agentCommandsPublisher, executionCommand, "REMOVE_KEYTAB", targetHost);
 
     return executionCommand.getKerberosCommandParams();
   }
 
 
-  private File createTestKeytabData(AgentCommandsPublisher agentCommandsPublisher) throws Exception {
+  private File createTestKeytabData(AgentCommandsPublisher agentCommandsPublisher, boolean removeKeytabs) throws Exception {
     KerberosKeytabController kerberosKeytabControllerMock = createMock(KerberosKeytabController.class);
-    Map<String, Collection<String>> filter = new HashMap<>();
-    filter.put("HDFS", Collections.singletonList("*"));
-    expect(kerberosKeytabControllerMock.adjustServiceComponentFilter(anyObject(), anyObject())).andReturn(filter).once();
+    Map<String, Collection<String>> filter;
+
+    if(removeKeytabs) {
+      filter = null;
+
+      Multimap<String, String> serviceMapping = ArrayListMultimap.create();
+      serviceMapping.put("HDFS", "DATANODE");
+
+      ResolvedKerberosPrincipal resolvedKerberosPrincipal = createMock(ResolvedKerberosPrincipal.class);
+      expect(resolvedKerberosPrincipal.getHostName()).andReturn("c6403.ambari.apache.org");
+      expect(resolvedKerberosPrincipal.getPrincipal()).andReturn("dn/_HOST@_REALM");
+      expect(resolvedKerberosPrincipal.getServiceMapping()).andReturn(serviceMapping);
+      replay(resolvedKerberosPrincipal);
+
+      ResolvedKerberosKeytab resolvedKerberosKeytab = createMock(ResolvedKerberosKeytab.class);
+      expect(resolvedKerberosKeytab.getPrincipals()).andReturn(Collections.singleton(resolvedKerberosPrincipal));
+      replay(resolvedKerberosKeytab);
+
+      expect(kerberosKeytabControllerMock.getKeytabByFile("/etc/security/keytabs/dn.service.keytab")).andReturn(resolvedKerberosKeytab).once();
+    }
+    else {
+      filter = Collections.singletonMap("HDFS", Collections.singletonList("*"));
+    }
+
+    expect(kerberosKeytabControllerMock.adjustServiceComponentFilter(anyObject(), eq(false), anyObject())).andReturn(filter).once();
     expect(kerberosKeytabControllerMock.getFilteredKeytabs(filter,null,null)).andReturn(
       Sets.newHashSet(
         new ResolvedKerberosKeytab(
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
index 9209cae..f7d7136 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
@@ -3392,7 +3392,12 @@ public class KerberosHelperTest extends EasyMockSupport {
     expect(requestStageContainer.getId()).andReturn(1L).once();
     requestStageContainer.addStages(EasyMock.anyObject());
     expectLastCall().once();
-    // Clean-up/Finalize Stage
+    // Clean-up
+    expect(requestStageContainer.getLastStageId()).andReturn(-1L).anyTimes();
+    expect(requestStageContainer.getId()).andReturn(1L).once();
+    requestStageContainer.addStages(EasyMock.anyObject());
+    expectLastCall().once();
+    // Finalize Stage
     expect(requestStageContainer.getLastStageId()).andReturn(-1L).anyTimes();
     expect(requestStageContainer.getId()).andReturn(1L).once();
     requestStageContainer.addStages(EasyMock.anyObject());
@@ -3745,7 +3750,12 @@ public class KerberosHelperTest extends EasyMockSupport {
     expect(requestStageContainer.getId()).andReturn(1L).once();
     requestStageContainer.addStages(EasyMock.anyObject());
     expectLastCall().once();
-    // Clean-up/Finalize Stage
+    // Clean-up
+    expect(requestStageContainer.getLastStageId()).andReturn(-1L).anyTimes();
+    expect(requestStageContainer.getId()).andReturn(1L).once();
+    requestStageContainer.addStages(EasyMock.anyObject());
+    expectLastCall().once();
+    // Finalize Stage
     expect(requestStageContainer.getLastStageId()).andReturn(-1L).anyTimes();
     expect(requestStageContainer.getId()).andReturn(1L).once();
     requestStageContainer.addStages(EasyMock.anyObject());
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleanerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleanerTest.java
index 5783426..512cb9c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleanerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/KerberosIdentityCleanerTest.java
@@ -18,7 +18,6 @@
 package org.apache.ambari.server.controller.utilities;
 
 import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Sets.newHashSet;
 import static java.util.Collections.singletonList;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -78,7 +77,7 @@ public class KerberosIdentityCleanerTest extends EasyMockSupport {
   @Test
   public void removesAllKerberosIdentitesOfComponentAfterComponentWasUninstalled() throws Exception {
     installComponent(OOZIE, OOZIE_SERVER, HOST);
-    kerberosHelper.deleteIdentities(cluster, singletonList(new Component(HOST, OOZIE, OOZIE_SERVER, -1l)), newHashSet("/OOZIE/OOZIE_SERVER/oozie_server1", "/OOZIE/OOZIE_SERVER/oozie_server2"));
+    kerberosHelper.deleteIdentities(cluster, singletonList(new Component(HOST, OOZIE, OOZIE_SERVER, -1l)), null);
     expectLastCall().once();
     replayAll();
     uninstallComponent(OOZIE, OOZIE_SERVER, HOST);
@@ -86,17 +85,10 @@ public class KerberosIdentityCleanerTest extends EasyMockSupport {
   }
 
   @Test
-  public void skipsRemovingIdentityWhenServiceDoesNotExist() throws Exception {
-    replayAll();
-    uninstallComponent("NO_SUCH_SERVICE", OOZIE_SERVER, HOST);
-    verifyAll();
-  }
-
-  @Test
   public void skipsRemovingIdentityThatIsSharedByPrincipalName() throws Exception {
     installComponent(OOZIE, OOZIE_SERVER, HOST);
     installComponent(OOZIE_2, OOZIE_SERVER_2, HOST);
-    kerberosHelper.deleteIdentities(cluster, singletonList(new Component(HOST, OOZIE, OOZIE_SERVER, -1l)), newHashSet("/OOZIE/OOZIE_SERVER/oozie_server1"));
+    kerberosHelper.deleteIdentities(cluster, singletonList(new Component(HOST, OOZIE, OOZIE_SERVER, -1l)), null);
     expectLastCall().once();
     replayAll();
     uninstallComponent(OOZIE, OOZIE_SERVER, HOST);
@@ -107,7 +99,7 @@ public class KerberosIdentityCleanerTest extends EasyMockSupport {
   public void skipsRemovingIdentityThatIsSharedByKeyTabFilePath() throws Exception {
     installComponent(YARN, RESOURCE_MANAGER, HOST);
     installComponent(YARN_2, RESOURCE_MANAGER_2, HOST);
-    kerberosHelper.deleteIdentities(cluster, singletonList(new Component(HOST, YARN, RESOURCE_MANAGER, -1l)), newHashSet("/YARN/RESOURCE_MANAGER/rm_unique"));
+    kerberosHelper.deleteIdentities(cluster, singletonList(new Component(HOST, YARN, RESOURCE_MANAGER, -1l)), null);
     expectLastCall().once();
     replayAll();
     uninstallComponent(YARN, RESOURCE_MANAGER, HOST);
@@ -118,7 +110,6 @@ public class KerberosIdentityCleanerTest extends EasyMockSupport {
   public void skipsRemovingIdentityWhenClusterIsNotKerberized() throws Exception {
     reset(cluster);
     expect(cluster.getSecurityType()).andReturn(SecurityType.NONE).anyTimes();
-    expect(cluster.getUpgradeInProgress()).andReturn(null).once();
 
     replayAll();
     uninstallComponent(OOZIE, OOZIE_SERVER, HOST);
@@ -126,17 +117,9 @@ public class KerberosIdentityCleanerTest extends EasyMockSupport {
   }
 
   @Test
-  public void skipsRemovingIdentityIfComponentIsStillInstalledOnADifferentHost() throws Exception {
-    installComponent(OOZIE, OOZIE_SERVER, HOST, HOST2);
-    replayAll();
-    uninstallComponent(OOZIE, OOZIE_SERVER, HOST);
-    verifyAll();
-  }
-
-  @Test
   public void removesServiceIdentitiesSkipComponentIdentitiesAfterServiceWasUninstalled() throws Exception {
     installComponent(OOZIE, OOZIE_SERVER, HOST);
-    kerberosHelper.deleteIdentities(cluster, hdfsComponents(), newHashSet("/HDFS/hdfs-service"));
+    kerberosHelper.deleteIdentities(cluster, hdfsComponents(), null);
     expectLastCall().once();
     replayAll();
     uninstallService(HDFS, hdfsComponents());
@@ -154,6 +137,7 @@ public class KerberosIdentityCleanerTest extends EasyMockSupport {
   public void skipsRemovingIdentityWhenClusterIsUpgrading() throws Exception {
     installComponent(OOZIE, OOZIE_SERVER, HOST);
     reset(cluster);
+    expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).once();
     expect(cluster.getUpgradeInProgress()).andReturn(createNiceMock(UpgradeEntity.class)).once();
 
     replayAll();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerActionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerActionTest.java
index f67336c..246a060 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerActionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerActionTest.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.serveraction.kerberos;
 
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 
@@ -77,7 +78,7 @@ public class KerberosServerActionTest extends EasyMockSupport {
     ExecutionCommand mockExecutionCommand = createMock(ExecutionCommand.class);
     HostRoleCommand mockHostRoleCommand = createMock(HostRoleCommand.class);
     kerberosKeytabController = createMock(KerberosKeytabController.class);
-    expect(kerberosKeytabController.adjustServiceComponentFilter(anyObject(), anyObject())).andReturn(null).anyTimes();
+    expect(kerberosKeytabController.adjustServiceComponentFilter(anyObject(), eq(true), anyObject())).andReturn(null).anyTimes();
     expect(kerberosKeytabController.getFilteredKeytabs(null, null, null))
       .andReturn(
         Sets.newHashSet(new ResolvedKerberosKeytab(