You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by yu...@apache.org on 2015/01/22 23:16:45 UTC

[05/14] ambari git commit: AMBARI-9149. Test principal and keytab required for service check should be created as part of kerberos service check action (rlevas)

AMBARI-9149. Test principal and keytab required for service check should be created as part of kerberos service check action (rlevas)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/339e8a76
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/339e8a76
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/339e8a76

Branch: refs/heads/2.0-preview
Commit: 339e8a76a4b43c5ba12953a3cd66558647eda4a9
Parents: 9f29148
Author: Robert Levas <rl...@hortonworks.com>
Authored: Wed Jan 21 13:51:45 2015 -0500
Committer: Yusaku Sako <yu...@hortonworks.com>
Committed: Wed Jan 21 12:21:46 2015 -0800

----------------------------------------------------------------------
 .../AmbariManagementControllerImpl.java         |  33 +-
 .../server/controller/KerberosHelper.java       | 876 ++++++++++++-------
 .../internal/RequestStageContainer.java         |  24 +-
 .../KERBEROS/1.10.3-10/kerberos.json            |  17 +
 .../main/resources/stacks/HDP/2.2/kerberos.json |   2 +-
 .../AmbariCustomCommandExecutionHelperTest.java |  49 +-
 .../BackgroundCustomCommandExecutionTest.java   |  16 +-
 .../server/controller/KerberosHelperTest.java   | 327 ++++++-
 8 files changed, 992 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/339e8a76/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index ae57d1f..dd18e8d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -2823,12 +2823,31 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
       actionExecutionHelper.validateAction(actionRequest);
     }
 
+    long requestId = actionManager.getNextRequestId();
+    RequestStageContainer requestStageContainer = new RequestStageContainer(
+        requestId,
+        null,
+        requestFactory,
+        actionManager,
+        actionRequest);
+
+    // If the request is to perform the Kerberos service check, set up the stages to
+    // ensure that the (cluster-level) smoke user principal and keytab are available on all hosts
+    if (Role.KERBEROS_SERVICE_CHECK.name().equals(actionRequest.getCommandName())) {
+      Map<String, Collection<String>> serviceComponentFilter = new HashMap<String, Collection<String>>();
+      Collection<String> identityFilter = Arrays.asList("/smokeuser");
+
+      serviceComponentFilter.put("KERBEROS", null);
+
+      requestStageContainer = kerberosHelper.ensureIdentities(cluster, null, serviceComponentFilter,
+          identityFilter, requestStageContainer);
+    }
+
     ExecuteCommandJson jsons = customCommandExecutionHelper.getCommandJson(
         actionExecContext, cluster);
 
-    Stage stage = createNewStage(0, cluster, actionManager.getNextRequestId(), requestContext,
-      jsons.getClusterHostInfo(), jsons.getCommandParamsForStage(),
-        jsons.getHostParamsForStage());
+    Stage stage = createNewStage(requestStageContainer.getLastStageId(), cluster, requestId, requestContext,
+        jsons.getClusterHostInfo(), jsons.getCommandParamsForStage(), jsons.getHostParamsForStage());
 
     if (actionRequest.isCommand()) {
       customCommandExecutionHelper.addExecutionCommandsToStage(actionExecContext, stage, requestProperties, false);
@@ -2848,11 +2867,11 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     List<Stage> stages = rg.getStages();
 
     if (stages != null && !stages.isEmpty()) {
-      actionManager.sendActions(stages, actionRequest);
-      return getRequestStatusResponse(stage.getRequestId());
-    } else {
-      throw new AmbariException("Stage was not created");
+      requestStageContainer.addStages(stages);
     }
+
+    requestStageContainer.persist();
+    return requestStageContainer.getRequestStatusResponse();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/339e8a76/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
index a425e95..6620577 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelper.java
@@ -115,6 +115,10 @@ public class KerberosHelper {
    */
   private Handler disableKerberosHandler = new DisableKerberosHandler();
 
+  /**
+   * The Handler implementation that provides the logic to ensure the existence of principals and keytabs
+   */
+  private Handler createPrincipalsAndKeytabsHandler = new CreatePrincipalsAndKeytabsHandler();
 
   /**
    * Toggles Kerberos security to enable it or remove it depending on the state of the cluster.
@@ -144,98 +148,49 @@ public class KerberosHelper {
                                               RequestStageContainer requestStageContainer)
       throws AmbariException {
 
-    if (cluster == null) {
-      String message = "The cluster object is not available";
-      LOG.error(message);
-      throw new AmbariException(message);
-    }
-
-    Config configClusterEnv = cluster.getDesiredConfigByType("cluster-env");
-    if (configClusterEnv == null) {
-      String message = "The 'cluster-env' configuration is not available";
-      LOG.error(message);
-      throw new AmbariException(message);
-    }
-
-    Map<String, String> clusterEnvProperties = configClusterEnv.getProperties();
-    if (clusterEnvProperties == null) {
-      String message = "The 'cluster-env' configuration properties are not available";
-      LOG.error(message);
-      throw new AmbariException(message);
-    }
+    KerberosDetails kerberosDetails = getKerberosDetails(cluster);
 
-    String securityEnabled = clusterEnvProperties.get("security_enabled");
-    if ((securityEnabled == null) || securityEnabled.isEmpty()) {
-      LOG.warn("Missing 'securityEnabled' property of cluster-env, unable to determine the cluster's security state. This may be ok.");
+    if (kerberosDetails.isSecurityEnabled()) {
+      LOG.info("Configuring Kerberos for realm {} on cluster, {}", kerberosDetails.getDefaultRealm(), cluster.getClusterName());
+      requestStageContainer = handle(cluster, kerberosDescriptor, kerberosDetails, null, null, requestStageContainer, enableKerberosHandler);
     } else {
-      String defaultRealm = clusterEnvProperties.get("kerberos_domain");
-
-      Config configKrb5Conf = cluster.getDesiredConfigByType("krb5-conf");
-      if (configKrb5Conf == null) {
-        String message = "The 'krb5-conf' configuration is not available";
-        LOG.error(message);
-        throw new AmbariException(message);
-      }
-
-      Map<String, String> krb5ConfProperties = configKrb5Conf.getProperties();
-      if (krb5ConfProperties == null) {
-        String message = "The 'krb5-conf' configuration properties are not available";
-        LOG.error(message);
-        throw new AmbariException(message);
-      }
-
-      Config configKerberosEnv = cluster.getDesiredConfigByType("kerberos-env");
-      if (configKerberosEnv == null) {
-        String message = "The 'kerberos-env' configuration is not available";
-        LOG.error(message);
-        throw new AmbariException(message);
-      }
-
-      Map<String, String> kerberosEnvProperties = configKerberosEnv.getProperties();
-      if (kerberosEnvProperties == null) {
-        String message = "The 'kerberos-env' configuration properties are not available";
-        LOG.error(message);
-        throw new AmbariException(message);
-      }
-
-      KDCType kdcType = null;
-      String kdcTypeProperty = kerberosEnvProperties.get("kdc_type");
-      if (kdcTypeProperty == null) {
-        // TODO: (rlevas) Only pull from kerberos-env, this is only for transitional purposes (AMBARI 9121)
-        kdcTypeProperty = krb5ConfProperties.get("kdc_type");
-      }
-      if (kdcTypeProperty != null) {
-        try {
-          kdcType = KDCType.translate(kdcTypeProperty);
-        } catch (IllegalArgumentException e) {
-          String message = String.format("Invalid 'kdc_type' value: %s", kdcTypeProperty);
-          LOG.error(message);
-          throw new AmbariException(message);
-        }
-      }
-
-      if (kdcType == null) {
-        // Set the KDCType to the the MIT_KDC as a fallback.
-        kdcType = KDCType.MIT_KDC;
-      }
-
-      if ("true".equalsIgnoreCase(securityEnabled)) {
-        LOG.info("Configuring Kerberos for realm {} on cluster, {}", defaultRealm, cluster.getClusterName());
-        requestStageContainer = handle(cluster, kerberosDescriptor, defaultRealm, kdcType, kerberosEnvProperties, requestStageContainer, enableKerberosHandler);
-      } else if ("false".equalsIgnoreCase(securityEnabled)) {
-        LOG.info("Disabling Kerberos from cluster, {}", cluster.getClusterName());
-        requestStageContainer = handle(cluster, kerberosDescriptor, defaultRealm, kdcType, kerberosEnvProperties, requestStageContainer, disableKerberosHandler);
-      } else {
-        String message = String.format("Invalid value for `security_enabled` property of cluster-env: %s", securityEnabled);
-        LOG.error(message);
-        throw new AmbariException(message);
-      }
+      LOG.info("Disabling Kerberos from cluster, {}", cluster.getClusterName());
+      requestStageContainer = handle(cluster, kerberosDescriptor, kerberosDetails, null, null, requestStageContainer, disableKerberosHandler);
     }
 
     return requestStageContainer;
   }
 
   /**
+   * Ensures the set of filtered principals and keytabs exist on the cluster.
+   * <p/>
+   * No configurations will be altered as a result of this operation, however principals and keytabs
+   * may be updated or created.
+   *
+   * @param cluster                the relevant Cluster
+   * @param kerberosDescriptor     a KerberosDescriptor containing updates to the descriptor already
+   *                               configured for the cluster
+   * @param serviceComponentFilter a Map of service names to component names indicating the relevant
+   *                               set of services and components - if null, no filter is relevant;
+   *                               if empty, the filter indicates no relevant services or components
+   * @param identityFilter         a Collection of identity names indicating the relevant identities -
+   *                               if null, no filter is relevant; if empty, the filter indicates no
+   *                               relevant identities
+   * @param requestStageContainer  a RequestStageContainer to place generated stages, if needed -
+   *                               if null a new RequestStageContainer will be created.
+   * @return the updated or a new RequestStageContainer containing the stages that need to be
+   * executed to complete this task; or null if no stages need to be executed.
+   * @throws AmbariException
+   */
+  public RequestStageContainer ensureIdentities(Cluster cluster, KerberosDescriptor kerberosDescriptor,
+                                                Map<String, Collection<String>> serviceComponentFilter,
+                                                Collection<String> identityFilter,
+                                                RequestStageContainer requestStageContainer) throws AmbariException {
+    return handle(cluster, kerberosDescriptor, getKerberosDetails(cluster), serviceComponentFilter, identityFilter,
+        requestStageContainer, createPrincipalsAndKeytabsHandler);
+  }
+
+  /**
    * Performs operations needed to enable to disable Kerberos on the relevant cluster.
    * <p/>
    * Iterates through the components installed on the relevant cluster and attempts to enable or
@@ -244,13 +199,17 @@ public class KerberosHelper {
    * The supplied Handler instance handles the logic on whether this process enables or disables
    * Kerberos.
    *
-   * @param cluster               the relevant Cluster
-   * @param kerberosDescriptor    the (derived) KerberosDescriptor
-   * @param realm                 the default Kerberos realm for the Cluster
-   * @param kdcType               a KDCType declaring the type of the relevant KDC
-   * @param kerberosEnvProperties a MAp of key/value pairs from the kerberos-env configuration
-   * @param requestStageContainer a RequestStageContainer to place generated stages, if needed -
-   *                              if null a new RequestStageContainer will be created.
+   * @param cluster                the relevant Cluster
+   * @param kerberosDescriptor     the (derived) KerberosDescriptor
+   * @param kerberosDetails        a KerberosDetails containing information about relevant Kerberos configuration
+   * @param serviceComponentFilter a Map of service names to component names indicating the relevant
+   *                               set of services and components - if null, no filter is relevant;
+   *                               if empty, the filter indicates no relevant services or components
+   * @param identityFilter         a Collection of identity names indicating the relevant identities -
+   *                               if null, no filter is relevant; if empty, the filter indicates no
+   *                               relevant identities
+   * @param requestStageContainer  a RequestStageContainer to place generated stages, if needed -
+   *                               if null a new RequestStageContainer will be created.
    * @return the updated or a new RequestStageContainer containing the stages that need to be
    * executed to complete this task; or null if no stages need to be executed.
    * @throws AmbariException
@@ -258,8 +217,10 @@ public class KerberosHelper {
   @Transactional
   private RequestStageContainer handle(Cluster cluster,
                                        KerberosDescriptor kerberosDescriptor,
-                                       String realm, KDCType kdcType,
-                                       Map<String, String> kerberosEnvProperties, RequestStageContainer requestStageContainer,
+                                       KerberosDetails kerberosDetails,
+                                       Map<String, Collection<String>> serviceComponentFilter,
+                                       Collection<String> identityFilter,
+                                       RequestStageContainer requestStageContainer,
                                        Handler handler) throws AmbariException {
 
     Map<String, Service> services = cluster.getServices();
@@ -330,6 +291,7 @@ public class KerberosHelper {
                 // Add the current hostname under "host" and "hostname"
                 generalProperties.put("host", hostname);
                 generalProperties.put("hostname", hostname);
+                generalProperties.put("cluster_name", clusterName);
 
                 if (configurations.get("") == null) {
                   configurations.put("", generalProperties);
@@ -342,47 +304,55 @@ public class KerberosHelper {
                 // keytab files, and configurations need to be created or updated.
                 for (ServiceComponentHost sch : serviceComponentHosts) {
                   String serviceName = sch.getServiceName();
-                  KerberosServiceDescriptor serviceDescriptor = kerberosDescriptor.getService(serviceName);
-
-                  if (serviceDescriptor != null) {
-                    KerberosComponentDescriptor componentDescriptor = serviceDescriptor.getComponent(sch.getServiceComponentName());
-                    List<KerberosIdentityDescriptor> serviceIdentities = serviceDescriptor.getIdentities(true);
-
-                    if (componentDescriptor != null) {
-                      List<KerberosIdentityDescriptor> componentIdentities = componentDescriptor.getIdentities(true);
-                      int identitiesAdded = 0;
-
-                      // Test to see if this component should be process by querying the handler
-                      if (handler.shouldProcess(desiredSecurityState, sch)) {
-                        // Calculate the set of configurations to update and replace any variables
-                        // using the previously calculated Map of configurations for the host.
-                        mergeConfigurations(kerberosConfigurations,
-                            componentDescriptor.getConfigurations(true), configurations);
-
-                        // Lazily create the KerberosActionDataFileBuilder instance...
-                        if (kerberosActionDataFileBuilder == null) {
-                          kerberosActionDataFileBuilder = new KerberosActionDataFileBuilder(indexFile);
-                        }
 
-                        // Add service-level principals (and keytabs)
-                        identitiesAdded += addIdentities(kerberosActionDataFileBuilder,
-                            serviceIdentities, sch, configurations);
+                  // If there is no filter or the filter contains the current service name...
+                  if ((serviceComponentFilter == null) || serviceComponentFilter.containsKey(serviceName)) {
+                    Collection<String> componentFilter = (serviceComponentFilter == null) ? null : serviceComponentFilter.get(serviceName);
+                    KerberosServiceDescriptor serviceDescriptor = kerberosDescriptor.getService(serviceName);
+
+                    if (serviceDescriptor != null) {
+                      String componentName = sch.getServiceComponentName();
+
+                      // If there is no filter or the filter contains the current component name,
+                      // test to see if this component should be processed by querying the handler...
+                      if (((componentFilter == null) || componentFilter.contains(componentName)) && handler.shouldProcess(desiredSecurityState, sch)) {
+                        KerberosComponentDescriptor componentDescriptor = serviceDescriptor.getComponent(componentName);
+                        List<KerberosIdentityDescriptor> serviceIdentities = serviceDescriptor.getIdentities(true);
+
+                        if (componentDescriptor != null) {
+                          List<KerberosIdentityDescriptor> componentIdentities = componentDescriptor.getIdentities(true);
+                          int identitiesAdded = 0;
+
+                          // Calculate the set of configurations to update and replace any variables
+                          // using the previously calculated Map of configurations for the host.
+                          mergeConfigurations(kerberosConfigurations,
+                              componentDescriptor.getConfigurations(true), configurations);
+
+                          // Lazily create the KerberosActionDataFileBuilder instance...
+                          if (kerberosActionDataFileBuilder == null) {
+                            kerberosActionDataFileBuilder = new KerberosActionDataFileBuilder(indexFile);
+                          }
+
+                          // Add service-level principals (and keytabs)
+                          identitiesAdded += addIdentities(kerberosActionDataFileBuilder, serviceIdentities,
+                              identityFilter, hostname, serviceName, componentName, configurations);
+
+                          // Add component-level principals (and keytabs)
+                          identitiesAdded += addIdentities(kerberosActionDataFileBuilder, componentIdentities,
+                              identityFilter, hostname, serviceName, componentName, configurations);
 
-                        // Add component-level principals (and keytabs)
-                        identitiesAdded += addIdentities(kerberosActionDataFileBuilder,
-                            componentIdentities, sch, configurations);
+                          if (identitiesAdded > 0) {
+                            serviceComponentHostsToProcess.add(sch);
+                          }
 
-                        if (identitiesAdded > 0) {
-                          serviceComponentHostsToProcess.add(sch);
+                          // Add component-level principals to auth_to_local builder
+                          addIdentities(authToLocalBuilder, componentIdentities, identityFilter, configurations);
                         }
-                      }
 
-                      // Add component-level principals to auth_to_local builder
-                      addIdentities(authToLocalBuilder, componentIdentities, configurations);
+                        // Add service-level principals to auth_to_local builder
+                        addIdentities(authToLocalBuilder, serviceIdentities, identityFilter, configurations);
+                      }
                     }
-
-                    // Add service-level principals to auth_to_local builder
-                    addIdentities(authToLocalBuilder, serviceIdentities, configurations);
                   }
                 }
               }
@@ -420,7 +390,7 @@ public class KerberosHelper {
                       "}"
               );
             } else {
-              KerberosOperationHandler operationHandler = kerberosOperationHandlerFactory.getKerberosOperationHandler(kdcType);
+              KerberosOperationHandler operationHandler = kerberosOperationHandlerFactory.getKerberosOperationHandler(kerberosDetails.getKdcType());
 
               if (operationHandler == null) {
                 throw new AmbariException("Failed to get an appropriate Kerberos operation handler.");
@@ -429,7 +399,7 @@ public class KerberosHelper {
                 KerberosCredential kerberosCredentials = KerberosCredential.decrypt(credentials, key);
 
                 try {
-                  operationHandler.open(kerberosCredentials, realm, kerberosEnvProperties);
+                  operationHandler.open(kerberosCredentials, kerberosDetails.getDefaultRealm(), kerberosDetails.getKerberosEnvProperties());
                   if (!operationHandler.testAdministratorCredentials()) {
                     throw new IllegalArgumentException(
                         "Invalid KDC administrator credentials.\n" +
@@ -487,7 +457,7 @@ public class KerberosHelper {
             for (Map.Entry<String, String> entry : configuration.entrySet()) {
               if ("_AUTH_TO_LOCAL_RULES".equals(entry.getValue())) {
                 if (authToLocal == null) {
-                  authToLocal = authToLocalBuilder.generate(realm);
+                  authToLocal = authToLocalBuilder.generate(kerberosDetails.getDefaultRealm());
                 }
 
                 entry.setValue(authToLocal);
@@ -520,16 +490,15 @@ public class KerberosHelper {
         }
 
         // Use the handler implementation to setup the relevant stages.
-        int lastStageId = handler.createStages(cluster, hosts, kerberosConfigurations,
-            clusterHostInfoJson, hostParamsJson, event, roleCommandOrder, realm, kdcType,
-            dataDirectory, requestStageContainer, serviceComponentHostsToProcess);
+        handler.createStages(cluster, hosts, kerberosConfigurations, clusterHostInfoJson,
+            hostParamsJson, event, roleCommandOrder, kerberosDetails, dataDirectory,
+            requestStageContainer, serviceComponentHostsToProcess);
 
         // Add the cleanup stage...
-
         Map<String, String> commandParameters = new HashMap<String, String>();
         commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
 
-        Stage stage = createServerActionStage(++lastStageId,
+        Stage stage = createServerActionStage(requestStageContainer.getLastStageId(),
             cluster,
             requestStageContainer.getId(),
             "Finalize Operations",
@@ -549,14 +518,25 @@ public class KerberosHelper {
         for (ServiceComponentHost sch : serviceComponentHostsToProcess) {
           // Update the desired and current states for the ServiceComponentHost
           // using new state information from the the handler implementation
-          sch.setDesiredSecurityState(handler.getNewDesiredSCHSecurityState());
-          sch.setSecurityState(handler.getNewSCHSecurityState());
+          SecurityState newSecurityState;
+
+          newSecurityState = handler.getNewDesiredSCHSecurityState();
+          if (newSecurityState != null) {
+            sch.setDesiredSecurityState(newSecurityState);
+          }
+
+          newSecurityState = handler.getNewSCHSecurityState();
+          if (newSecurityState != null) {
+            sch.setSecurityState(newSecurityState);
+          }
         }
       }
 
       // If all goes well, set all services to _desire_ to be secured or unsecured, depending on handler
-      for (Service service : services.values()) {
-        service.setSecurityState(desiredSecurityState);
+      if (desiredSecurityState != null) {
+        for (Service service : services.values()) {
+          service.setSecurityState(desiredSecurityState);
+        }
       }
     }
 
@@ -564,6 +544,108 @@ public class KerberosHelper {
   }
 
   /**
+   * Gathers the Kerberos-related data from configurations and stores it in a new KerberosDetails
+   * instance.
+   *
+   * @param cluster the relevant Cluster
+   * @return a new KerberosDetails with the collected configuration data
+   * @throws AmbariException
+   */
+  private KerberosDetails getKerberosDetails(Cluster cluster) throws AmbariException {
+    KerberosDetails kerberosDetails = new KerberosDetails();
+
+    if (cluster == null) {
+      String message = "The cluster object is not available";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    Config configClusterEnv = cluster.getDesiredConfigByType("cluster-env");
+    if (configClusterEnv == null) {
+      String message = "The 'cluster-env' configuration is not available";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    Map<String, String> clusterEnvProperties = configClusterEnv.getProperties();
+    if (clusterEnvProperties == null) {
+      String message = "The 'cluster-env' configuration properties are not available";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    String securityEnabled = clusterEnvProperties.get("security_enabled");
+    if ((securityEnabled == null) || securityEnabled.isEmpty()) {
+      String message = "Missing 'securityEnabled' property of cluster-env, unable to determine the cluster's security state";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    if ("true".equalsIgnoreCase(securityEnabled)) {
+      kerberosDetails.setSecurityEnabled(true);
+    } else if ("false".equalsIgnoreCase(securityEnabled)) {
+      kerberosDetails.setSecurityEnabled(false);
+    } else {
+      String message = String.format("Invalid value for `security_enabled` property of cluster-env: %s", securityEnabled);
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    Config configKrb5Conf = cluster.getDesiredConfigByType("krb5-conf");
+    if (configKrb5Conf == null) {
+      String message = "The 'krb5-conf' configuration is not available";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    Map<String, String> krb5ConfProperties = configKrb5Conf.getProperties();
+    if (krb5ConfProperties == null) {
+      String message = "The 'krb5-conf' configuration properties are not available";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    Config configKerberosEnv = cluster.getDesiredConfigByType("kerberos-env");
+    if (configKerberosEnv == null) {
+      String message = "The 'kerberos-env' configuration is not available";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    Map<String, String> kerberosEnvProperties = configKerberosEnv.getProperties();
+    if (kerberosEnvProperties == null) {
+      String message = "The 'kerberos-env' configuration properties are not available";
+      LOG.error(message);
+      throw new AmbariException(message);
+    }
+
+    KDCType kdcType = null;
+    String kdcTypeProperty = kerberosEnvProperties.get("kdc_type");
+    if (kdcTypeProperty == null) {
+      // TODO: (rlevas) Only pull from kerberos-env, this is only for transitional purposes (AMBARI 9121)
+      kdcTypeProperty = krb5ConfProperties.get("kdc_type");
+    }
+    if (kdcTypeProperty != null) {
+      try {
+        kdcType = KDCType.translate(kdcTypeProperty);
+      } catch (IllegalArgumentException e) {
+        String message = String.format("Invalid 'kdc_type' value: %s", kdcTypeProperty);
+        LOG.error(message);
+        throw new AmbariException(message);
+      }
+    }
+
+    kerberosDetails.setDefaultRealm(krb5ConfProperties.get("realm"));
+
+    // Set the KDCType to MIT_KDC as a fallback.
+    kerberosDetails.setKdcType((kdcType == null) ? KDCType.MIT_KDC : kdcType);
+
+    kerberosDetails.setKerberosEnvProperties(kerberosEnvProperties);
+
+    return kerberosDetails;
+  }
+
+  /**
    * Creates a temporary directory within the system temporary directory
    * <p/>
    * The resulting directory is to be removed by the caller when desired.
@@ -684,63 +766,74 @@ public class KerberosHelper {
    *                                      records
    * @param identities                    a List of KerberosIdentityDescriptors to add to the data
    *                                      file
-   * @param sch                           the relevant ServiceComponentHost
+   * @param identityFilter                a Collection of identity names indicating the relevant identities -
+   *                                      if null, no filter is relevant; if empty, the filter indicates no
+   *                                      relevant identities
+   * @param hostname                      the relevant hostname
+   * @param serviceName                   the relevant service name
+   * @param componentName                 the relevant component name
    * @param configurations                a Map of configurations to use a replacements for variables
    *                                      in identity fields
    * @return an integer indicating the number of identities added to the data file
    * @throws java.io.IOException if an error occurs while writing a record to the data file
    */
   private int addIdentities(KerberosActionDataFileBuilder kerberosActionDataFileBuilder,
-                            List<KerberosIdentityDescriptor> identities, ServiceComponentHost sch,
-                            Map<String, Map<String, String>> configurations) throws IOException {
+                            Collection<KerberosIdentityDescriptor> identities,
+                            Collection<String> identityFilter, String hostname, String serviceName,
+                            String componentName, Map<String, Map<String, String>> configurations)
+      throws IOException {
     int identitiesAdded = 0;
 
     if (identities != null) {
       for (KerberosIdentityDescriptor identity : identities) {
-        KerberosPrincipalDescriptor principalDescriptor = identity.getPrincipalDescriptor();
-        String principal = null;
-        String principalType = null;
-        String principalConfiguration = null;
-
-        if (principalDescriptor != null) {
-          principal = KerberosDescriptor.replaceVariables(principalDescriptor.getValue(), configurations);
-          principalType = principalDescriptor.getType().name().toLowerCase();
-          principalConfiguration = KerberosDescriptor.replaceVariables(principalDescriptor.getConfiguration(), configurations);
-        }
-
-        if (principal != null) {
-          KerberosKeytabDescriptor keytabDescriptor = identity.getKeytabDescriptor();
-          String keytabFilePath = null;
-          String keytabFileOwnerName = null;
-          String keytabFileOwnerAccess = null;
-          String keytabFileGroupName = null;
-          String keytabFileGroupAccess = null;
-          String keytabFileConfiguration = null;
-
-          if (keytabDescriptor != null) {
-            keytabFilePath = KerberosDescriptor.replaceVariables(keytabDescriptor.getFile(), configurations);
-            keytabFileOwnerName = KerberosDescriptor.replaceVariables(keytabDescriptor.getOwnerName(), configurations);
-            keytabFileOwnerAccess = KerberosDescriptor.replaceVariables(keytabDescriptor.getOwnerAccess(), configurations);
-            keytabFileGroupName = KerberosDescriptor.replaceVariables(keytabDescriptor.getGroupName(), configurations);
-            keytabFileGroupAccess = KerberosDescriptor.replaceVariables(keytabDescriptor.getGroupAccess(), configurations);
-            keytabFileConfiguration = KerberosDescriptor.replaceVariables(keytabDescriptor.getConfiguration(), configurations);
+        // If there is no filter or the filter contains the current identity's name...
+        if ((identityFilter == null) || identityFilter.contains(identity.getName())) {
+          KerberosPrincipalDescriptor principalDescriptor = identity.getPrincipalDescriptor();
+          String principal = null;
+          String principalType = null;
+          String principalConfiguration = null;
+
+          if (principalDescriptor != null) {
+            principal = KerberosDescriptor.replaceVariables(principalDescriptor.getValue(), configurations);
+            principalType = principalDescriptor.getType().name().toLowerCase();
+            principalConfiguration = KerberosDescriptor.replaceVariables(principalDescriptor.getConfiguration(), configurations);
           }
 
-          // Append an entry to the action data file builder...
-          kerberosActionDataFileBuilder.addRecord(sch.getHostName(),
-              sch.getServiceName(),
-              sch.getServiceComponentName(),
-              principal,
-              principalType,
-              principalConfiguration,
-              keytabFilePath,
-              keytabFileOwnerName,
-              keytabFileOwnerAccess,
-              keytabFileGroupName,
-              keytabFileGroupAccess,
-              keytabFileConfiguration);
-
-          identitiesAdded++;
+          if (principal != null) {
+            KerberosKeytabDescriptor keytabDescriptor = identity.getKeytabDescriptor();
+            String keytabFilePath = null;
+            String keytabFileOwnerName = null;
+            String keytabFileOwnerAccess = null;
+            String keytabFileGroupName = null;
+            String keytabFileGroupAccess = null;
+            String keytabFileConfiguration = null;
+
+            if (keytabDescriptor != null) {
+              keytabFilePath = KerberosDescriptor.replaceVariables(keytabDescriptor.getFile(), configurations);
+              keytabFileOwnerName = KerberosDescriptor.replaceVariables(keytabDescriptor.getOwnerName(), configurations);
+              keytabFileOwnerAccess = KerberosDescriptor.replaceVariables(keytabDescriptor.getOwnerAccess(), configurations);
+              keytabFileGroupName = KerberosDescriptor.replaceVariables(keytabDescriptor.getGroupName(), configurations);
+              keytabFileGroupAccess = KerberosDescriptor.replaceVariables(keytabDescriptor.getGroupAccess(), configurations);
+              keytabFileConfiguration = KerberosDescriptor.replaceVariables(keytabDescriptor.getConfiguration(), configurations);
+            }
+
+            // Append an entry to the action data file builder...
+            kerberosActionDataFileBuilder.addRecord(
+                hostname,
+                serviceName,
+                componentName,
+                principal,
+                principalType,
+                principalConfiguration,
+                keytabFilePath,
+                keytabFileOwnerName,
+                keytabFileOwnerAccess,
+                keytabFileGroupName,
+                keytabFileGroupAccess,
+                keytabFileConfiguration);
+
+            identitiesAdded++;
+          }
         }
       }
     }
@@ -753,20 +846,26 @@ public class KerberosHelper {
    *
    * @param authToLocalBuilder the AuthToLocalBuilder to use to build the auth_to_local mapping
    * @param identities         a List of KerberosIdentityDescriptors to process
+   * @param identityFilter     a Collection of identity names indicating the relevant identities -
+   *                           if null, no filter is relevant; if empty, the filter indicates no
+   *                           relevant identities
    * @param configurations     a Map of configurations to use a replacements for variables
    *                           in identity fields
    * @throws org.apache.ambari.server.AmbariException
    */
   private void addIdentities(AuthToLocalBuilder authToLocalBuilder,
-                             List<KerberosIdentityDescriptor> identities,
+                             List<KerberosIdentityDescriptor> identities, Collection<String> identityFilter,
                              Map<String, Map<String, String>> configurations) throws AmbariException {
     if (identities != null) {
       for (KerberosIdentityDescriptor identity : identities) {
-        KerberosPrincipalDescriptor principalDescriptor = identity.getPrincipalDescriptor();
-        if (principalDescriptor != null) {
-          authToLocalBuilder.append(
-              KerberosDescriptor.replaceVariables(principalDescriptor.getValue(), configurations),
-              KerberosDescriptor.replaceVariables(principalDescriptor.getLocalUsername(), configurations));
+        // If there is no filter or the filter contains the current identity's name...
+        if ((identityFilter == null) || identityFilter.contains(identity.getName())) {
+          KerberosPrincipalDescriptor principalDescriptor = identity.getPrincipalDescriptor();
+          if (principalDescriptor != null) {
+            authToLocalBuilder.append(
+                KerberosDescriptor.replaceVariables(principalDescriptor.getValue(), configurations),
+                KerberosDescriptor.replaceVariables(principalDescriptor.getLocalUsername(), configurations));
+          }
         }
       }
     }
@@ -961,7 +1060,7 @@ public class KerberosHelper {
    * Handler is an interface that needs to be implemented by toggle handler classes to do the
    * "right" thing for the task at hand.
    */
-  private interface Handler {
+  private abstract class Handler {
     /**
      * Tests the Service and ServiceComponentHost to see if they are in the appropriate security
      * state to be processed for the relevant task.
@@ -972,29 +1071,32 @@ public class KerberosHelper {
      * state to be processed; otherwise false
      * @throws AmbariException of an error occurs while testing
      */
-    boolean shouldProcess(SecurityState desiredSecurityState, ServiceComponentHost sch) throws AmbariException;
+    abstract boolean shouldProcess(SecurityState desiredSecurityState, ServiceComponentHost sch) throws AmbariException;
 
     /**
      * Returns the new SecurityState to be set as the ServiceComponentHost's _desired_ SecurityState.
      *
-     * @return a SecurityState to be set as the ServiceComponentHost's _desired_ SecurityState
+     * @return a SecurityState to be set as the ServiceComponentHost's _desired_ SecurityState;
+     * or null if no state change is desired
      */
-    SecurityState getNewDesiredSCHSecurityState();
+    abstract SecurityState getNewDesiredSCHSecurityState();
 
     /**
      * Returns the new SecurityState to be set as the ServiceComponentHost's _current_ SecurityState.
      *
-     * @return a SecurityState to be set as the ServiceComponentHost's _current_ SecurityState
+     * @return a SecurityState to be set as the ServiceComponentHost's _current_ SecurityState;
+     * or null if no state change is desired
      */
-    SecurityState getNewSCHSecurityState();
+    abstract SecurityState getNewSCHSecurityState();
 
 
     /**
      * Returns the new SecurityState to be set as the Service's SecurityState.
      *
-     * @return a SecurityState to be set as the Service's SecurityState
+     * @return a SecurityState to be set as the Service's SecurityState;
+     * or null if no state change is desired
      */
-    SecurityState getNewServiceSecurityState();
+    abstract SecurityState getNewServiceSecurityState();
 
     /**
      * Creates the necessary stages to complete the relevant task and stores them in the supplied
@@ -1012,8 +1114,7 @@ public class KerberosHelper {
      * @param hostParams             JSON-encoded host parameters
      * @param event                  a ServiceComponentHostServerActionEvent to pass to any created tasks
      * @param roleCommandOrder       the RoleCommandOrder to use to generate the RoleGraph for any newly created Stages
-     * @param realm                  a String declaring the cluster's Kerberos realm
-     * @param kdcType                a KDCType declaring the type of the relevant KDC
+     * @param kerberosDetails        a KerberosDetails containing the information about the relevant Kerberos configuration
      * @param dataDirectory          a File pointing to the (temporary) data directory
      * @param requestStageContainer  a RequestStageContainer to store the new stages in, if null a
      *                               new RequestStageContainer will be created
@@ -1021,16 +1122,119 @@ public class KerberosHelper {
      * @return the last stage id generated, or -1 if no stages were created
      * @throws AmbariException if an error occurs while creating the relevant stages
      */
-    int createStages(Cluster cluster, Map<String, Host> hosts,
-                     Map<String, Map<String, String>> kerberosConfigurations,
-                     String clusterHostInfo, String hostParams,
-                     ServiceComponentHostServerActionEvent event,
-                     RoleCommandOrder roleCommandOrder,
-                     String realm, KDCType kdcType, File dataDirectory,
-                     RequestStageContainer requestStageContainer,
-                     List<ServiceComponentHost> serviceComponentHosts)
+    abstract long createStages(Cluster cluster, Map<String, Host> hosts,
+                               Map<String, Map<String, String>> kerberosConfigurations,
+                               String clusterHostInfo, String hostParams,
+                               ServiceComponentHostServerActionEvent event,
+                               RoleCommandOrder roleCommandOrder,
+                               KerberosDetails kerberosDetails, File dataDirectory,
+                               RequestStageContainer requestStageContainer,
+                               List<ServiceComponentHost> serviceComponentHosts)
         throws AmbariException;
 
+
+    public void addCreatePrincipalsStage(Cluster cluster, String clusterHostInfoJson,
+                                         String hostParamsJson, ServiceComponentHostServerActionEvent event,
+                                         Map<String, String> commandParameters,
+                                         RoleCommandOrder roleCommandOrder, RequestStageContainer requestStageContainer)
+        throws AmbariException {
+      Stage stage = createServerActionStage(requestStageContainer.getLastStageId(),
+          cluster,
+          requestStageContainer.getId(),
+          "Create Principals",
+          clusterHostInfoJson,
+          "{}",
+          hostParamsJson,
+          CreatePrincipalsServerAction.class,
+          event,
+          commandParameters,
+          "Create Principals",
+          1200);
+
+      RoleGraph roleGraph = new RoleGraph(roleCommandOrder);
+      roleGraph.build(stage);
+      requestStageContainer.addStages(roleGraph.getStages());
+    }
+
+    public void addCreateKeytabFilesStage(Cluster cluster, String clusterHostInfoJson,
+                                          String hostParamsJson, ServiceComponentHostServerActionEvent event,
+                                          Map<String, String> commandParameters,
+                                          RoleCommandOrder roleCommandOrder, RequestStageContainer requestStageContainer)
+        throws AmbariException {
+      Stage stage = createServerActionStage(requestStageContainer.getLastStageId(),
+          cluster,
+          requestStageContainer.getId(),
+          "Create Keytabs",
+          clusterHostInfoJson,
+          "{}",
+          hostParamsJson,
+          CreateKeytabFilesServerAction.class,
+          event,
+          commandParameters,
+          "Create Keytabs",
+          1200);
+
+      RoleGraph roleGraph = new RoleGraph(roleCommandOrder);
+      roleGraph.build(stage);
+      requestStageContainer.addStages(roleGraph.getStages());
+    }
+
+    public void addDistributeKeytabFilesStage(Cluster cluster, List<ServiceComponentHost> serviceComponentHosts,
+                                              String clusterHostInfoJson, String hostParamsJson,
+                                              Map<String, String> commandParameters,
+                                              RoleCommandOrder roleCommandOrder,
+                                              RequestStageContainer requestStageContainer)
+        throws AmbariException {
+      Stage stage = createNewStage(requestStageContainer.getLastStageId(),
+          cluster,
+          requestStageContainer.getId(),
+          "Distribute Keytabs",
+          clusterHostInfoJson,
+          StageUtils.getGson().toJson(commandParameters),
+          hostParamsJson);
+
+      if (!serviceComponentHosts.isEmpty()) {
+        List<String> hostsToUpdate = createUniqueHostList(serviceComponentHosts);
+        Map<String, String> requestParams = new HashMap<String, String>();
+        List<RequestResourceFilter> requestResourceFilters = new ArrayList<RequestResourceFilter>();
+        RequestResourceFilter reqResFilter = new RequestResourceFilter("KERBEROS", "KERBEROS_CLIENT", hostsToUpdate);
+        requestResourceFilters.add(reqResFilter);
+
+        ActionExecutionContext actionExecContext = new ActionExecutionContext(
+            cluster.getClusterName(),
+            "SET_KEYTAB",
+            requestResourceFilters,
+            requestParams);
+        customCommandExecutionHelper.addExecutionCommandsToStage(actionExecContext, stage, requestParams, false);
+      }
+
+      RoleGraph roleGraph = new RoleGraph(roleCommandOrder);
+      roleGraph.build(stage);
+      requestStageContainer.addStages(roleGraph.getStages());
+    }
+
+    public void addUpdateConfigurationsStage(Cluster cluster, String clusterHostInfoJson,
+                                             String hostParamsJson, ServiceComponentHostServerActionEvent event,
+                                             Map<String, String> commandParameters,
+                                             RoleCommandOrder roleCommandOrder, RequestStageContainer requestStageContainer)
+        throws AmbariException {
+      Stage stage = createServerActionStage(requestStageContainer.getLastStageId(),
+          cluster,
+          requestStageContainer.getId(),
+          "Update Configurations",
+          clusterHostInfoJson,
+          "{}",
+          hostParamsJson,
+          UpdateKerberosConfigsServerAction.class,
+          event,
+          commandParameters,
+          "Update Service Configurations",
+          1200);
+
+      RoleGraph roleGraph = new RoleGraph(roleCommandOrder);
+      roleGraph.build(stage);
+      requestStageContainer.addStages(roleGraph.getStages());
+    }
   }
 
   /**
@@ -1047,10 +1251,9 @@ public class KerberosHelper {
    * <li>create keytab files</li>
    * <li>distribute keytab files to the appropriate hosts</li>
    * <li>update relevant configurations</li>
-   * <li>restart services</li>
    * </ol>
    */
-  private class EnableKerberosHandler implements Handler {
+  private class EnableKerberosHandler extends Handler {
     @Override
     public boolean shouldProcess(SecurityState desiredSecurityState, ServiceComponentHost sch) throws AmbariException {
       return (desiredSecurityState == SecurityState.SECURED_KERBEROS) &&
@@ -1075,24 +1278,19 @@ public class KerberosHelper {
     }
 
     @Override
-    public int createStages(Cluster cluster, Map<String, Host> hosts,
-                            Map<String, Map<String, String>> kerberosConfigurations,
-                            String clusterHostInfoJson, String hostParamsJson,
-                            ServiceComponentHostServerActionEvent event,
-                            RoleCommandOrder roleCommandOrder, String realm, KDCType kdcType,
-                            File dataDirectory, RequestStageContainer requestStageContainer,
-                            List<ServiceComponentHost> serviceComponentHosts)
+    public long createStages(Cluster cluster, Map<String, Host> hosts,
+                             Map<String, Map<String, String>> kerberosConfigurations,
+                             String clusterHostInfoJson, String hostParamsJson,
+                             ServiceComponentHostServerActionEvent event,
+                             RoleCommandOrder roleCommandOrder, KerberosDetails kerberosDetails,
+                             File dataDirectory, RequestStageContainer requestStageContainer,
+                             List<ServiceComponentHost> serviceComponentHosts)
         throws AmbariException {
       // If there are principals, keytabs, and configurations to process, setup the following sages:
       //  1) generate principals
       //  2) generate keytab files
       //  3) distribute keytab files
       //  4) update configurations
-      //  4) restart services
-
-      RoleGraph roleGraph;
-      Stage stage;
-      int stageId = -1;
 
       // If a RequestStageContainer does not already exist, create a new one...
       if (requestStageContainer == null) {
@@ -1140,95 +1338,31 @@ public class KerberosHelper {
 
       Map<String, String> commandParameters = new HashMap<String, String>();
       commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
-      commandParameters.put(KerberosServerAction.DEFAULT_REALM, realm);
-      commandParameters.put(KerberosServerAction.KDC_TYPE, kdcType.name());
+      commandParameters.put(KerberosServerAction.DEFAULT_REALM, kerberosDetails.getDefaultRealm());
+      commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
       commandParameters.put(KerberosServerAction.ADMINISTRATOR_CREDENTIAL, getEncryptedAdministratorCredentials(cluster));
 
       // *****************************************************************
       // Create stage to create principals
-      stage = createServerActionStage(++stageId,
-          cluster,
-          requestStageContainer.getId(),
-          "Create Principals",
-          clusterHostInfoJson,
-          "{}",
-          hostParamsJson,
-          CreatePrincipalsServerAction.class,
-          event,
-          commandParameters,
-          "Create Principals",
-          1200);
-
-      roleGraph = new RoleGraph(roleCommandOrder);
-      roleGraph.build(stage);
-      requestStageContainer.addStages(roleGraph.getStages());
+      addCreatePrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
+          roleCommandOrder, requestStageContainer);
 
       // *****************************************************************
       // Create stage to generate keytabs
-      stage = createServerActionStage(++stageId,
-          cluster,
-          requestStageContainer.getId(),
-          "Create Keytabs",
-          clusterHostInfoJson,
-          "{}",
-          hostParamsJson,
-          CreateKeytabFilesServerAction.class,
-          event,
-          commandParameters,
-          "Create Keytabs",
-          1200);
-
-      roleGraph = new RoleGraph(roleCommandOrder);
-      roleGraph.build(stage);
-      requestStageContainer.addStages(roleGraph.getStages());
+      addCreateKeytabFilesStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
+          roleCommandOrder, requestStageContainer);
 
+      // *****************************************************************
       // Create stage to distribute keytabs
-      stage = createNewStage(++stageId,
-          cluster,
-          requestStageContainer.getId(),
-          "Distribute Keytabs",
-          clusterHostInfoJson,
-          StageUtils.getGson().toJson(commandParameters),
-          hostParamsJson);
-
-      if (!serviceComponentHosts.isEmpty()) {
-        List<String> hostsToUpdate = createUniqueHostList(serviceComponentHosts);
-        Map<String, String> requestParams = new HashMap<String, String>();
-        List<RequestResourceFilter> requestResourceFilters = new ArrayList<RequestResourceFilter>();
-        RequestResourceFilter reqResFilter = new RequestResourceFilter("KERBEROS", "KERBEROS_CLIENT", hostsToUpdate);
-        requestResourceFilters.add(reqResFilter);
-
-        ActionExecutionContext actionExecContext = new ActionExecutionContext(
-            cluster.getClusterName(),
-            "SET_KEYTAB",
-            requestResourceFilters,
-            requestParams);
-        customCommandExecutionHelper.addExecutionCommandsToStage(actionExecContext, stage, requestParams, false);
-      }
-
-      roleGraph = new RoleGraph(roleCommandOrder);
-      roleGraph.build(stage);
-      requestStageContainer.addStages(roleGraph.getStages());
+      addDistributeKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson, hostParamsJson,
+          commandParameters, roleCommandOrder, requestStageContainer);
 
+      // *****************************************************************
       // Create stage to update configurations of services
-      stage = createServerActionStage(++stageId,
-          cluster,
-          requestStageContainer.getId(),
-          "Update Service Configurations",
-          clusterHostInfoJson,
-          "{}",
-          hostParamsJson,
-          UpdateKerberosConfigsServerAction.class,
-          event,
-          commandParameters,
-          "Update Service Configurations",
-          1200);
-
-      roleGraph = new RoleGraph(roleCommandOrder);
-      roleGraph.build(stage);
-      requestStageContainer.addStages(roleGraph.getStages());
+      addUpdateConfigurationsStage(cluster, clusterHostInfoJson, hostParamsJson, event, commandParameters,
+          roleCommandOrder, requestStageContainer);
 
-      return stageId;
+      return requestStageContainer.getLastStageId();
     }
 
   }
@@ -1249,7 +1383,7 @@ public class KerberosHelper {
    * <li>restart services</li>
    * </ol>
    */
-  private class DisableKerberosHandler implements Handler {
+  private class DisableKerberosHandler extends Handler {
     @Override
     public boolean shouldProcess(SecurityState desiredSecurityState, ServiceComponentHost sch) throws AmbariException {
       return (desiredSecurityState == SecurityState.UNSECURED) &&
@@ -1274,13 +1408,13 @@ public class KerberosHelper {
     }
 
     @Override
-    public int createStages(Cluster cluster, Map<String, Host> hosts,
-                            Map<String, Map<String, String>> kerberosConfigurations,
-                            String clusterHostInfoJson, String hostParamsJson,
-                            ServiceComponentHostServerActionEvent event,
-                            RoleCommandOrder roleCommandOrder, String realm, KDCType kdcType,
-                            File dataDirectory, RequestStageContainer requestStageContainer,
-                            List<ServiceComponentHost> serviceComponentHosts) {
+    public long createStages(Cluster cluster, Map<String, Host> hosts,
+                             Map<String, Map<String, String>> kerberosConfigurations,
+                             String clusterHostInfoJson, String hostParamsJson,
+                             ServiceComponentHostServerActionEvent event,
+                             RoleCommandOrder roleCommandOrder, KerberosDetails kerberosDetails,
+                             File dataDirectory, RequestStageContainer requestStageContainer,
+                             List<ServiceComponentHost> serviceComponentHosts) {
       // TODO (rlevas): If there are principals, keytabs, and configurations to process, setup the following sages:
       //  1) remove principals
       //  2) remove keytab files
@@ -1289,4 +1423,130 @@ public class KerberosHelper {
       return -1;
     }
   }
+
+  /**
+   * CreatePrincipalsAndKeytabsHandler is a Handler implementation used to create
+   * principals and keytabs and distribute them throughout the cluster.  This is similar to enabling
+   * Kerberos however no states or configurations will be updated.
+   * <p/>
+   * To complete the process, this implementation creates the following stages:
+   * <ol>
+   * <li>create principals</li>
+   * <li>create keytab files</li>
+   * <li>distribute keytab files to the appropriate hosts</li>
+   * </ol>
+   */
+  private class CreatePrincipalsAndKeytabsHandler extends Handler {
+    @Override
+    public boolean shouldProcess(SecurityState desiredSecurityState, ServiceComponentHost sch) throws AmbariException {
+      return (maintenanceStateHelper.getEffectiveState(sch) == MaintenanceState.OFF);
+    }
+
+    @Override
+    public SecurityState getNewDesiredSCHSecurityState() {
+      return null;
+    }
+
+    @Override
+    public SecurityState getNewSCHSecurityState() {
+      return null;
+    }
+
+    @Override
+    public SecurityState getNewServiceSecurityState() {
+      return null;
+    }
+
+    @Override
+    public long createStages(Cluster cluster, Map<String, Host> hosts,
+                             Map<String, Map<String, String>> kerberosConfigurations,
+                             String clusterHostInfoJson, String hostParamsJson,
+                             ServiceComponentHostServerActionEvent event,
+                             RoleCommandOrder roleCommandOrder, KerberosDetails kerberosDetails,
+                             File dataDirectory, RequestStageContainer requestStageContainer,
+                             List<ServiceComponentHost> serviceComponentHosts)
+        throws AmbariException {
+      // If there are principals and keytabs to process, set up the following stages:
+      //  1) generate principals
+      //  2) generate keytab files
+      //  3) distribute keytab files
+
+      // If a RequestStageContainer does not already exist, create a new one...
+      if (requestStageContainer == null) {
+        requestStageContainer = new RequestStageContainer(
+            actionManager.getNextRequestId(),
+            null,
+            requestFactory,
+            actionManager);
+      }
+
+      Map<String, String> commandParameters = new HashMap<String, String>();
+      commandParameters.put(KerberosServerAction.DATA_DIRECTORY, dataDirectory.getAbsolutePath());
+      commandParameters.put(KerberosServerAction.DEFAULT_REALM, kerberosDetails.getDefaultRealm());
+      commandParameters.put(KerberosServerAction.KDC_TYPE, kerberosDetails.getKdcType().name());
+      commandParameters.put(KerberosServerAction.ADMINISTRATOR_CREDENTIAL, getEncryptedAdministratorCredentials(cluster));
+
+      // *****************************************************************
+      // Create stage to create principals
+      super.addCreatePrincipalsStage(cluster, clusterHostInfoJson, hostParamsJson, event,
+          commandParameters, roleCommandOrder, requestStageContainer);
+
+      // *****************************************************************
+      // Create stage to generate keytabs
+      addCreateKeytabFilesStage(cluster, clusterHostInfoJson, hostParamsJson, event,
+          commandParameters, roleCommandOrder, requestStageContainer);
+
+      // Create stage to distribute keytabs
+      addDistributeKeytabFilesStage(cluster, serviceComponentHosts, clusterHostInfoJson,
+          hostParamsJson, commandParameters, roleCommandOrder, requestStageContainer);
+
+      return requestStageContainer.getLastStageId();
+    }
+  }
+
+
+  /**
+   * KerberosDetails is a helper class to hold the details of the relevant Kerberos-specific
+   * configurations so they may be passed around more easily.
+   */
+  private static class KerberosDetails {
+    private boolean securityEnabled;
+    private String defaultRealm;
+    private KDCType kdcType;
+    private Map<String, String> kerberosEnvProperties;
+
+
+    public void setSecurityEnabled(boolean securityEnabled) {
+      this.securityEnabled = securityEnabled;
+    }
+
+    public boolean isSecurityEnabled() {
+      return securityEnabled;
+    }
+
+    public void setDefaultRealm(String defaultRealm) {
+      this.defaultRealm = defaultRealm;
+    }
+
+    public String getDefaultRealm() {
+      return defaultRealm;
+    }
+
+    public void setKdcType(KDCType kdcType) {
+      this.kdcType = kdcType;
+    }
+
+    public KDCType getKdcType() {
+      return kdcType;
+    }
+
+    public void setKerberosEnvProperties(Map<String, String> kerberosEnvProperties) {
+      this.kerberosEnvProperties = kerberosEnvProperties;
+    }
+
+    public Map<String, String> getKerberosEnvProperties() {
+      return kerberosEnvProperties;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/339e8a76/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestStageContainer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestStageContainer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestStageContainer.java
index 12b7f71..49ba946 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestStageContainer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestStageContainer.java
@@ -25,6 +25,7 @@ import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.RequestStatusResponse;
 import org.apache.ambari.server.controller.ShortTaskStatus;
 import org.apache.ambari.server.state.State;
@@ -62,6 +63,8 @@ public class RequestStageContainer {
 
   private String requestContext = null;
 
+  private ExecuteActionRequest actionRequest = null;
+
   /**
    * Logger
    */
@@ -77,10 +80,25 @@ public class RequestStageContainer {
    * @param manager  action manager
    */
   public RequestStageContainer(Long id, List<Stage> stages, RequestFactory factory, ActionManager manager) {
+    this(id, stages, factory, manager, null); // delegate to the five-argument constructor with no action request
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param id            request id
+   * @param stages        stages
+   * @param factory       request factory
+   * @param manager       action manager
+   * @param actionRequest action request
+   */
+  public RequestStageContainer(Long id, List<Stage> stages, RequestFactory factory, ActionManager manager,
+                               ExecuteActionRequest actionRequest) {
     this.id = id;
     this.stages = stages == null ? new ArrayList<Stage>() : stages; // a null list is replaced by a new empty list; a non-null list is kept by reference
     this.requestFactory = factory;
     this.actionManager = manager;
+    this.actionRequest = actionRequest; // may be null, e.g. when reached through the four-argument constructor
   }
 
   /**
@@ -183,7 +201,9 @@ public class RequestStageContainer {
    */
   public void persist() throws AmbariException {
     if (!stages.isEmpty()) {
-      Request request = requestFactory.createNewFromStages(stages);
+      Request request = (null == actionRequest)
+          ? requestFactory.createNewFromStages(stages)
+          : requestFactory.createNewFromStages(stages, actionRequest);
 
       if (null != requestContext) {
         request.setRequestContext(requestContext);
@@ -193,7 +213,7 @@ public class RequestStageContainer {
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("Triggering Action Manager, request=%s", request));
         }
-        actionManager.sendActions(request, null);
+        actionManager.sendActions(request, actionRequest);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/339e8a76/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/kerberos.json b/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/kerberos.json
new file mode 100644
index 0000000..6ab7610
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/KERBEROS/1.10.3-10/kerberos.json
@@ -0,0 +1,17 @@
+{
+  "services": [
+    {
+      "name": "KERBEROS",
+      "identities": [
+        {
+          "name": "/smokeuser"
+        }
+      ],
+      "components": [
+        {
+          "name": "KERBEROS_CLIENT"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/339e8a76/ambari-server/src/main/resources/stacks/HDP/2.2/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/kerberos.json b/ambari-server/src/main/resources/stacks/HDP/2.2/kerberos.json
index de5f1a9..46aff38 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/kerberos.json
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/kerberos.json
@@ -1,6 +1,6 @@
 {
   "properties": {
-    "realm": "${cluster-env/kerberos_domain}",
+    "realm": "${krb5-conf/realm}",
     "keytab_dir": "/etc/security/keytabs"
   },
   "identities": [

http://git-wip-us.apache.org/repos/asf/ambari/blob/339e8a76/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
index 5e933d2..6f2699b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperTest.java
@@ -32,6 +32,7 @@ import junit.framework.Assert;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
+import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -61,7 +62,8 @@ import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
 
 @RunWith(MockitoJUnitRunner.class)
-public class AmbariCustomCommandExecutionHelperTest {
+public class
+    AmbariCustomCommandExecutionHelperTest {
   private Injector injector;
   private AmbariManagementController controller;
   private AmbariMetaInfo ambariMetaInfo;
@@ -70,7 +72,7 @@ public class AmbariCustomCommandExecutionHelperTest {
   
   private static final String REQUEST_CONTEXT_PROPERTY = "context";
   
-  @Captor ArgumentCaptor<List<Stage>> stagesCaptor;
+  @Captor ArgumentCaptor<Request> requestCapture;
   @Mock ActionManager am;
   
   @Before
@@ -123,12 +125,13 @@ public class AmbariCustomCommandExecutionHelperTest {
       
       controller.createAction(actionRequest, requestProperties);
       
-      Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class));
-      
-      
-      List<Stage> stages = stagesCaptor.getValue();
-      Assert.assertEquals(1, stages.size());
-      Stage stage = stages.get(0);
+      Mockito.verify(am, Mockito.times(1)).sendActions(requestCapture.capture(), any(ExecuteActionRequest.class));
+
+      Request request = requestCapture.getValue();
+      Assert.assertNotNull(request);
+      Assert.assertNotNull(request.getStages());
+      Assert.assertEquals(1, request.getStages().size());
+      Stage stage = request.getStages().iterator().next();
       
       Assert.assertEquals(1, stage.getHosts().size());
       
@@ -175,12 +178,14 @@ public class AmbariCustomCommandExecutionHelperTest {
 
       //clusters.getHost("c6402").setState(HostState.HEARTBEAT_LOST);
 
-      Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class));
+      Mockito.verify(am, Mockito.times(1)).sendActions(requestCapture.capture(), any(ExecuteActionRequest.class));
 
-      List<Stage> stages = stagesCaptor.getValue();
-      Assert.assertEquals(1, stages.size());
+      Request request = requestCapture.getValue();
+      Assert.assertNotNull(request);
+      Assert.assertNotNull(request.getStages());
+      Assert.assertEquals(1, request.getStages().size());
+      Stage stage = request.getStages().iterator().next();
 
-      Stage stage = stages.get(0);
        // Check if was generated command, one for each host
       Assert.assertEquals(2, stage.getHostRoleCommands().size());
     }catch (Exception e) {
@@ -217,12 +222,14 @@ public class AmbariCustomCommandExecutionHelperTest {
 
       controller.createAction(actionRequest, requestProperties);
 
-      Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class));
+      Mockito.verify(am, Mockito.times(1)).sendActions(requestCapture.capture(), any(ExecuteActionRequest.class));
 
-      List<Stage> stages = stagesCaptor.getValue();
-      Assert.assertEquals(1, stages.size());
+      Request request = requestCapture.getValue();
+      Assert.assertNotNull(request);
+      Assert.assertNotNull(request.getStages());
+      Assert.assertEquals(1, request.getStages().size());
+      Stage stage = request.getStages().iterator().next();
 
-      Stage stage = stages.get(0);
       // Check if was generated command for one health host
       Assert.assertEquals(1, stage.getHostRoleCommands().size());
     }catch (Exception e) {
@@ -260,12 +267,14 @@ public class AmbariCustomCommandExecutionHelperTest {
 
       controller.createAction(actionRequest, requestProperties);
 
-      Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class));
+      Mockito.verify(am, Mockito.times(1)).sendActions(requestCapture.capture(), any(ExecuteActionRequest.class));
 
-      List<Stage> stages = stagesCaptor.getValue();
-      Assert.assertEquals(1, stages.size());
+      Request request = requestCapture.getValue();
+      Assert.assertNotNull(request);
+      Assert.assertNotNull(request.getStages());
+      Assert.assertEquals(1, request.getStages().size());
+      Stage stage = request.getStages().iterator().next();
 
-      Stage stage = stages.get(0);
       // Check if was generated command for one health host
       Assert.assertEquals(1, stage.getHostRoleCommands().size());
     }catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/339e8a76/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
index 2b00f40..a0e358a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
@@ -31,6 +31,7 @@ import junit.framework.Assert;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
+import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.agent.ExecutionCommand;
@@ -70,7 +71,7 @@ public class BackgroundCustomCommandExecutionTest {
   
   private static final String REQUEST_CONTEXT_PROPERTY = "context";
   
-  @Captor ArgumentCaptor<List<Stage>> stagesCaptor;
+  @Captor ArgumentCaptor<Request> requestCapture;
   @Mock ActionManager am;
   
   @Before
@@ -123,12 +124,13 @@ public class BackgroundCustomCommandExecutionTest {
       
       controller.createAction(actionRequest, requestProperties);
       
-      Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class));
-      
-      
-      List<Stage> stages = stagesCaptor.getValue();
-      Assert.assertEquals(1, stages.size());
-      Stage stage = stages.get(0);
+      Mockito.verify(am, Mockito.times(1)).sendActions(requestCapture.capture(), any(ExecuteActionRequest.class));
+
+      Request request = requestCapture.getValue();
+      Assert.assertNotNull(request);
+      Assert.assertNotNull(request.getStages());
+      Assert.assertEquals(1, request.getStages().size());
+      Stage stage = request.getStages().iterator().next();
       
       System.out.println(stage);