You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2015/04/10 18:51:44 UTC

ambari git commit: AMBARI-10305. Kerberos: during disable, need option skip if unable to access KDC to remove principals (rlevas)

Repository: ambari
Updated Branches:
  refs/heads/trunk 64cb538ed -> 3133b1db4


AMBARI-10305. Kerberos: during disable, need option skip if unable to access KDC to remove principals (rlevas)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3133b1db
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3133b1db
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3133b1db

Branch: refs/heads/trunk
Commit: 3133b1db4e6d5a01b60f8c73acca98409c1bd44c
Parents: 64cb538
Author: Robert Levas <rl...@hortonworks.com>
Authored: Fri Apr 10 12:51:38 2015 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Fri Apr 10 12:52:05 2015 -0400

----------------------------------------------------------------------
 .../resources/ClusterResourceDefinition.java    |   3 +-
 .../AmbariManagementControllerImpl.java         |   9 +-
 .../server/controller/KerberosHelper.java       | 429 +++++++++++--------
 .../1.10.3-10/configuration/kerberos-env.xml    |   9 +
 .../AmbariManagementControllerImplTest.java     |  61 ++-
 .../server/controller/KerberosHelperTest.java   |  81 +++-
 ambari-web/app/data/HDP2/site_properties.js     |   9 +
 7 files changed, 402 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3133b1db/ambari-server/src/main/java/org/apache/ambari/server/api/resources/ClusterResourceDefinition.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/resources/ClusterResourceDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/api/resources/ClusterResourceDefinition.java
index 94f2711..b2be291 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/resources/ClusterResourceDefinition.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/resources/ClusterResourceDefinition.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.ambari.server.api.query.render.ClusterBlueprintRenderer;
 import org.apache.ambari.server.api.query.render.Renderer;
+import org.apache.ambari.server.controller.KerberosHelper;
 import org.apache.ambari.server.controller.spi.Resource;
 
 /**
@@ -81,7 +82,7 @@ public class ClusterResourceDefinition extends BaseResourceDefinition {
   public Collection<String> getUpdateDirectives() {
     Collection<String> directives = super.getUpdateDirectives();
     directives.add("regenerate_keytabs");
-
+    directives.add(KerberosHelper.DIRECTIVE_MANAGE_KERBEROS_IDENTITIES);
     return directives;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/3133b1db/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 9e3d98d..5959327 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -1420,7 +1420,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
       // most of the validation logic will be left to the KerberosHelper to avoid polluting the controller
       if (kerberosHelper.shouldExecuteCustomOperations(securityType, requestProperties)) {
         try {
-          requestStageContainer = kerberosHelper.executeCustomOperations(cluster, requestProperties, requestStageContainer);
+          requestStageContainer = kerberosHelper.executeCustomOperations(cluster, requestProperties, requestStageContainer,
+              kerberosHelper.getManageIdentitiesDirective(requestProperties));
         } catch (KerberosOperationException e) {
           throw new IllegalArgumentException(e.getMessage(), e);
         }
@@ -1433,7 +1434,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
           // adding or removing Kerberos from the cluster. This may generate multiple stages
           // or not depending the current state of the cluster.
           try {
-            requestStageContainer = kerberosHelper.toggleKerberos(cluster, securityType, requestStageContainer);
+            requestStageContainer = kerberosHelper.toggleKerberos(cluster, securityType, requestStageContainer,
+                kerberosHelper.getManageIdentitiesDirective(requestProperties));
           } catch (KerberosOperationException e) {
             throw new IllegalArgumentException(e.getMessage(), e);
           }
@@ -2227,7 +2229,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
         }
 
         try {
-          kerberosHelper.ensureIdentities(cluster, serviceFilter, null, hostsToForceKerberosOperations, requestStages);
+          kerberosHelper.ensureIdentities(cluster, serviceFilter, null, hostsToForceKerberosOperations, requestStages,
+              kerberosHelper.getManageIdentitiesDirective(requestProperties));
         } catch (KerberosOperationException e) {
           throw new IllegalArgumentException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3133b1db/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
index e8a6c0a..5cd75bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
@@ -111,6 +111,11 @@ import java.util.Random;
 import java.util.Set;
 
 public class KerberosHelper {
+  /**
+   * directive used to override the behavior of the kerberos-env/manage_identities property
+   */
+  public static final String DIRECTIVE_MANAGE_KERBEROS_IDENTITIES = "manage_kerberos_identities";
+
   private static final String BASE_LOG_DIR = "/tmp/ambari";
 
   private static final Logger LOG = LoggerFactory.getLogger(KerberosHelper.class);
@@ -196,6 +201,9 @@ public class KerberosHelper {
    *                              SecurityType.KERBEROS or SecurityType.NONE
    * @param requestStageContainer a RequestStageContainer to place generated stages, if needed -
    *                              if null a new RequestStageContainer will be created.
+   * @param manageIdentities      a Boolean value indicating how to override the configured behavior
+   *                              of managing Kerberos identities; if null the configured behavior
+   *                              will not be overridden
    * @return the updated or a new RequestStageContainer containing the stages that need to be
    * executed to complete this task; or null if no stages need to be executed.
    * @throws AmbariException
@@ -204,10 +212,11 @@ public class KerberosHelper {
    * @throws KerberosOperationException
    */
   public RequestStageContainer toggleKerberos(Cluster cluster, SecurityType securityType,
-                                              RequestStageContainer requestStageContainer)
+                                              RequestStageContainer requestStageContainer,
+                                              Boolean manageIdentities)
       throws AmbariException, KerberosOperationException {
 
-    KerberosDetails kerberosDetails = getKerberosDetails(cluster);
+    KerberosDetails kerberosDetails = getKerberosDetails(cluster, manageIdentities);
 
     // Update KerberosDetails with the new security type - the current one in the cluster is the "old" value
     kerberosDetails.setSecurityType(securityType);
@@ -232,15 +241,20 @@ public class KerberosHelper {
    * @param requestProperties     this structure is expected to hold already supported and validated directives
    *                              for the 'Cluster' resource. See ClusterResourceDefinition#getUpdateDirectives
    * @param requestStageContainer a RequestStageContainer to place generated stages, if needed -
-   *                              if null a new RequestStageContainer will be created.   @return the updated or a new RequestStageContainer containing the stages that need to be
-   *                              executed to complete this task; or null if no stages need to be executed.
+   *                              if null a new RequestStageContainer will be created.
+   * @param manageIdentities      a Boolean value indicating how to override the configured behavior
+   *                              of managing Kerberos identities; if null the configured behavior
+   *                              will not be overridden
+   * @return the updated or a new RequestStageContainer containing the stages that need to be
+   * executed to complete this task; or null if no stages need to be executed.
    * @throws AmbariException
    * @throws KerberosOperationException
    * @throws KerberosInvalidConfigurationException if an issue occurs trying to get the
    *                                               Kerberos-specific configuration details
    */
   public RequestStageContainer executeCustomOperations(Cluster cluster, Map<String, String> requestProperties,
-                                                       RequestStageContainer requestStageContainer)
+                                                       RequestStageContainer requestStageContainer,
+                                                       Boolean manageIdentities)
       throws AmbariException, KerberosOperationException {
 
     if (requestProperties != null) {
@@ -257,10 +271,10 @@ public class KerberosHelper {
               }
 
               if ("true".equalsIgnoreCase(value) || "all".equalsIgnoreCase(value)) {
-                requestStageContainer = handle(cluster, getKerberosDetails(cluster), null, null, null,
+                requestStageContainer = handle(cluster, getKerberosDetails(cluster, manageIdentities), null, null, null,
                     requestStageContainer, new CreatePrincipalsAndKeytabsHandler(true));
               } else if ("missing".equalsIgnoreCase(value)) {
-                requestStageContainer = handle(cluster, getKerberosDetails(cluster), null, null, null,
+                requestStageContainer = handle(cluster, getKerberosDetails(cluster, manageIdentities), null, null, null,
                     requestStageContainer, new CreatePrincipalsAndKeytabsHandler(false));
               } else {
                 throw new AmbariException(String.format("Unexpected directive value: %s", value));
@@ -309,6 +323,9 @@ public class KerberosHelper {
    * @param requestStageContainer          a RequestStageContainer to place generated stages, if
    *                                       needed - if null a new RequestStageContainer will be
    *                                       created.
+   * @param manageIdentities               a Boolean value indicating how to override the configured behavior
+   *                                       of managing Kerberos identities; if null the configured behavior
+   *                                       will not be overridden
    * @return the updated or a new RequestStageContainer containing the stages that need to be
    * executed to complete this task; or null if no stages need to be executed.
    * @throws AmbariException
@@ -318,9 +335,9 @@ public class KerberosHelper {
    */
   public RequestStageContainer ensureIdentities(Cluster cluster, Map<String, ? extends Collection<String>> serviceComponentFilter,
                                                 Collection<String> identityFilter, Set<String> hostsToForceKerberosOperations,
-                                                RequestStageContainer requestStageContainer)
+                                                RequestStageContainer requestStageContainer, Boolean manageIdentities)
       throws AmbariException, KerberosOperationException {
-    return handle(cluster, getKerberosDetails(cluster), serviceComponentFilter, identityFilter,
+    return handle(cluster, getKerberosDetails(cluster, manageIdentities), serviceComponentFilter, identityFilter,
         hostsToForceKerberosOperations, requestStageContainer, new CreatePrincipalsAndKeytabsHandler(false));
   }
 
@@ -347,6 +364,9 @@ public class KerberosHelper {
    *                               relevant identities
    * @param requestStageContainer  a RequestStageContainer to place generated stages, if needed -
    *                               if null a new RequestStageContainer will be created.
+   * @param manageIdentities       a Boolean value indicating how to override the configured behavior
+   *                               of managing Kerberos identities; if null the configured behavior
+   *                               will not be overridden
    * @return the updated or a new RequestStageContainer containing the stages that need to be
    * executed to complete this task; or null if no stages need to be executed.
    * @throws AmbariException
@@ -355,9 +375,10 @@ public class KerberosHelper {
    *                                               Kerberos-specific configuration details
    */
   public RequestStageContainer deleteIdentities(Cluster cluster, Map<String, ? extends Collection<String>> serviceComponentFilter,
-                                                Collection<String> identityFilter, RequestStageContainer requestStageContainer)
+                                                Collection<String> identityFilter, RequestStageContainer requestStageContainer,
+                                                Boolean manageIdentities)
       throws AmbariException, KerberosOperationException {
-    return handle(cluster, getKerberosDetails(cluster), serviceComponentFilter, identityFilter, null,
+    return handle(cluster, getKerberosDetails(cluster, manageIdentities), serviceComponentFilter, identityFilter, null,
         requestStageContainer, new DeletePrincipalsAndKeytabsHandler());
   }
 
@@ -374,7 +395,7 @@ public class KerberosHelper {
   public void configureService(Cluster cluster, ServiceComponentHost serviceComponentHost)
       throws AmbariException, KerberosInvalidConfigurationException {
 
-    KerberosDetails kerberosDetails = getKerberosDetails(cluster);
+    KerberosDetails kerberosDetails = getKerberosDetails(cluster, null);
 
     // Set properties...
     String serviceName = serviceComponentHost.getServiceName();
@@ -436,7 +457,7 @@ public class KerberosHelper {
   public RequestStageContainer createTestIdentity(Cluster cluster, Map<String, String> commandParamsStage,
                                                   RequestStageContainer requestStageContainer)
       throws KerberosOperationException, AmbariException {
-    return handleTestIdentity(cluster, getKerberosDetails(cluster), commandParamsStage, requestStageContainer, new CreatePrincipalsAndKeytabsHandler(false));
+    return handleTestIdentity(cluster, getKerberosDetails(cluster, null), commandParamsStage, requestStageContainer, new CreatePrincipalsAndKeytabsHandler(false));
   }
 
   /**
@@ -452,7 +473,7 @@ public class KerberosHelper {
   public RequestStageContainer deleteTestIdentity(Cluster cluster, Map<String, String> commandParamsStage,
                                                   RequestStageContainer requestStageContainer)
       throws KerberosOperationException, AmbariException {
-    requestStageContainer = handleTestIdentity(cluster, getKerberosDetails(cluster), commandParamsStage, requestStageContainer, new DeletePrincipalsAndKeytabsHandler());
+    requestStageContainer = handleTestIdentity(cluster, getKerberosDetails(cluster, null), commandParamsStage, requestStageContainer, new DeletePrincipalsAndKeytabsHandler());
 
     // Clear the Kerberos service check identifier
     setKerberosServiceCheckIdentifier(cluster, null);
@@ -470,80 +491,102 @@ public class KerberosHelper {
       KerberosAdminAuthenticationException,
       KerberosInvalidConfigurationException,
       AmbariException {
-    String credentials = getEncryptedAdministratorCredentials(cluster);
-    if (credentials == null) {
-      throw new KerberosMissingAdminCredentialsException(
-          "Missing KDC administrator credentials.\n" +
-              "The KDC administrator credentials must be set in session by updating the relevant Cluster resource." +
-              "This may be done by issuing a PUT to the api/v1/clusters/(cluster name) API entry point with the following payload:\n" +
-              "{\n" +
-              "  \"session_attributes\" : {\n" +
-              "    \"kerberos_admin\" : {\"principal\" : \"(PRINCIPAL)\", \"password\" : \"(PASSWORD)\"}\n" +
-              "  }\n" +
-              "}"
-      );
-    } else {
-      KerberosDetails kerberosDetails = getKerberosDetails(cluster);
-      KerberosOperationHandler operationHandler = kerberosOperationHandlerFactory.getKerberosOperationHandler(kerberosDetails.getKdcType());
+    validateKDCCredentials(null, cluster);
+  }
+
+  /**
+   * Validate the KDC admin credentials.
+   *
+   * @param kerberosDetails the KerberosDetails containing information about the Kerberos configuration
+   *                        for the cluster, if null, a new KerberosDetails will be created based on
+   *                        information found in the associated cluster
+   * @param cluster         associated cluster
+   * @throws AmbariException if any other error occurs while trying to validate the credentials
+   */
+  private void validateKDCCredentials(KerberosDetails kerberosDetails, Cluster cluster) throws KerberosMissingAdminCredentialsException,
+      KerberosAdminAuthenticationException,
+      KerberosInvalidConfigurationException,
+      AmbariException {
 
-      if (operationHandler == null) {
-        throw new AmbariException("Failed to get an appropriate Kerberos operation handler.");
+    if(kerberosDetails == null) {
+      kerberosDetails = getKerberosDetails(cluster, null);
+    }
+
+    if(kerberosDetails.manageIdentities()) {
+      String credentials = getEncryptedAdministratorCredentials(cluster);
+      if (credentials == null) {
+        throw new KerberosMissingAdminCredentialsException(
+            "Missing KDC administrator credentials.\n" +
+                "The KDC administrator credentials must be set in session by updating the relevant Cluster resource." +
+                "This may be done by issuing a PUT to the api/v1/clusters/(cluster name) API entry point with the following payload:\n" +
+                "{\n" +
+                "  \"session_attributes\" : {\n" +
+                "    \"kerberos_admin\" : {\"principal\" : \"(PRINCIPAL)\", \"password\" : \"(PASSWORD)\"}\n" +
+                "  }\n" +
+                "}"
+        );
       } else {
-        byte[] key = Integer.toHexString(cluster.hashCode()).getBytes();
-        KerberosCredential kerberosCredentials = KerberosCredential.decrypt(credentials, key);
+        KerberosOperationHandler operationHandler = kerberosOperationHandlerFactory.getKerberosOperationHandler(kerberosDetails.getKdcType());
 
-        boolean missingCredentials = false;
-        try {
-          operationHandler.open(kerberosCredentials, kerberosDetails.getDefaultRealm(), kerberosDetails.getKerberosEnvProperties());
-          // todo: this is really odd that open doesn't throw an exception if the credentials are missing
-          missingCredentials = ! operationHandler.testAdministratorCredentials();
-        } catch (KerberosAdminAuthenticationException e) {
-          throw new KerberosAdminAuthenticationException(
-              "Invalid KDC administrator credentials.\n" +
-                  "The KDC administrator credentials must be set in session by updating the relevant Cluster resource." +
-                  "This may be done by issuing a PUT to the api/v1/clusters/(cluster name) API entry point with the following payload:\n" +
-                  "{\n" +
-                  "  \"session_attributes\" : {\n" +
-                  "    \"kerberos_admin\" : {\"principal\" : \"(PRINCIPAL)\", \"password\" : \"(PASSWORD)\"}\n" +
-                  "  }\n" +
-                  "}", e);
-        } catch (KerberosKDCConnectionException e) {
-          throw new KerberosInvalidConfigurationException(
-              "Failed to connect to KDC - " + e.getMessage() + "\n" +
-                  "Update the KDC settings in krb5-conf and kerberos-env configurations to correct this issue.",
-              e);
-        } catch (KerberosRealmException e) {
-          throw new KerberosInvalidConfigurationException(
-              "Failed to find a KDC for the specified realm - " + e.getMessage() + "\n" +
-                  "Update the KDC settings in krb5-conf and kerberos-env configurations to correct this issue.",
-              e);
-        } catch (KerberosLDAPContainerException e) {
-          throw new KerberosInvalidConfigurationException(
-              "The principal container was not specified\n" +
-                  "Set the 'container_dn' value in the kerberos-env configuration to correct this issue.",
-              e);
-        } catch (KerberosOperationException e) {
-          throw new AmbariException(e.getMessage(), e);
-        } finally {
+        if (operationHandler == null) {
+          throw new AmbariException("Failed to get an appropriate Kerberos operation handler.");
+        } else {
+          byte[] key = Integer.toHexString(cluster.hashCode()).getBytes();
+          KerberosCredential kerberosCredentials = KerberosCredential.decrypt(credentials, key);
+
+          boolean missingCredentials = false;
           try {
-            operationHandler.close();
+            operationHandler.open(kerberosCredentials, kerberosDetails.getDefaultRealm(), kerberosDetails.getKerberosEnvProperties());
+            // todo: this is really odd that open doesn't throw an exception if the credentials are missing
+            missingCredentials = !operationHandler.testAdministratorCredentials();
+          } catch (KerberosAdminAuthenticationException e) {
+            throw new KerberosAdminAuthenticationException(
+                "Invalid KDC administrator credentials.\n" +
+                    "The KDC administrator credentials must be set in session by updating the relevant Cluster resource." +
+                    "This may be done by issuing a PUT to the api/v1/clusters/(cluster name) API entry point with the following payload:\n" +
+                    "{\n" +
+                    "  \"session_attributes\" : {\n" +
+                    "    \"kerberos_admin\" : {\"principal\" : \"(PRINCIPAL)\", \"password\" : \"(PASSWORD)\"}\n" +
+                    "  }\n" +
+                    "}", e);
+          } catch (KerberosKDCConnectionException e) {
+            throw new KerberosInvalidConfigurationException(
+                "Failed to connect to KDC - " + e.getMessage() + "\n" +
+                    "Update the KDC settings in krb5-conf and kerberos-env configurations to correct this issue.",
+                e);
+          } catch (KerberosRealmException e) {
+            throw new KerberosInvalidConfigurationException(
+                "Failed to find a KDC for the specified realm - " + e.getMessage() + "\n" +
+                    "Update the KDC settings in krb5-conf and kerberos-env configurations to correct this issue.",
+                e);
+          } catch (KerberosLDAPContainerException e) {
+            throw new KerberosInvalidConfigurationException(
+                "The principal container was not specified\n" +
+                    "Set the 'container_dn' value in the kerberos-env configuration to correct this issue.",
+                e);
           } catch (KerberosOperationException e) {
-            // Ignore this...
+            throw new AmbariException(e.getMessage(), e);
+          } finally {
+            try {
+              operationHandler.close();
+            } catch (KerberosOperationException e) {
+              // Ignore this...
+            }
           }
-        }
 
-        // need to throw this outside of the try/catch so it isn't caught
-        if (missingCredentials) {
-          throw new KerberosMissingAdminCredentialsException(
-              "Invalid KDC administrator credentials.\n" +
-                  "The KDC administrator credentials must be set in session by updating the relevant Cluster resource." +
-                  "This may be done by issuing a PUT to the api/v1/clusters/(cluster name) API entry point with the following payload:\n" +
-                  "{\n" +
-                  "  \"session_attributes\" : {\n" +
-                  "    \"kerberos_admin\" : {\"principal\" : \"(PRINCIPAL)\", \"password\" : \"(PASSWORD)\"}\n" +
-                  "  }\n" +
-                  "}"
-          );
+          // need to throw this outside of the try/catch so it isn't caught
+          if (missingCredentials) {
+            throw new KerberosMissingAdminCredentialsException(
+                "Invalid KDC administrator credentials.\n" +
+                    "The KDC administrator credentials must be set in session by updating the relevant Cluster resource." +
+                    "This may be done by issuing a PUT to the api/v1/clusters/(cluster name) API entry point with the following payload:\n" +
+                    "{\n" +
+                    "  \"session_attributes\" : {\n" +
+                    "    \"kerberos_admin\" : {\"principal\" : \"(PRINCIPAL)\", \"password\" : \"(PASSWORD)\"}\n" +
+                    "  }\n" +
+                    "}"
+            );
+          }
         }
       }
     }
@@ -657,13 +700,13 @@ public class KerberosHelper {
    * @param identityFilter                 a Collection of identity names indicating the relevant identities -
    *                                       if null, no filter is relevant; if empty, the filter indicates no
    *                                       relevant identities
-   * @param requestStageContainer          a RequestStageContainer to place generated stages, if needed -
-   *                                       if null a new RequestStageContainer will be created.
    * @param hostsToForceKerberosOperations a set of host names on which it is expected that the
    *                                       Kerberos client is or will be in the INSTALLED state by
    *                                       the time the operations targeted for them are to be
    *                                       executed - if empty or null, this no hosts will be
    *                                       "forced"
+   * @param requestStageContainer          a RequestStageContainer to place generated stages, if needed -
+   *                                       if null a new RequestStageContainer will be created.
    * @param handler                        a Handler to use to provide guidance and set up stages
    *                                       to perform the work needed to complete the relative action
    * @return the updated or a new RequestStageContainer containing the stages that need to be
@@ -811,7 +854,7 @@ public class KerberosHelper {
         // are available
         if (!serviceComponentHostsToProcess.isEmpty()) {
           try {
-            validateKDCCredentials(cluster);
+            validateKDCCredentials(kerberosDetails, cluster);
           } catch (KerberosOperationException e) {
             try {
               FileUtils.deleteDirectory(dataDirectory);
@@ -1065,7 +1108,7 @@ public class KerberosHelper {
         // are available
         if (!serviceComponentHostsToProcess.isEmpty()) {
           try {
-            validateKDCCredentials(cluster);
+            validateKDCCredentials(kerberosDetails, cluster);
           } catch (KerberosOperationException e) {
             try {
               FileUtils.deleteDirectory(dataDirectory);
@@ -1119,11 +1162,14 @@ public class KerberosHelper {
    * Gathers the Kerberos-related data from configurations and stores it in a new KerberosDetails
    * instance.
    *
-   * @param cluster the relevant Cluster
+   * @param cluster          the relevant Cluster
+   * @param manageIdentities a Boolean value indicating how to override the configured behavior
+   *                         of managing Kerberos identities; if null the configured behavior
+   *                         will not be overridden
    * @return a new KerberosDetails with the collected configuration data
    * @throws AmbariException
    */
-  private KerberosDetails getKerberosDetails(Cluster cluster)
+  private KerberosDetails getKerberosDetails(Cluster cluster, Boolean manageIdentities)
       throws KerberosInvalidConfigurationException, AmbariException {
 
     KerberosDetails kerberosDetails = new KerberosDetails();
@@ -1186,6 +1232,9 @@ public class KerberosHelper {
 
     kerberosDetails.setKerberosEnvProperties(kerberosEnvProperties);
 
+    // If set, override the manage identities behavior
+    kerberosDetails.setManageIdentities(manageIdentities);
+
     return kerberosDetails;
   }
 
@@ -1865,6 +1914,27 @@ public class KerberosHelper {
   }
 
   /**
+   * Retrieves the value of the manage_kerberos_identities directive from the request properties,
+   * if it exists.
+   * <p/>
+   * If manage_kerberos_identities does not exist in the map of request properties, null is returned
+   * <p/>
+   * If manage_kerberos_identities does exist in the map of request properties, a Boolean value
+   * is returned indicating whether its value is "false" (Boolean.FALSE) or not (Boolean.TRUE).
+   *
+   * @param requestProperties a map of the request property name/value pairs
+   * @return Boolean.TRUE or Boolean.FALSE if the manage_kerberos_identities property exists in the map;
+   * otherwise null
+   */
+  public Boolean getManageIdentitiesDirective(Map<String, String> requestProperties) {
+    String value = (requestProperties == null) ? null : requestProperties.get(DIRECTIVE_MANAGE_KERBEROS_IDENTITIES);
+
+    return (value == null)
+        ? null
+        : !"false".equalsIgnoreCase(value);
+  }
+
+  /**
    * Given a list of KerberosIdentityDescriptors, returns a Map fo configuration types to property
    * names and values.
    * <p/>
@@ -2337,20 +2407,22 @@ public class KerberosHelper {
       commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
       commandParameters.put(KerberosServerAction.ADMINISTRATOR_CREDENTIAL, getEncryptedAdministratorCredentials(cluster));
 
-      // *****************************************************************
-      // Create stage to create principals
-      addCreatePrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
-          roleCommandOrder, requestStageContainer);
-
-      // *****************************************************************
-      // Create stage to generate keytabs
-      addCreateKeytabFilesStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
-          roleCommandOrder, requestStageContainer);
-
-      // *****************************************************************
-      // Create stage to distribute keytabs
-      addDistributeKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson, hostParamsJson,
-          commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+      if (kerberosDetails.manageIdentities()) {
+        // *****************************************************************
+        // Create stage to create principals
+        addCreatePrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
+            roleCommandOrder, requestStageContainer);
+
+        // *****************************************************************
+        // Create stage to generate keytabs
+        addCreateKeytabFilesStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
+            roleCommandOrder, requestStageContainer);
+
+        // *****************************************************************
+        // Create stage to distribute keytabs
+        addDistributeKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson, hostParamsJson,
+            commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+      }
 
       // *****************************************************************
       // Create stage to update configurations of services
@@ -2359,7 +2431,6 @@ public class KerberosHelper {
 
       return requestStageContainer.getLastStageId();
     }
-
   }
 
   /**
@@ -2542,15 +2613,17 @@ public class KerberosHelper {
       addUpdateConfigurationsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
           roleCommandOrder, requestStageContainer);
 
-      // *****************************************************************
-      // Create stage to remove principals
-      addDestroyPrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
-          roleCommandOrder, requestStageContainer);
+      if(kerberosDetails.manageIdentities()) {
+        // *****************************************************************
+        // Create stage to remove principals
+        addDestroyPrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
+            roleCommandOrder, requestStageContainer);
 
-      // *****************************************************************
-      // Create stage to delete keytabs
-      addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
-          hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+        // *****************************************************************
+        // Create stage to delete keytabs
+        addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
+            hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+      }
 
       return requestStageContainer.getLastStageId();
     }
@@ -2617,41 +2690,44 @@ public class KerberosHelper {
                              List<ServiceComponentHost> serviceComponentHosts,
                              Set<String> hostsWithValidKerberosClient)
         throws AmbariException {
-      // If there are principals and keytabs to process, setup the following sages:
-      //  1) generate principals
-      //  2) generate keytab files
-      //  3) distribute keytab files
 
-      // If a RequestStageContainer does not already exist, create a new one...
-      if (requestStageContainer == null) {
-        requestStageContainer = new RequestStageContainer(
-            actionManager.getNextRequestId(),
-            null,
-            requestFactory,
-            actionManager);
-      }
+      if (kerberosDetails.manageIdentities()) {
+        // If there are principals and keytabs to process, setup the following stages:
+        //  1) generate principals
+        //  2) generate keytab files
+        //  3) distribute keytab files
 
-      Map<String, String> commandParameters = new HashMap<String, String>();
-      commandParameters.put(KerberosServerAction.AUTHENTICATED_USER_NAME, ambariManagementController.getAuthName());
-      commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
-      commandParameters.put(KerberosServerAction.DEFAULT_REALM, kerberosDetails.getDefaultRealm());
-      commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
-      commandParameters.put(KerberosServerAction.ADMINISTRATOR_CREDENTIAL, getEncryptedAdministratorCredentials(cluster));
-      commandParameters.put(KerberosServerAction.REGENERATE_ALL, (regenerateAllKeytabs) ? "true" : "false");
-
-      // *****************************************************************
-      // Create stage to create principals
-      addCreatePrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event,
-          commandParameters, roleCommandOrder, requestStageContainer);
-
-      // *****************************************************************
-      // Create stage to generate keytabs
-      addCreateKeytabFilesStage(cluster, clusterHostInfoJson, hostParamsJson, event,
-          commandParameters, roleCommandOrder, requestStageContainer);
+        // If a RequestStageContainer does not already exist, create a new one...
+        if (requestStageContainer == null) {
+          requestStageContainer = new RequestStageContainer(
+              actionManager.getNextRequestId(),
+              null,
+              requestFactory,
+              actionManager);
+        }
 
-      // Create stage to distribute keytabs
-      addDistributeKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
-          hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+        Map<String, String> commandParameters = new HashMap<String, String>();
+        commandParameters.put(KerberosServerAction.AUTHENTICATED_USER_NAME, ambariManagementController.getAuthName());
+        commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
+        commandParameters.put(KerberosServerAction.DEFAULT_REALM, kerberosDetails.getDefaultRealm());
+        commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
+        commandParameters.put(KerberosServerAction.ADMINISTRATOR_CREDENTIAL, getEncryptedAdministratorCredentials(cluster));
+        commandParameters.put(KerberosServerAction.REGENERATE_ALL, (regenerateAllKeytabs) ? "true" : "false");
+
+        // *****************************************************************
+        // Create stage to create principals
+        addCreatePrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event,
+            commandParameters, roleCommandOrder, requestStageContainer);
+
+        // *****************************************************************
+        // Create stage to generate keytabs
+        addCreateKeytabFilesStage(cluster, clusterHostInfoJson, hostParamsJson, event,
+            commandParameters, roleCommandOrder, requestStageContainer);
+
+        // Create stage to distribute keytabs
+        addDistributeKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
+            hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+      }
 
       return requestStageContainer.getLastStageId();
     }
@@ -2699,35 +2775,38 @@ public class KerberosHelper {
                              List<ServiceComponentHost> serviceComponentHosts,
                              Set<String> hostsWithValidKerberosClient)
         throws AmbariException {
-      // If there are principals and keytabs to process, setup the following sages:
-      //  1) delete principals
-      //  2) delete keytab files
-
-      // If a RequestStageContainer does not already exist, create a new one...
-      if (requestStageContainer == null) {
-        requestStageContainer = new RequestStageContainer(
-            actionManager.getNextRequestId(),
-            null,
-            requestFactory,
-            actionManager);
-      }
 
-      Map<String, String> commandParameters = new HashMap<String, String>();
-      commandParameters.put(KerberosServerAction.AUTHENTICATED_USER_NAME, ambariManagementController.getAuthName());
-      commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
-      commandParameters.put(KerberosServerAction.DEFAULT_REALM, kerberosDetails.getDefaultRealm());
-      commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
-      commandParameters.put(KerberosServerAction.ADMINISTRATOR_CREDENTIAL, getEncryptedAdministratorCredentials(cluster));
+      if (kerberosDetails.manageIdentities()) {
+        // If there are principals and keytabs to process, setup the following stages:
+        //  1) delete principals
+        //  2) delete keytab files
 
-      // *****************************************************************
-      // Create stage to delete principals
-      addDestroyPrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event,
-          commandParameters, roleCommandOrder, requestStageContainer);
+        // If a RequestStageContainer does not already exist, create a new one...
+        if (requestStageContainer == null) {
+          requestStageContainer = new RequestStageContainer(
+              actionManager.getNextRequestId(),
+              null,
+              requestFactory,
+              actionManager);
+        }
 
-      // *****************************************************************
-      // Create stage to delete keytabs
-      addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
-          hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+        Map<String, String> commandParameters = new HashMap<String, String>();
+        commandParameters.put(KerberosServerAction.AUTHENTICATED_USER_NAME, ambariManagementController.getAuthName());
+        commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
+        commandParameters.put(KerberosServerAction.DEFAULT_REALM, kerberosDetails.getDefaultRealm());
+        commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
+        commandParameters.put(KerberosServerAction.ADMINISTRATOR_CREDENTIAL, getEncryptedAdministratorCredentials(cluster));
+
+        // *****************************************************************
+        // Create stage to delete principals
+        addDestroyPrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event,
+            commandParameters, roleCommandOrder, requestStageContainer);
+
+        // *****************************************************************
+        // Create stage to delete keytabs
+        addDeleteKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
+            hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer, hostsWithValidKerberosClient);
+      }
 
       return requestStageContainer.getLastStageId();
     }
@@ -2742,6 +2821,7 @@ public class KerberosHelper {
     private KDCType kdcType;
     private Map<String, String> kerberosEnvProperties;
     private SecurityType securityType;
+    private Boolean manageIdentities;
 
     public void setDefaultRealm(String defaultRealm) {
       this.defaultRealm = defaultRealm;
@@ -2774,5 +2854,18 @@ public class KerberosHelper {
     public SecurityType getSecurityType() {
       return securityType;
     }
+
+    public boolean manageIdentities() {
+      if (manageIdentities == null) {
+        return (kerberosEnvProperties == null) ||
+            !"false".equalsIgnoreCase(kerberosEnvProperties.get("manage_identities"));
+      } else {
+        return manageIdentities;
+      }
+    }
+
+    public void setManageIdentities(Boolean manageIdentities) {
+      this.manageIdentities = manageIdentities;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3133b1db/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/configuration/kerberos-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/configuration/kerberos-env.xml b/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/configuration/kerberos-env.xml
index 9c12b34..6d2bc56 100644
--- a/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/configuration/kerberos-env.xml
+++ b/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/configuration/kerberos-env.xml
@@ -29,6 +29,15 @@
     <value>mit-kdc</value>
   </property>
 
+  <property>
+    <name>manage_identities</name>
+    <description>
+      Indicates whether the Ambari user and service Kerberos identities (principals and keytab files)
+      should be managed (created, deleted, updated, etc...) by Ambari or managed manually.
+    </description>
+    <value>true</value>
+  </property>
+
   <property require-input="true">
     <name>ldap_url</name>
     <description>

http://git-wip-us.apache.org/repos/asf/ambari/blob/3133b1db/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerImplTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerImplTest.java
index a71b2db..5778434 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerImplTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerImplTest.java
@@ -76,18 +76,7 @@ import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.DB_DRIVER
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JAVA_VERSION;
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_NAME;
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_VERSION;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createMockBuilder;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.createStrictMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
+import static org.easymock.EasyMock.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -614,7 +603,10 @@ public class AmbariManagementControllerImplTest {
     expect(kerberosHelper.shouldExecuteCustomOperations(SecurityType.KERBEROS, null))
         .andReturn(false)
         .once();
-    expect(kerberosHelper.toggleKerberos(anyObject(Cluster.class), anyObject(SecurityType.class), anyObject(RequestStageContainer.class)))
+    expect(kerberosHelper.getManageIdentitiesDirective(null))
+        .andReturn(null)
+        .once();
+    expect(kerberosHelper.toggleKerberos(anyObject(Cluster.class), anyObject(SecurityType.class), anyObject(RequestStageContainer.class), anyBoolean()))
         .andReturn(null)
         .once();
 
@@ -635,7 +627,33 @@ public class AmbariManagementControllerImplTest {
    * IS invoked
    */
   @Test
-  public void testUpdateClustersToggleKerberosDisable() throws Exception {
+  public void testUpdateClustersToggleKerberosDisable_Default() throws Exception {
+    testUpdateClustersToggleKerberosDisable(null);
+  }
+
+  /**
+   * Ensure that when the cluster security type is updated from KERBEROS to NONE, KerberosHelper.toggleKerberos
+   * IS invoked and identities are not managed
+   */
+  @Test
+  public void testUpdateClustersToggleKerberosDisable_NoManageIdentities() throws Exception {
+    testUpdateClustersToggleKerberosDisable(Boolean.FALSE);
+  }
+
+  /**
+   * Ensure that when the cluster security type is updated from KERBEROS to NONE, KerberosHelper.toggleKerberos
+   * IS invoked and identities are managed
+   */
+  @Test
+  public void testUpdateClustersToggleKerberosDisable_ManageIdentities() throws Exception {
+    testUpdateClustersToggleKerberosDisable(Boolean.TRUE);
+  }
+
+  /**
+   * Ensure that when the cluster security type is updated from KERBEROS to NONE, KerberosHelper.toggleKerberos
+   * IS invoked
+   */
+  private void testUpdateClustersToggleKerberosDisable(Boolean manageIdentities) throws Exception {
     // member state mocks
     Capture<AmbariManagementController> controllerCapture = new Capture<AmbariManagementController>();
     Injector injector = createStrictMock(Injector.class);
@@ -646,6 +664,8 @@ public class AmbariManagementControllerImplTest {
     // requests
     Set<ClusterRequest> setRequests = Collections.singleton(clusterRequest);
 
+    Capture<Boolean> manageIdentitiesCapture = new Capture<Boolean>();
+
     KerberosHelper kerberosHelper = createStrictMock(KerberosHelper.class);
     // expectations
     injector.injectMembers(capture(controllerCapture));
@@ -664,7 +684,10 @@ public class AmbariManagementControllerImplTest {
     expect(kerberosHelper.shouldExecuteCustomOperations(SecurityType.NONE, null))
         .andReturn(false)
         .once();
-    expect(kerberosHelper.toggleKerberos(anyObject(Cluster.class), anyObject(SecurityType.class), anyObject(RequestStageContainer.class)))
+    expect(kerberosHelper.getManageIdentitiesDirective(anyObject(Map.class)))
+        .andReturn(manageIdentities)
+        .once();
+    expect(kerberosHelper.toggleKerberos(anyObject(Cluster.class), anyObject(SecurityType.class), anyObject(RequestStageContainer.class), captureBoolean(manageIdentitiesCapture)))
         .andReturn(null)
         .once();
 
@@ -677,6 +700,7 @@ public class AmbariManagementControllerImplTest {
 
     // assert and verify
     assertSame(controller, controllerCapture.getValue());
+    assertEquals(manageIdentities, manageIdentitiesCapture.getValue());
     verify(actionManager, cluster, clusters, injector, clusterRequest, sessionManager, kerberosHelper);
   }
 
@@ -685,7 +709,7 @@ public class AmbariManagementControllerImplTest {
    * IS invoked
    */
   @Test
-  public void testUpdateClustersToggleKerberosFail() throws Exception {
+  public void testUpdateClustersToggleKerberos_Fail() throws Exception {
     // member state mocks
     Capture<AmbariManagementController> controllerCapture = new Capture<AmbariManagementController>();
     Injector injector = createStrictMock(Injector.class);
@@ -723,7 +747,10 @@ public class AmbariManagementControllerImplTest {
     expect(kerberosHelper.shouldExecuteCustomOperations(SecurityType.NONE, null))
         .andReturn(false)
         .once();
-    expect(kerberosHelper.toggleKerberos(anyObject(Cluster.class), anyObject(SecurityType.class), anyObject(RequestStageContainer.class)))
+    expect(kerberosHelper.getManageIdentitiesDirective(anyObject(Map.class)))
+        .andReturn(null)
+        .once();
+    expect(kerberosHelper.toggleKerberos(anyObject(Cluster.class), anyObject(SecurityType.class), anyObject(RequestStageContainer.class), anyBoolean()))
         .andThrow(new IllegalArgumentException("bad args!"))
         .once();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/3133b1db/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
index 524c511..ee11ee7 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/KerberosHelperTest.java
@@ -27,6 +27,7 @@ import static org.easymock.EasyMock.getCurrentArguments;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -203,7 +204,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     RequestStageContainer requestStageContainer = createNiceMock(RequestStageContainer.class);
 
     replayAll();
-    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, requestStageContainer);
+    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, requestStageContainer, true);
     verifyAll();
   }
 
@@ -222,7 +223,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     expect(cluster.getDesiredConfigByType("kerberos-env")).andReturn(kerberosEnvConfig).once();
 
     replayAll();
-    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, null);
+    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, null, true);
     verifyAll();
   }
 
@@ -244,7 +245,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     expect(cluster.getDesiredConfigByType("krb5-conf")).andReturn(krb5ConfConfig).once();
 
     replayAll();
-    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, null);
+    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, null, true);
     verifyAll();
   }
 
@@ -339,7 +340,7 @@ public class KerberosHelperTest extends EasyMockSupport {
 
     try {
       kerberosHelper.executeCustomOperations(cluster,
-          Collections.singletonMap("invalid_operation", "false"), null);
+          Collections.singletonMap("invalid_operation", "false"), null, true);
     } catch (Throwable t) {
       Assert.fail("Exception should not have been thrown");
     }
@@ -351,7 +352,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     final Cluster cluster = createNiceMock(Cluster.class);
 
     kerberosHelper.executeCustomOperations(cluster,
-        Collections.singletonMap("regenerate_keytabs", "false"), null);
+        Collections.singletonMap("regenerate_keytabs", "false"), null, true);
     Assert.fail("AmbariException should have failed");
   }
 
@@ -683,7 +684,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     // Needed by infrastructure
     metaInfo.init();
 
-    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, requestStageContainer);
+    kerberosHelper.toggleKerberos(cluster, SecurityType.KERBEROS, requestStageContainer, true);
 
     verifyAll();
   }
@@ -975,7 +976,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     // Needed by infrastructure
     metaInfo.init();
 
-    kerberosHelper.toggleKerberos(cluster, SecurityType.NONE, requestStageContainer);
+    kerberosHelper.toggleKerberos(cluster, SecurityType.NONE, requestStageContainer, true);
 
     verifyAll();
   }
@@ -1265,7 +1266,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     // Needed by infrastructure
     metaInfo.init();
 
-    Assert.assertNotNull(kerberosHelper.executeCustomOperations(cluster, Collections.singletonMap("regenerate_keytabs", "true"), requestStageContainer));
+    Assert.assertNotNull(kerberosHelper.executeCustomOperations(cluster, Collections.singletonMap("regenerate_keytabs", "true"), requestStageContainer, true));
 
     verifyAll();
   }
@@ -1294,6 +1295,66 @@ public class KerberosHelperTest extends EasyMockSupport {
     verify(cluster);
   }
 
+  @Test
+  public void testGetManageIdentitiesDirective_NotSet() throws Exception {
+    KerberosHelper kerberosHelper = injector.getInstance(KerberosHelper.class);
+    assertEquals(null, kerberosHelper.getManageIdentitiesDirective(null));
+    assertEquals(null, kerberosHelper.getManageIdentitiesDirective(Collections.<String, String>emptyMap()));
+
+    assertEquals(null, kerberosHelper.getManageIdentitiesDirective(
+        new HashMap<String, String>() {
+          {
+            put(KerberosHelper.DIRECTIVE_MANAGE_KERBEROS_IDENTITIES, null);
+            put("some_directive_0", "false");
+            put("some_directive_1", null);
+          }
+        }
+    ));
+
+    assertEquals(null, kerberosHelper.getManageIdentitiesDirective(
+        new HashMap<String, String>() {
+          {
+            put("some_directive_0", "false");
+            put("some_directive_1", null);
+          }
+        }
+    ));
+  }
+
+  @Test
+  public void testGetManageIdentitiesDirective_True() throws Exception {
+    KerberosHelper kerberosHelper = injector.getInstance(KerberosHelper.class);
+    assertEquals(Boolean.TRUE, kerberosHelper.getManageIdentitiesDirective(Collections.singletonMap(KerberosHelper.DIRECTIVE_MANAGE_KERBEROS_IDENTITIES, "true")));
+    assertEquals(Boolean.TRUE, kerberosHelper.getManageIdentitiesDirective(Collections.singletonMap(KerberosHelper.DIRECTIVE_MANAGE_KERBEROS_IDENTITIES, "not_false")));
+
+    assertEquals(Boolean.TRUE, kerberosHelper.getManageIdentitiesDirective(
+        new HashMap<String, String>() {
+          {
+            put(KerberosHelper.DIRECTIVE_MANAGE_KERBEROS_IDENTITIES, "true");
+            put("some_directive_0", "false");
+            put("some_directive_1", null);
+          }
+        }
+    ));
+  }
+
+  @Test
+  public void testGetManageIdentitiesDirective_False() throws Exception {
+    KerberosHelper kerberosHelper = injector.getInstance(KerberosHelper.class);
+    assertEquals(Boolean.FALSE, kerberosHelper.getManageIdentitiesDirective(Collections.singletonMap(KerberosHelper.DIRECTIVE_MANAGE_KERBEROS_IDENTITIES, "false")));
+
+    assertEquals(Boolean.FALSE, kerberosHelper.getManageIdentitiesDirective(
+        new HashMap<String, String>() {
+          {
+            put(KerberosHelper.DIRECTIVE_MANAGE_KERBEROS_IDENTITIES, "false");
+            put("some_directive_0", "false");
+            put("some_directive_1", null);
+          }
+        }
+    ));
+  }
+
+
   private void setClusterController() throws Exception {
     KerberosHelper kerberosHelper = injector.getInstance(KerberosHelper.class);
 
@@ -1682,7 +1743,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     serviceComponentFilter.put("SERVICE3", Collections.singleton("COMPONENT3"));
     serviceComponentFilter.put("SERVICE1", null);
 
-    kerberosHelper.ensureIdentities(cluster, serviceComponentFilter, identityFilter, null, requestStageContainer);
+    kerberosHelper.ensureIdentities(cluster, serviceComponentFilter, identityFilter, null, requestStageContainer, true);
 
     verifyAll();
   }
@@ -1959,7 +2020,7 @@ public class KerberosHelperTest extends EasyMockSupport {
     serviceComponentFilter.put("SERVICE3", Collections.singleton("COMPONENT3"));
     serviceComponentFilter.put("SERVICE1", null);
 
-    kerberosHelper.deleteIdentities(cluster, serviceComponentFilter, identityFilter, requestStageContainer);
+    kerberosHelper.deleteIdentities(cluster, serviceComponentFilter, identityFilter, requestStageContainer, true);
 
     verifyAll();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/3133b1db/ambari-web/app/data/HDP2/site_properties.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/data/HDP2/site_properties.js b/ambari-web/app/data/HDP2/site_properties.js
index 205aead..733e0ac 100644
--- a/ambari-web/app/data/HDP2/site_properties.js
+++ b/ambari-web/app/data/HDP2/site_properties.js
@@ -1961,6 +1961,15 @@ var hdp2properties = [
   },
   {
     "id": "puppet var",
+    "name": "manage_identities",
+    "displayName": "Manage Kerberos Identities",
+    "displayType": "checkbox",
+    "serviceName": "KERBEROS",
+    "filename": "kerberos-env.xml",
+    "category": "Advanced kerberos-env"
+  },
+  {
+    "id": "puppet var",
     "name": "create_attributes_template",
     "displayName": "Attribute template",
     "serviceName": "KERBEROS",