You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2018/12/10 21:51:55 UTC

[ambari] branch trunk updated: AMBARI-25007. Process Kerberos descriptor for Add Service request (#2705)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aedbcb7  AMBARI-25007. Process Kerberos descriptor for Add Service request (#2705)
aedbcb7 is described below

commit aedbcb7100588d7a78ae04241a0b61ee64f6fe3b
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Mon Dec 10 22:51:50 2018 +0100

    AMBARI-25007. Process Kerberos descriptor for Add Service request (#2705)
---
 .../api/query/render/ClusterBlueprintRenderer.java |   9 +-
 .../internal/ArtifactResourceProvider.java         |   7 +
 .../server/state/kerberos/KerberosDescriptor.java  |   9 +-
 .../state/kerberos/KerberosServiceDescriptor.java  |   3 +-
 .../ambari/server/topology/BlueprintImpl.java      |   3 +-
 .../server/topology/SecurityConfiguration.java     |  68 +++++--
 .../topology/SecurityConfigurationFactory.java     |  26 +--
 .../ambari/server/topology/TopologyManager.java    |  21 +--
 .../server/topology/addservice/AddServiceInfo.java |  20 +-
 .../addservice/AddServiceOrchestrator.java         |  53 +++++-
 .../topology/addservice/RequestValidator.java      | 132 ++++++++-----
 .../addservice/ResourceProviderAdapter.java        | 130 ++++++++++---
 .../ambari/server/utils/LoggingPreconditions.java  |  70 +++++++
 .../server/controller/AddServiceRequestTest.java   |  17 +-
 .../internal/BlueprintResourceProviderTest.java    |   3 +-
 .../internal/ClusterResourceProviderTest.java      |   7 +-
 .../ambari/server/topology/BlueprintImplTest.java  |   2 +-
 .../topology/addservice/RequestValidatorTest.java  | 207 +++++++++++++++++++--
 .../addservice/StackAdvisorAdapterTest.java        |   2 +-
 .../test/resources/add_service_api/request1.json   |   3 +
 20 files changed, 631 insertions(+), 161 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/query/render/ClusterBlueprintRenderer.java b/ambari-server/src/main/java/org/apache/ambari/server/api/query/render/ClusterBlueprintRenderer.java
index 9139cec..7a31b01 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/query/render/ClusterBlueprintRenderer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/query/render/ClusterBlueprintRenderer.java
@@ -55,7 +55,6 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
-import org.apache.ambari.server.controller.utilities.PredicateBuilder;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.topology.AmbariContext;
@@ -68,6 +67,7 @@ import org.apache.ambari.server.topology.HostGroupInfo;
 import org.apache.ambari.server.topology.InvalidTopologyException;
 import org.apache.ambari.server.topology.InvalidTopologyTemplateException;
 import org.apache.ambari.server.topology.SecurityConfigurationFactory;
+import org.apache.ambari.server.topology.addservice.ResourceProviderAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -331,11 +331,8 @@ public class ClusterBlueprintRenderer extends BaseRenderer implements Renderer {
     return blueprintSetting;
   }
 
-  private Map<String, Object> getKerberosDescriptor(ClusterController clusterController, String clusterName) throws AmbariException {
-    PredicateBuilder pb = new PredicateBuilder();
-    Predicate predicate = pb.begin().property("Artifacts/cluster_name").equals(clusterName).and().
-      property(ArtifactResourceProvider.ARTIFACT_NAME_PROPERTY).equals("kerberos_descriptor").
-      end().toPredicate();
+  private static Map<String, Object> getKerberosDescriptor(ClusterController clusterController, String clusterName) throws AmbariException {
+    Predicate predicate = ResourceProviderAdapter.predicateForKerberosDescriptorArtifact(clusterName);
 
     ResourceProvider artifactProvider =
        clusterController.ensureResourceProvider(Resource.Type.Artifact);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ArtifactResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ArtifactResourceProvider.java
index 2aca1cd..0c71b13 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ArtifactResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ArtifactResourceProvider.java
@@ -75,6 +75,9 @@ public class ArtifactResourceProvider extends AbstractResourceProvider {
   public static final String CLUSTER_NAME_PROPERTY = RESPONSE_KEY + PropertyHelper.EXTERNAL_PATH_SEP + CLUSTER_NAME;
   public static final String SERVICE_NAME_PROPERTY = RESPONSE_KEY + PropertyHelper.EXTERNAL_PATH_SEP + SERVICE_NAME;
 
+  // artifact names
+  public static final String KERBEROS_DESCRIPTOR = "kerberos_descriptor";
+
   /**
    * primary key fields
    */
@@ -547,6 +550,10 @@ public class ArtifactResourceProvider extends AbstractResourceProvider {
         requestProps.iterator().next().get(ARTIFACT_NAME_PROPERTY) != null;
   }
 
+  public static String toArtifactDataJson(Map<?,?> properties) {
+    return String.format("{ \"%s\": %s }", ARTIFACT_DATA_PROPERTY, jsonSerializer.toJson(properties));
+  }
+
   //todo: when static registration is changed to external registration, this interface
   //todo: should be extracted as a first class interface.
   /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosDescriptor.java b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosDescriptor.java
index b2b9a60..d03fa93 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosDescriptor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosDescriptor.java
@@ -90,8 +90,8 @@ import org.apache.commons.lang.StringUtils;
  */
 public class KerberosDescriptor extends AbstractKerberosDescriptorContainer {
 
-  static final String KEY_PROPERTIES = "properties";
-  static final String KEY_SERVICES = Type.SERVICE.getDescriptorPluralName();
+  public static final String KEY_PROPERTIES = "properties";
+  public static final String KEY_SERVICES = Type.SERVICE.getDescriptorPluralName();
 
   /**
    * A Map of the "global" properties contained within this KerberosDescriptor
@@ -271,8 +271,9 @@ public class KerberosDescriptor extends AbstractKerberosDescriptorContainer {
    * Properties will be updated if the relevant updated values are not null.
    *
    * @param updates the KerberosDescriptor containing the updated values
+   * @return this {@code KerberosDescriptor} for convenience
    */
-  public void update(KerberosDescriptor updates) {
+  public KerberosDescriptor update(KerberosDescriptor updates) {
     if (updates != null) {
       Map<String, KerberosServiceDescriptor> updatedServiceDescriptors = updates.getServices();
       if (updatedServiceDescriptors != null) {
@@ -290,6 +291,8 @@ public class KerberosDescriptor extends AbstractKerberosDescriptorContainer {
     }
 
     super.update(updates);
+
+    return this;
   }
 
   /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosServiceDescriptor.java b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosServiceDescriptor.java
index 51b7cd0..a993cff 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosServiceDescriptor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/KerberosServiceDescriptor.java
@@ -86,6 +86,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
  */
 public class KerberosServiceDescriptor extends AbstractKerberosDescriptorContainer {
 
+  public static final String KEY_NAME = "name";
   static final String KEY_PRECONFIGURE = "preconfigure";
   static final String KEY_COMPONENTS = Type.COMPONENT.getDescriptorPluralName();
 
@@ -111,7 +112,7 @@ public class KerberosServiceDescriptor extends AbstractKerberosDescriptorContain
   KerberosServiceDescriptor(Map<?, ?> data) {
     // The name for this KerberosServiceDescriptor is stored in the "name" entry in the map
     // This is not automatically set by the super classes.
-    this(getStringValue(data, "name"), data);
+    this(getStringValue(data, KEY_NAME), data);
   }
 
   @Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintImpl.java
index ff82961..b18e9a6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintImpl.java
@@ -64,8 +64,7 @@ public class BlueprintImpl implements Blueprint {
   public BlueprintImpl(BlueprintEntity entity) throws NoSuchStackException {
     this.name = entity.getBlueprintName();
     if (entity.getSecurityType() != null) {
-      this.security = new SecurityConfiguration(entity.getSecurityType(), entity.getSecurityDescriptorReference(),
-        null);
+      this.security = SecurityConfiguration.of(entity.getSecurityType(), entity.getSecurityDescriptorReference(), null);
     }
 
     parseStack(entity.getStack());
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfiguration.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfiguration.java
index 47cd117..7eca293 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfiguration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfiguration.java
@@ -22,13 +22,18 @@ import static org.apache.ambari.server.topology.SecurityConfigurationFactory.KER
 import static org.apache.ambari.server.topology.SecurityConfigurationFactory.KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID;
 import static org.apache.ambari.server.topology.SecurityConfigurationFactory.TYPE_PROPERTY_ID;
 
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
+import org.apache.ambari.annotations.ApiIgnore;
 import org.apache.ambari.server.state.SecurityType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -42,36 +47,61 @@ import io.swagger.annotations.ApiModelProperty;
 @JsonInclude(JsonInclude.Include.NON_EMPTY)
 public class SecurityConfiguration {
 
-  public static final SecurityConfiguration NONE = new SecurityConfiguration(SecurityType.NONE);
+  public static final SecurityConfiguration NONE = new SecurityConfiguration(SecurityType.NONE, null, null);
+  public static final SecurityConfiguration KERBEROS = new SecurityConfiguration(SecurityType.KERBEROS, null, null);
 
   /**
    * Security Type
    */
-  private SecurityType type;
+  private final SecurityType type;
 
   /**
    * Holds a reference to a kerberos_descriptor resource.
    */
-  private String descriptorReference;
+  private final String descriptorReference;
 
   /**
-   * Content of a kerberos_descriptor as String.
+   * Content of a kerberos_descriptor as Map.
    */
-  private String descriptor;
+  private final Map<?,?> descriptor;
 
-  public SecurityConfiguration(SecurityType type) {
-    this.type = type;
+  public static SecurityConfiguration of(SecurityType type, String reference, Map<?,?> descriptorMap) {
+    if (type == SecurityType.NONE) {
+      return NONE;
+    }
+    if (type != SecurityType.KERBEROS) {
+      throw new IllegalArgumentException("Unexpected SecurityType: " + type);
+    }
+    if (reference == null && descriptorMap == null) {
+      return KERBEROS;
+    }
+    if (reference != null && descriptorMap != null) {
+      throw new IllegalArgumentException("Cannot set both descriptor and reference");
+    }
+    return reference != null ? withReference(reference) : withDescriptor(descriptorMap);
+  }
+
+  public static SecurityConfiguration withReference(String reference) {
+    return new SecurityConfiguration(SecurityType.KERBEROS, reference, null);
+  }
+
+  public static SecurityConfiguration withDescriptor(Map<?, ?> descriptorMap) {
+    return new SecurityConfiguration(SecurityType.KERBEROS, null, descriptorMap);
+  }
+
+  public static SecurityConfiguration forTest(SecurityType type, String reference, Map<?, ?> descriptorMap) {
+    return new SecurityConfiguration(type, reference, descriptorMap);
   }
 
   @JsonCreator
-  public SecurityConfiguration(
+  SecurityConfiguration(
     @JsonProperty(TYPE_PROPERTY_ID) SecurityType type,
     @JsonProperty(KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID) String descriptorReference,
-    @JsonProperty(KERBEROS_DESCRIPTOR_PROPERTY_ID) String descriptor
+    @JsonProperty(KERBEROS_DESCRIPTOR_PROPERTY_ID) Map<?, ?> descriptorMap
   ) {
     this.type = type;
     this.descriptorReference = descriptorReference;
-    this.descriptor = descriptor;
+    this.descriptor = descriptorMap != null ? ImmutableMap.copyOf(descriptorMap) : null;
   }
 
   @JsonProperty(TYPE_PROPERTY_ID)
@@ -80,14 +110,20 @@ public class SecurityConfiguration {
     return type;
   }
 
-  @JsonProperty(KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID)
-  @ApiModelProperty(name = KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID)
-  public String getDescriptor() {
-    return descriptor;
-  }
-
   @JsonProperty(KERBEROS_DESCRIPTOR_PROPERTY_ID)
   @ApiModelProperty(name = KERBEROS_DESCRIPTOR_PROPERTY_ID)
+  public Map<?,?> _getDescriptor() {
+    return getDescriptor().isPresent() ? descriptor : ImmutableMap.of();
+  }
+
+  @ApiIgnore
+  @JsonIgnore
+  public Optional<Map<?,?>> getDescriptor() {
+    return Optional.ofNullable(descriptor);
+  }
+
+  @JsonProperty(KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID)
+  @ApiModelProperty(name = KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID)
   public String getDescriptorReference() {
     return descriptorReference;
   }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfigurationFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfigurationFactory.java
index 9eb2d37..62e3244 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfigurationFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/SecurityConfigurationFactory.java
@@ -64,19 +64,18 @@ public class SecurityConfigurationFactory {
    *
    * @param properties Security properties from Json parsed into a Map
    * @param persistEmbeddedDescriptor whether to save embedded descriptor or not
-   * @return
    */
   public SecurityConfiguration createSecurityConfigurationFromRequest(Map<String, Object> properties, boolean
     persistEmbeddedDescriptor) {
 
-    SecurityConfiguration securityConfiguration = null;
+    SecurityConfiguration securityConfiguration;
 
     LOGGER.debug("Creating security configuration from properties: {}", properties);
-    Map<String, Object> securityProperties = (Map<String, Object>) properties.get(SECURITY_PROPERTY_ID);
+    Map<?, ?> securityProperties = (Map<?, ?>) properties.get(SECURITY_PROPERTY_ID);
 
     if (securityProperties == null) {
       LOGGER.debug("No security information properties provided, returning null");
-      return securityConfiguration;
+      return null;
     }
 
     String securityTypeString = Strings.emptyToNull((String) securityProperties.get(TYPE_PROPERTY_ID));
@@ -105,31 +104,32 @@ public class SecurityConfigurationFactory {
             + KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID + " at the same time, is not allowed.");
       }
 
-      String descriptorText = null;
-
       if (descriptorJsonMap != null) { // this means the reference is null
         LOGGER.debug("Found embedded descriptor: {}", descriptorJsonMap);
-        descriptorText = jsonSerializer.<Map<String, Object>>toJson(descriptorJsonMap, Map.class);
+        String descriptorText = jsonSerializer.toJson(descriptorJsonMap, Map.class);
         if (persistEmbeddedDescriptor) {
           descriptorReference = persistKerberosDescriptor(descriptorText);
         }
-        securityConfiguration = new SecurityConfiguration(SecurityType.KERBEROS, descriptorReference, descriptorText);
+        Map<?, ?> descriptorMap = (Map<?, ?>) descriptorJsonMap;
+        securityConfiguration = persistEmbeddedDescriptor
+          ? SecurityConfiguration.withReference(descriptorReference)
+          : SecurityConfiguration.withDescriptor(descriptorMap);
       } else if (descriptorReference != null) { // this means the reference is not null
         LOGGER.debug("Found descriptor reference: {}", descriptorReference);
         securityConfiguration = loadSecurityConfigurationByReference(descriptorReference);
       } else {
         LOGGER.debug("There is no security descriptor found in the request");
-        securityConfiguration = new SecurityConfiguration(SecurityType.KERBEROS);
+        securityConfiguration = SecurityConfiguration.KERBEROS;
       }
     } else {
       LOGGER.debug("There is no security configuration found in the request");
-      securityConfiguration = new SecurityConfiguration(SecurityType.NONE);
+      securityConfiguration = SecurityConfiguration.NONE;
     }
     return securityConfiguration;
   }
 
   public SecurityConfiguration loadSecurityConfigurationByReference(String reference) {
-    SecurityConfiguration securityConfiguration = null;
+    SecurityConfiguration securityConfiguration;
     LOGGER.debug("Loading security configuration by reference: {}", reference);
 
     if (reference == null) {
@@ -144,7 +144,9 @@ public class SecurityConfigurationFactory {
       throw new IllegalArgumentException("No security configuration found for the reference: " + reference);
     }
 
-    securityConfiguration = new SecurityConfiguration(SecurityType.KERBEROS, reference, descriptorEntity.getKerberosDescriptorText());
+    String descriptorText = descriptorEntity.getKerberosDescriptorText();
+    Map<String, ?> descriptorMap = jsonSerializer.<Map<String, ?>>fromJson(descriptorText, Map.class);
+    securityConfiguration =  SecurityConfiguration.withDescriptor(descriptorMap);
 
     return securityConfiguration;
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 5c030b6..f3fa58a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -313,8 +313,8 @@ public class TopologyManager {
     // create resources
     ambariContext.createAmbariResources(topology, clusterName, securityType, repoVersion, repoVersionID);
 
-    if (securityConfiguration != null && securityConfiguration.getDescriptor() != null) {
-      submitKerberosDescriptorAsArtifact(clusterName, securityConfiguration.getDescriptor());
+    if (securityConfiguration != null) {
+      securityConfiguration.getDescriptor().ifPresent(descriptor -> submitKerberosDescriptorAsArtifact(clusterName, descriptor));
     }
 
     if (credential != null) {
@@ -440,21 +440,16 @@ public class TopologyManager {
     return securityConfiguration;
   }
 
-  private void submitKerberosDescriptorAsArtifact(String clusterName, String descriptor) {
+  private void submitKerberosDescriptorAsArtifact(String clusterName, Map<?,?> descriptor) {
 
     ResourceProvider artifactProvider =
         ambariContext.getClusterController().ensureResourceProvider(Resource.Type.Artifact);
 
-    Map<String, Object> properties = new HashMap<>();
-    properties.put(ArtifactResourceProvider.ARTIFACT_NAME_PROPERTY, "kerberos_descriptor");
-    properties.put("Artifacts/cluster_name", clusterName);
-
-    Map<String, String> requestInfoProps = new HashMap<>();
-    requestInfoProps.put(org.apache.ambari.server.controller.spi.Request.REQUEST_INFO_BODY_PROPERTY,
-            "{\"" + ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY + "\": " + descriptor + "}");
-
-    org.apache.ambari.server.controller.spi.Request request = new RequestImpl(Collections.emptySet(),
-        Collections.singleton(properties), requestInfoProps, null);
+    Map<String, Object> properties = ResourceProviderAdapter.createKerberosDescriptorRequestProperties(clusterName);
+    Map<String, String> requestInfoProps = ImmutableMap.of(
+      Request.REQUEST_INFO_BODY_PROPERTY, ArtifactResourceProvider.toArtifactDataJson(descriptor)
+    );
+    Request request = new RequestImpl(Collections.emptySet(), Collections.singleton(properties), requestInfoProps, null);
 
     try {
       RequestStatus status = artifactProvider.createResources(request);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
index f08ad65..2df7c4d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
@@ -20,11 +20,13 @@ package org.apache.ambari.server.topology.addservice;
 import static java.util.stream.Collectors.joining;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.ambari.server.controller.AddServiceRequest;
 import org.apache.ambari.server.controller.internal.RequestStageContainer;
 import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
 import org.apache.ambari.server.topology.Configuration;
 
 /**
@@ -35,21 +37,31 @@ public final class AddServiceInfo {
   private final AddServiceRequest request;
   private final String clusterName;
   private final Stack stack;
+  private final KerberosDescriptor kerberosDescriptor;
   private final Map<String, Map<String, Set<String>>> newServices;
   private final RequestStageContainer stages;
   private final Configuration config;
 
-  public AddServiceInfo(AddServiceRequest request, String clusterName, Stack stack, Configuration config, RequestStageContainer stages, Map<String, Map<String, Set<String>>> newServices) {
+  public AddServiceInfo(
+    AddServiceRequest request,
+    String clusterName,
+    Stack stack,
+    Configuration config,
+    KerberosDescriptor kerberosDescriptor,
+    RequestStageContainer stages,
+    Map<String, Map<String, Set<String>>> newServices
+  ) {
     this.request = request;
     this.clusterName = clusterName;
     this.stack = stack;
+    this.kerberosDescriptor = kerberosDescriptor;
     this.newServices = newServices;
     this.stages = stages;
     this.config = config;
   }
 
   public AddServiceInfo withNewServices(Map<String, Map<String, Set<String>>> services) {
-    return new AddServiceInfo(request, clusterName, stack, config, stages, services);
+    return new AddServiceInfo(request, clusterName, stack, config, kerberosDescriptor, stages, services);
   }
 
   @Override
@@ -85,6 +97,10 @@ public final class AddServiceInfo {
     return config;
   }
 
+  public Optional<KerberosDescriptor> getKerberosDescriptor() {
+    return Optional.ofNullable(kerberosDescriptor);
+  }
+
   /**
    * Creates a descriptive label to be displayed in the UI.
    */
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
index 556e0fa..71cce4a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.topology.addservice;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import javax.inject.Inject;
@@ -31,12 +32,16 @@ import org.apache.ambari.server.controller.AddServiceRequest;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.KerberosHelper;
 import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.serveraction.kerberos.KerberosAdminAuthenticationException;
 import org.apache.ambari.server.serveraction.kerberos.KerberosInvalidConfigurationException;
+import org.apache.ambari.server.serveraction.kerberos.KerberosMissingAdminCredentialsException;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
 import org.apache.ambari.server.topology.Configuration;
+import org.apache.ambari.server.utils.LoggingPreconditions;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +53,7 @@ import com.google.common.collect.Sets;
 public class AddServiceOrchestrator {
 
   private static final Logger LOG = LoggerFactory.getLogger(AddServiceOrchestrator.class);
+  private static final LoggingPreconditions CHECK = new LoggingPreconditions(LOG);
 
   @Inject
   private ResourceProviderAdapter resourceProviders;
@@ -71,6 +77,7 @@ public class AddServiceOrchestrator {
     LOG.info("Received {} request for {}: {}", request.getOperationType(), cluster.getClusterName(), request);
 
     AddServiceInfo validatedRequest = validate(cluster, request);
+    ensureCredentials(cluster, validatedRequest);
     AddServiceInfo requestWithLayout = recommendLayout(validatedRequest);
     AddServiceInfo requestWithConfig = recommendConfiguration(requestWithLayout);
 
@@ -96,6 +103,25 @@ public class AddServiceOrchestrator {
   }
 
   /**
+   * Stores any credentials provided in the request, and
+   * validates KDC credentials if the cluster has Kerberos enabled.
+   * The goal is to make sure that no resources (services, components, etc.) get created
+   * (except the credentials) if the request as a whole would fail due to missing credentials.
+   */
+  private void ensureCredentials(Cluster cluster, AddServiceInfo validatedRequest) {
+    resourceProviders.createCredentials(validatedRequest);
+    if (cluster.getSecurityType() == SecurityType.KERBEROS) {
+      try {
+        controller.getKerberosHelper().validateKDCCredentials(cluster);
+      } catch (KerberosMissingAdminCredentialsException | KerberosAdminAuthenticationException | KerberosInvalidConfigurationException e) {
+        CHECK.wrapInUnchecked(e, IllegalArgumentException::new, "KDC credentials validation failed: %s", e);
+      } catch (AmbariException e) {
+        CHECK.wrapInUnchecked(e, IllegalStateException::new, "Error occurred while validating KDC credentials: %s", e);
+      }
+    }
+  }
+
+  /**
    * Requests layout recommendation from the stack advisor.
    * @return new request, updated based on the recommended layout
    * @throws IllegalArgumentException if the request cannot be satisfied
@@ -124,7 +150,7 @@ public class AddServiceOrchestrator {
 
     Set<String> existingServices = cluster.getServices().keySet();
 
-    resourceProviders.createCredentials(request);
+    updateKerberosDescriptor(request);
 
     resourceProviders.createServices(request);
     resourceProviders.createComponents(request);
@@ -159,8 +185,7 @@ public class AddServiceOrchestrator {
           )
         );
       } catch (AmbariException | KerberosInvalidConfigurationException e) {
-        LOG.error("Error configuring Kerberos: {}", e, e);
-        throw new RuntimeException(e);
+        CHECK.wrapInUnchecked(e, RuntimeException::new, "Error configuring Kerberos for %s: %s", request, e);
       }
     }
   }
@@ -176,12 +201,22 @@ public class AddServiceOrchestrator {
     try {
       request.getStages().persist();
     } catch (AmbariException e) {
-      String msg = String.format("Error creating host tasks for %s", request);
-      LOG.error(msg, e);
-      throw new IllegalStateException(msg, e);
+      CHECK.wrapInUnchecked(e, IllegalStateException::new, "Error creating host tasks for %s", request);
     }
   }
 
+  private void updateKerberosDescriptor(AddServiceInfo request) {
+    request.getKerberosDescriptor().ifPresent(descriptorInRequest -> {
+      Optional<KerberosDescriptor> existingDescriptor = resourceProviders.getKerberosDescriptor(request);
+      if (existingDescriptor.isPresent()) {
+        KerberosDescriptor newDescriptor = existingDescriptor.get().update(descriptorInRequest);
+        resourceProviders.updateKerberosDescriptor(request, newDescriptor);
+      } else {
+        resourceProviders.createKerberosDescriptor(request, descriptorInRequest);
+      }
+    });
+  }
+
   private static Map<String, String> createComponentHostMap(Cluster cluster) {
     return StageUtils.createComponentHostMap(
       cluster.getServices().keySet(),
@@ -194,8 +229,7 @@ public class AddServiceOrchestrator {
     try {
       return cluster.getService(service).getServiceComponent(component).getServiceComponentsHosts();
     } catch (AmbariException e) {
-      LOG.error("Error getting components of service {}: {}", service, e, e);
-      throw new RuntimeException(e);
+      return CHECK.wrapInUnchecked(e, IllegalStateException::new, "Error getting hosts for service %s component %: %s", service, component, e, e);
     }
   }
 
@@ -203,8 +237,7 @@ public class AddServiceOrchestrator {
     try {
       return cluster.getService(service).getServiceComponents().keySet();
     } catch (AmbariException e) {
-      LOG.error("Error getting components of service {}: {}", service, e, e);
-      throw new RuntimeException(e);
+      return CHECK.wrapInUnchecked(e, IllegalStateException::new, "Error getting components of service %s: %s", service, e, e);
     }
   }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidator.java
index 8b72114..04dc084 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidator.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidator.java
@@ -28,7 +28,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -41,9 +40,14 @@ import org.apache.ambari.server.controller.internal.RequestStageContainer;
 import org.apache.ambari.server.controller.internal.Stack;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptorFactory;
 import org.apache.ambari.server.topology.Configuration;
+import org.apache.ambari.server.topology.SecurityConfigurationFactory;
 import org.apache.ambari.server.topology.StackFactory;
+import org.apache.ambari.server.utils.LoggingPreconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +62,7 @@ import com.google.inject.assistedinject.Assisted;
 public class RequestValidator {
 
   private static final Logger LOG = LoggerFactory.getLogger(RequestValidator.class);
+  private static final LoggingPreconditions CHECK = new LoggingPreconditions(LOG);
 
   private static final Set<String> NOT_ALLOWED_CONFIG_TYPES = ImmutableSet.of("kerberos-env", "krb5-conf");
 
@@ -66,7 +71,9 @@ public class RequestValidator {
   private final AmbariManagementController controller;
   private final ConfigHelper configHelper;
   private final StackFactory stackFactory;
+  private final KerberosDescriptorFactory kerberosDescriptorFactory;
   private final AtomicBoolean serviceInfoCreated = new AtomicBoolean();
+  private final SecurityConfigurationFactory securityConfigurationFactory;
 
   private State state;
 
@@ -74,7 +81,8 @@ public class RequestValidator {
   public RequestValidator(
     @Assisted AddServiceRequest request, @Assisted Cluster cluster,
     AmbariManagementController controller, ConfigHelper configHelper,
-    StackFactory stackFactory
+    StackFactory stackFactory, KerberosDescriptorFactory kerberosDescriptorFactory,
+    SecurityConfigurationFactory securityConfigurationFactory
   ) {
     this.state = State.INITIAL;
     this.request = request;
@@ -82,15 +90,17 @@ public class RequestValidator {
     this.controller = controller;
     this.configHelper = configHelper;
     this.stackFactory = stackFactory;
+    this.kerberosDescriptorFactory = kerberosDescriptorFactory;
+    this.securityConfigurationFactory = securityConfigurationFactory;
   }
 
   /**
    * Perform validation of the request.
    */
   void validate() {
-    validateSecurity();
     validateStack();
     validateServicesAndComponents();
+    validateSecurity();
     validateHosts();
     validateConfiguration();
   }
@@ -101,11 +111,13 @@ public class RequestValidator {
   AddServiceInfo createValidServiceInfo(ActionManager actionManager, RequestFactory requestFactory) {
     final State state = this.state;
 
-    checkState(state.isValid(), "The request needs to be validated first");
-    checkState(!serviceInfoCreated.getAndSet(true), "Can create only one instance for each validated add service request");
+    CHECK.checkState(state.isValid(), "The request needs to be validated first");
+    CHECK.checkState(!serviceInfoCreated.getAndSet(true), "Can create only one instance for each validated add service request");
 
     RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager);
-    AddServiceInfo validatedRequest = new AddServiceInfo(request, cluster.getClusterName(), state.getStack(), state.getConfig(), stages, state.getNewServices());
+    AddServiceInfo validatedRequest = new AddServiceInfo(request, cluster.getClusterName(),
+      state.getStack(), state.getConfig(), state.getKerberosDescriptor(),
+      stages, state.getNewServices());
     stages.setRequestContext(validatedRequest.describe());
     return validatedRequest;
   }
@@ -122,12 +134,52 @@ public class RequestValidator {
 
   @VisibleForTesting
   void validateSecurity() {
-    request.getSecurity().ifPresent(requestSecurity ->
-      checkArgument(requestSecurity.getType() == cluster.getSecurityType(),
+    request.getSecurity().ifPresent(requestSecurity -> {
+      CHECK.checkArgument(requestSecurity.getType() == cluster.getSecurityType(),
         "Security type in the request (%s), if specified, should match cluster's security type (%s)",
         requestSecurity.getType(), cluster.getSecurityType()
-      )
-    );
+      );
+
+      boolean hasDescriptor = requestSecurity.getDescriptor().isPresent();
+      boolean hasDescriptorReference = requestSecurity.getDescriptorReference() != null;
+      boolean secureCluster = cluster.getSecurityType() == SecurityType.KERBEROS;
+
+      CHECK.checkArgument(secureCluster || !hasDescriptor,
+        "Kerberos descriptor cannot be set for security type %s", cluster.getSecurityType());
+      CHECK.checkArgument(secureCluster || !hasDescriptorReference,
+        "Kerberos descriptor reference cannot be set for security type %s", cluster.getSecurityType());
+      CHECK.checkArgument(!hasDescriptor || !hasDescriptorReference,
+        "Kerberos descriptor and reference cannot be both set");
+
+      Optional<Map<?,?>> kerberosDescriptor = hasDescriptor
+        ? requestSecurity.getDescriptor()
+        : hasDescriptorReference
+          ? loadKerberosDescriptor(requestSecurity.getDescriptorReference())
+          : Optional.empty();
+
+      kerberosDescriptor.ifPresent(descriptorMap -> {
+        CHECK.checkState(state.getNewServices() != null,
+          "Services need to be validated before security settings");
+
+        KerberosDescriptor descriptor = kerberosDescriptorFactory.createInstance(descriptorMap);
+
+        Set<String> servicesWithNewDescriptor = descriptor.getServices().keySet();
+        Set<String> newServices = state.getNewServices().keySet();
+        Set<String> nonNewServices = ImmutableSet.copyOf(Sets.difference(servicesWithNewDescriptor, newServices));
+
+        CHECK.checkArgument(nonNewServices.isEmpty(),
+          "Kerberos descriptor should be provided only for new services, but found other services: %s",
+          nonNewServices);
+
+        try {
+          descriptor.toMap();
+        } catch (Exception e) {
+          CHECK.wrapInUnchecked(e, IllegalArgumentException::new, "Error validating Kerberos descriptor: %s", e);
+        }
+
+        state = state.with(descriptor);
+      });
+    });
   }
 
   @VisibleForTesting
@@ -138,9 +190,8 @@ public class RequestValidator {
       Stack stack = stackFactory.createStack(stackId.getStackName(), stackId.getStackVersion(), controller);
       state = state.with(stack);
     } catch (AmbariException e) {
-      logAndThrow(requestStackId.isPresent()
-        ? msg -> new IllegalArgumentException(msg, e)
-        : IllegalStateException::new,
+      CHECK.wrapInUnchecked(e,
+        requestStackId.isPresent() ? IllegalArgumentException::new : IllegalStateException::new,
         "Stack %s not found", stackId
       );
     }
@@ -157,9 +208,9 @@ public class RequestValidator {
     for (AddServiceRequest.Service service : request.getServices()) {
       String serviceName = service.getName();
 
-      checkArgument(stack.getServices().contains(serviceName),
+      CHECK.checkArgument(stack.getServices().contains(serviceName),
         "Unknown service %s in %s", service, stack);
-      checkArgument(!existingServices.contains(serviceName),
+      CHECK.checkArgument(!existingServices.contains(serviceName),
         "Service %s already exists in cluster %s", serviceName, cluster.getClusterName());
 
       newServices.computeIfAbsent(serviceName, __ -> new HashMap<>());
@@ -170,9 +221,9 @@ public class RequestValidator {
       String componentName = requestedComponent.getName();
       String serviceName = stack.getServiceForComponent(componentName);
 
-      checkArgument(serviceName != null,
+      CHECK.checkArgument(serviceName != null,
         "No service found for component %s in %s", componentName, stack);
-      checkArgument(!existingServices.contains(serviceName),
+      CHECK.checkArgument(!existingServices.contains(serviceName),
         "Service %s (for component %s) already exists in cluster %s", serviceName, componentName, cluster.getClusterName());
 
       newServices.computeIfAbsent(serviceName, __ -> new HashMap<>())
@@ -180,7 +231,7 @@ public class RequestValidator {
         .add(requestedComponent.getFqdn());
     }
 
-    checkArgument(!newServices.isEmpty(), "Request should have at least one new service or component to be added");
+    CHECK.checkArgument(!newServices.isEmpty(), "Request should have at least one new service or component to be added");
 
     state = state.withNewServices(newServices);
   }
@@ -190,7 +241,7 @@ public class RequestValidator {
     Configuration config = request.getConfiguration();
 
     for (String type : NOT_ALLOWED_CONFIG_TYPES) {
-      checkArgument(!config.getProperties().containsKey(type), "Cannot change '%s' configuration in Add Service request", type);
+      CHECK.checkArgument(!config.getProperties().containsKey(type), "Cannot change '%s' configuration in Add Service request", type);
     }
 
     Configuration clusterConfig = getClusterDesiredConfigs();
@@ -209,7 +260,7 @@ public class RequestValidator {
       .collect(toSet());
     Set<String> unknownHosts = new TreeSet<>(Sets.difference(requestHosts, clusterHosts));
 
-    checkArgument(unknownHosts.isEmpty(),
+    CHECK.checkArgument(unknownHosts.isEmpty(),
       "Requested host not associated with cluster %s: %s", cluster.getClusterName(), unknownHosts);
   }
 
@@ -217,42 +268,29 @@ public class RequestValidator {
     try {
       return Configuration.of(configHelper.calculateExistingConfigs(cluster));
     } catch (AmbariException e) {
-      logAndThrow(msg -> new IllegalStateException(msg, e), "Error getting effective configuration of cluster %s", cluster.getClusterName());
-      return Configuration.newEmpty(); // unreachable
-    }
-  }
-
-  private static void checkArgument(boolean expression, String errorMessage, Object... messageParams) {
-    if (!expression) {
-      logAndThrow(IllegalArgumentException::new, errorMessage, messageParams);
+      return CHECK.wrapInUnchecked(e, IllegalStateException::new, "Error getting effective configuration of cluster %s", cluster.getClusterName());
     }
   }
 
-  private static void checkState(boolean expression, String errorMessage, Object... messageParams) {
-    if (!expression) {
-      logAndThrow(IllegalStateException::new, errorMessage, messageParams);
-    }
-  }
-
-  private static void logAndThrow(Function<String, RuntimeException> exceptionCreator, String errorMessage, Object... messageParams) {
-    String msg = String.format(errorMessage, messageParams);
-    LOG.error(msg);
-    throw exceptionCreator.apply(msg);
+  private Optional<Map<?,?>> loadKerberosDescriptor(String descriptorReference) {
+    return securityConfigurationFactory.loadSecurityConfigurationByReference(descriptorReference).getDescriptor();
   }
 
   @VisibleForTesting
   static class State {
 
-    static final State INITIAL = new State(null, null, null);
+    static final State INITIAL = new State(null, null, null, null);
 
     private final Stack stack;
     private final Map<String, Map<String, Set<String>>> newServices;
     private final Configuration config;
+    private final KerberosDescriptor kerberosDescriptor;
 
-    State(Stack stack, Map<String, Map<String, Set<String>>> newServices, Configuration config) {
+    State(Stack stack, Map<String, Map<String, Set<String>>> newServices, Configuration config, KerberosDescriptor kerberosDescriptor) {
       this.stack = stack;
       this.newServices = newServices;
       this.config = config;
+      this.kerberosDescriptor = kerberosDescriptor;
     }
 
     boolean isValid() {
@@ -260,15 +298,19 @@ public class RequestValidator {
     }
 
     State with(Stack stack) {
-      return new State(stack, newServices, config);
+      return new State(stack, newServices, config, kerberosDescriptor);
     }
 
     State withNewServices(Map<String, Map<String, Set<String>>> newServices) {
-      return new State(stack, newServices, config);
+      return new State(stack, newServices, config, kerberosDescriptor);
     }
 
     State with(Configuration config) {
-      return new State(stack, newServices, config);
+      return new State(stack, newServices, config, kerberosDescriptor);
+    }
+
+    State with(KerberosDescriptor kerberosDescriptor) {
+      return new State(stack, newServices, config, kerberosDescriptor);
     }
 
     Stack getStack() {
@@ -282,6 +324,10 @@ public class RequestValidator {
     Configuration getConfig() {
       return config;
     }
+
+    KerberosDescriptor getKerberosDescriptor() {
+      return kerberosDescriptor;
+    }
   }
 
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
index 3e23643..a7fe1f0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
@@ -38,6 +38,7 @@ import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
 import org.apache.ambari.server.controller.ClusterRequest;
 import org.apache.ambari.server.controller.ConfigurationRequest;
+import org.apache.ambari.server.controller.internal.ArtifactResourceProvider;
 import org.apache.ambari.server.controller.internal.ClusterResourceProvider;
 import org.apache.ambari.server.controller.internal.ComponentResourceProvider;
 import org.apache.ambari.server.controller.internal.CredentialResourceProvider;
@@ -60,11 +61,15 @@ import org.apache.ambari.server.controller.spi.ResourceProvider;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
 import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
 import org.apache.ambari.server.security.authorization.AuthorizationException;
 import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptorFactory;
 import org.apache.ambari.server.topology.Credential;
+import org.apache.ambari.server.utils.LoggingPreconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,10 +85,15 @@ import com.google.common.collect.Sets;
 public class ResourceProviderAdapter {
 
   private static final Logger LOG = LoggerFactory.getLogger(ResourceProviderAdapter.class);
+  private static final LoggingPreconditions CHECK = new LoggingPreconditions(LOG);
+
+  private final KerberosDescriptorFactory descriptorFactory = new KerberosDescriptorFactory();
 
   @Inject
   private AmbariManagementController controller;
 
+  // business methods
+
   public void createServices(AddServiceInfo request) {
     LOG.info("Creating service resources for {}", request);
 
@@ -91,7 +101,7 @@ public class ResourceProviderAdapter {
       .map(service -> createServiceRequestProperties(request, service))
       .collect(toSet());
 
-    createResources(request, properties, Resource.Type.Service, false);
+    createResources(request, Resource.Type.Service, properties, null, false);
   }
 
   public void createComponents(AddServiceInfo request) {
@@ -102,7 +112,7 @@ public class ResourceProviderAdapter {
         .map(component -> createComponentRequestProperties(request, componentsOfService.getKey(), component)))
       .collect(toSet());
 
-    createResources(request, properties, Resource.Type.Component, false);
+    createResources(request, Resource.Type.Component, properties, null, false);
   }
 
   public void createHostComponents(AddServiceInfo request) {
@@ -114,7 +124,7 @@ public class ResourceProviderAdapter {
           .map(host -> createHostComponentRequestProperties(request, componentsOfService.getKey(), hostsOfComponent.getKey(), host))))
       .collect(toSet());
 
-    createResources(request, properties, Resource.Type.HostComponent, false);
+    createResources(request, Resource.Type.HostComponent, properties, null, false);
   }
 
   public void createConfigs(AddServiceInfo request) {
@@ -131,11 +141,48 @@ public class ResourceProviderAdapter {
         .peek(credential -> LOG.debug("Creating credential {}", credential))
         .map(credential -> createCredentialRequestProperties(request.clusterName(), credential))
         .forEach(
-          properties -> createResources(request, ImmutableSet.of(properties), Resource.Type.Credential, true)
+          properties -> createResources(request, Resource.Type.Credential, ImmutableSet.of(properties), null, true)
         );
     }
   }
 
+  /**
+   * @return {@code Optional} with the cluster's kerberos descriptor artifact if it exists, otherwise empty {@code Optional}
+   */
+  public Optional<KerberosDescriptor> getKerberosDescriptor(AddServiceInfo request) {
+    Set<String> propertyIds = ImmutableSet.of(ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY);
+    Predicate predicate = predicateForKerberosDescriptorArtifact(request.clusterName());
+
+    Set<Resource> resources = getResources(request, propertyIds, Resource.Type.Artifact, predicate);
+
+    if (resources == null || resources.isEmpty()) {
+      return Optional.empty();
+    }
+
+    CHECK.checkArgument(resources.size() == 1,
+      "Expected only one artifact of type %s, but got %d",
+      ArtifactResourceProvider.KERBEROS_DESCRIPTOR,
+      resources.size()
+    );
+
+    return Optional.of(descriptorFactory.createInstance(resources.iterator().next().getPropertiesMap().get(ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY)));
+  }
+
+  public void createKerberosDescriptor(AddServiceInfo request, KerberosDescriptor descriptor) {
+    LOG.info("Creating Kerberos descriptor for {}", request);
+    Map<String, Object> properties = createKerberosDescriptorRequestProperties(request.clusterName());
+    Map<String, String> requestInfo = requestInfoForKerberosDescriptor(descriptor);
+    createResources(request, Resource.Type.Artifact, ImmutableSet.of(properties), requestInfo, false);
+  }
+
+  public void updateKerberosDescriptor(AddServiceInfo request, KerberosDescriptor descriptor) {
+    LOG.info("Updating Kerberos descriptor from {}", request);
+    Map<String, Object> properties = createKerberosDescriptorRequestProperties(request.clusterName());
+    Map<String, String> requestInfo = requestInfoForKerberosDescriptor(descriptor);
+    Predicate predicate = predicateForKerberosDescriptorArtifact(request.clusterName());
+    updateResources(request, ImmutableSet.of(properties), Resource.Type.Artifact, predicate, requestInfo);
+  }
+
   public void updateExistingConfigs(AddServiceInfo request, Set<String> existingServices) {
     LOG.info("Updating existing configurations for {}", request);
     Set<ClusterRequest> requests = createConfigRequestsForExistingServices(request, existingServices);
@@ -148,7 +195,9 @@ public class ResourceProviderAdapter {
     Set<Map<String, Object>> properties = ImmutableSet.of(ImmutableMap.of(
       ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID, desiredState.name()
     ));
-    updateResources(request, properties, Resource.Type.Service, predicateForNewServices(request, "ServiceInfo"));
+    Map<String, String> requestInfo = createRequestInfo(request.clusterName(), Resource.Type.Service).build();
+    Predicate predicate = predicateForNewServices(request, "ServiceInfo");
+    updateResources(request, properties, Resource.Type.Service, predicate, requestInfo);
   }
 
   public void updateHostComponentDesiredState(AddServiceInfo request, State desiredState) {
@@ -163,18 +212,30 @@ public class ResourceProviderAdapter {
     addProvisionProperties(requestInfo, desiredState, request.getRequest().getProvisionAction());
 
     HostComponentResourceProvider rp = (HostComponentResourceProvider) getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
-    Request internalRequest = createRequest(properties, requestInfo.build());
+    Request internalRequest = createRequest(properties, requestInfo.build(), null);
     try {
       rp.doUpdateResources(request.getStages(), internalRequest, predicateForNewServices(request, HostComponentResourceProvider.HOST_ROLES), false, false, false);
     } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
-      String msg = String.format("Error updating host component desired state for %s", request);
-      LOG.error(msg, e);
-      throw new RuntimeException(msg, e);
+      CHECK.wrapInUnchecked(e, RuntimeException::new, "Error updating host component desired state for %s", request);
     }
   }
 
-  private static void createResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, boolean okIfExists) {
-    Request internalRequest = createRequest(properties, null);
+  // ResourceProvider calls
+
+  private static Set<Resource> getResources(AddServiceInfo request, Set<String> propertyIds, Resource.Type resourceType, Predicate predicate) {
+    Request internalRequest = createRequest(null, null, propertyIds);
+    ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
+    try {
+      return rp.getResources(internalRequest, predicate);
+    } catch (NoSuchResourceException e) {
+      return ImmutableSet.of();
+    } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException e) {
+      return CHECK.wrapInUnchecked(e, RuntimeException::new, "Error getting resources %s for %s", resourceType, request);
+    }
+  }
+
+  private static void createResources(AddServiceInfo request, Resource.Type resourceType, Set<Map<String, Object>> properties, Map<String, String> requestInfo, boolean okIfExists) {
+    Request internalRequest = createRequest(properties, requestInfo, null);
     ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
     try {
       rp.createResources(internalRequest);
@@ -182,22 +243,18 @@ public class ResourceProviderAdapter {
       if (okIfExists && e instanceof ResourceAlreadyExistsException) {
         LOG.info("Resource already exists: {}, no need to create", e.getMessage());
       } else {
-        String msg = String.format("Error creating resources %s for %s", resourceType, request);
-        LOG.error(msg, e);
-        throw new RuntimeException(msg, e);
+        CHECK.wrapInUnchecked(e, RuntimeException::new, "Error creating resources %s for %s", resourceType, request);
       }
     }
   }
 
-  private static void updateResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, Predicate predicate) {
-    Request internalRequest = createRequest(properties, createRequestInfo(request.clusterName(), resourceType).build());
+  private static void updateResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, Predicate predicate, Map<String, String> requestInfo) {
+    Request internalRequest = createRequest(properties, requestInfo, null);
     ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
     try {
       rp.updateResources(internalRequest, predicate);
     } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
-      String msg = String.format("Error updating resources %s for %s", resourceType, request);
-      LOG.error(msg, e);
-      throw new RuntimeException(msg, e);
+      CHECK.wrapInUnchecked(e, RuntimeException::new, "Error updating resources %s for %s", resourceType, request);
     }
   }
 
@@ -205,14 +262,12 @@ public class ResourceProviderAdapter {
     try {
       controller.updateClusters(requests, null);
     } catch (AmbariException | AuthorizationException e) {
-      String msg = String.format(errorMessageFormat, addServiceRequest);
-      LOG.error(msg, e);
-      throw new RuntimeException(msg, e);
+      CHECK.wrapInUnchecked(e, RuntimeException::new, errorMessageFormat, addServiceRequest);
     }
   }
 
-  private static Request createRequest(Set<Map<String, Object>> properties, Map<String, String> requestInfoProperties) {
-    return new RequestImpl(null, properties, requestInfoProperties, null);
+  private static Request createRequest(Set<Map<String, Object>> properties, Map<String, String> requestInfoProperties, Set<String> propertyIds) {
+    return new RequestImpl(propertyIds, properties, requestInfoProperties, null);
   }
 
   private static ImmutableMap.Builder<String, String> createRequestInfo(String clusterName, Resource.Type resourceType) {
@@ -229,6 +284,12 @@ public class ResourceProviderAdapter {
     }
   }
 
+  public static Map<String, String> requestInfoForKerberosDescriptor(KerberosDescriptor descriptor) {
+    return ImmutableMap.of(Request.REQUEST_INFO_BODY_PROPERTY, ArtifactResourceProvider.toArtifactDataJson(descriptor.toMap()));
+  }
+
+  // request creation (as map of properties)
+
   private static Map<String, Object> createServiceRequestProperties(AddServiceInfo request, String service) {
     ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
 
@@ -274,6 +335,15 @@ public class ResourceProviderAdapter {
     return properties.build();
   }
 
+  public static Map<String, Object> createKerberosDescriptorRequestProperties(String clusterName) {
+    return ImmutableMap.of(
+      ArtifactResourceProvider.CLUSTER_NAME_PROPERTY, clusterName,
+      ArtifactResourceProvider.ARTIFACT_NAME_PROPERTY, ArtifactResourceProvider.KERBEROS_DESCRIPTOR
+    );
+  }
+
+  // ClusterRequest creation (for configuration)
+
   private static Set<ClusterRequest> createConfigRequestsForNewServices(AddServiceInfo request) {
     Map<String, Map<String, String>> fullProperties = request.getConfig().getFullProperties();
     Map<String, Map<String, Map<String, String>>> fullAttributes = request.getConfig().getFullAttributes();
@@ -358,6 +428,16 @@ public class ResourceProviderAdapter {
     return Optional.of(clusterRequest);
   }
 
+  // Predicate creation
+
+  public static Predicate predicateForKerberosDescriptorArtifact(String clusterName) {
+    return new PredicateBuilder().begin()
+      .property(ArtifactResourceProvider.CLUSTER_NAME_PROPERTY).equals(clusterName)
+      .and()
+      .property(ArtifactResourceProvider.ARTIFACT_NAME_PROPERTY).equals(ArtifactResourceProvider.KERBEROS_DESCRIPTOR)
+      .end().toPredicate();
+  }
+
   private static Predicate predicateForNewServices(AddServiceInfo request, String category) {
     return new AndPredicate(
       new EqualsPredicate<>(PropertyHelper.getPropertyId(category, ClusterResourceProvider.CLUSTER_NAME), request.clusterName()),
@@ -369,7 +449,9 @@ public class ResourceProviderAdapter {
     );
   }
 
+  // TODO should be injected
   private static ClusterController getClusterController() {
     return ClusterControllerHelper.getClusterController();
   }
+
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/LoggingPreconditions.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/LoggingPreconditions.java
new file mode 100644
index 0000000..2a0d43b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/LoggingPreconditions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+
+public class LoggingPreconditions {
+
+  private final Logger logger;
+
+  public LoggingPreconditions(Logger logger) {
+    this.logger = logger;
+  }
+
+  public void checkNotNull(Object o, String errorMessage, Object... messageParams) {
+    if (o == null) {
+      logAndThrow(NullPointerException::new, errorMessage, messageParams);
+    }
+  }
+
+  public void checkArgument(boolean expression, String errorMessage, Object... messageParams) {
+    if (!expression) {
+      logAndThrow(IllegalArgumentException::new, errorMessage, messageParams);
+    }
+  }
+
+  public void checkState(boolean expression, String errorMessage, Object... messageParams) {
+    if (!expression) {
+      logAndThrow(IllegalStateException::new, errorMessage, messageParams);
+    }
+  }
+
+  /**
+   * Wraps {@code exception} in an unchecked exception created by {@code uncheckedWrapper}, and always throws the unchecked exception.
+   * @return null to make the compiler happy
+   */
+  public <T> T wrapInUnchecked(Exception exception, BiFunction<String, Exception, RuntimeException> uncheckedWrapper, String errorMessage, Object... messageParams) {
+    logAndThrow(msg -> uncheckedWrapper.apply(msg, exception), errorMessage, messageParams);
+    return null; // unreachable
+  }
+
+  /**
+   * Formats an error message with parameters using {@code String.format}, logs it using {@code logger},
+   * and throws an unchecked exception with the same message created by {@code exceptionCreator}.
+   */
+  public void logAndThrow(Function<String, RuntimeException> exceptionCreator, String errorMessage, Object... messageParams) {
+    String msg = String.format(errorMessage, messageParams);
+    logger.error(msg);
+    throw exceptionCreator.apply(msg);
+  }
+
+}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java
index d3b6aa0..c0f4afe 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.ambari.server.controller.internal.ClusterResourceProvider;
 import org.apache.ambari.server.controller.internal.ProvisionAction;
 import org.apache.ambari.server.security.encryption.CredentialStoreType;
 import org.apache.ambari.server.state.SecurityType;
@@ -52,6 +53,7 @@ import org.apache.ambari.server.topology.Configurable;
 import org.apache.ambari.server.topology.Configuration;
 import org.apache.ambari.server.topology.Credential;
 import org.apache.ambari.server.topology.SecurityConfiguration;
+import org.apache.ambari.server.topology.SecurityConfigurationFactory;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -75,6 +77,8 @@ public class AddServiceRequestTest {
   private static String REQUEST_INVALID_NO_SERVICES_AND_COMPONENTS;
   private static String REQUEST_INVALID_INVALID_FIELD;
   private static String REQUEST_INVALID_INVALID_CONFIG;
+  private static final Map<String, List<Map<String, String>>> KERBEROS_DESCRIPTOR1 =
+    ImmutableMap.of("services", ImmutableList.of(ImmutableMap.of("name", "ZOOKEEPER")));
 
   private ObjectMapper mapper = new ObjectMapper();
 
@@ -116,7 +120,7 @@ public class AddServiceRequestTest {
       request.getServices());
 
     assertEquals(
-      Optional.of(new SecurityConfiguration(SecurityType.KERBEROS, "ref_to_kerb_desc", null)),
+      Optional.of(SecurityConfiguration.forTest(SecurityType.KERBEROS, "ref_to_kerb_desc", KERBEROS_DESCRIPTOR1)),
       request.getSecurity());
 
     assertEquals(
@@ -250,6 +254,17 @@ public class AddServiceRequestTest {
       ),
       serialized.get(Configurable.CONFIGURATIONS)
     );
+
+    assertEquals(
+      ImmutableMap.of(
+        SecurityConfigurationFactory.TYPE_PROPERTY_ID, SecurityType.KERBEROS.name(),
+        SecurityConfigurationFactory.KERBEROS_DESCRIPTOR_PROPERTY_ID, KERBEROS_DESCRIPTOR1,
+        SecurityConfigurationFactory.KERBEROS_DESCRIPTOR_REFERENCE_PROPERTY_ID, "ref_to_kerb_desc"
+      ),
+      serialized.get(ClusterResourceProvider.SECURITY)
+    );
+
+    assertNull(serialized.get(ClusterResourceProvider.CREDENTIALS));
   }
 
   @Test
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintResourceProviderTest.java
index 7b15412..92f91b8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintResourceProviderTest.java
@@ -73,7 +73,6 @@ import org.apache.ambari.server.orm.entities.HostGroupEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
 import org.apache.ambari.server.orm.entities.TopologyRequestEntity;
 import org.apache.ambari.server.state.PropertyInfo;
-import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.topology.Blueprint;
 import org.apache.ambari.server.topology.BlueprintFactory;
@@ -384,7 +383,7 @@ public class BlueprintResourceProviderTest {
     Set<Map<String, Object>> setProperties = getBlueprintTestProperties();
     Map<String, String> requestInfoProperties = getTestRequestInfoProperties();
     Map<String, Set<HashMap<String, String>>> settingProperties = getSettingProperties();
-    SecurityConfiguration securityConfiguration = new SecurityConfiguration(SecurityType.KERBEROS, "testRef", null);
+    SecurityConfiguration securityConfiguration = SecurityConfiguration.withReference("testRef");
 
     // set expectations
     expect(securityFactory.createSecurityConfigurationFromRequest(EasyMock.anyObject(), anyBoolean())).andReturn
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
index e687983..bdc289b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
@@ -177,9 +177,8 @@ public class ClusterResourceProviderTest {
     Map<String, String> requestInfoProperties = new HashMap<>();
     requestInfoProperties.put(Request.REQUEST_INFO_BODY_PROPERTY, "{\"security\" : {\n\"type\" : \"NONE\"," +
       "\n\"kerberos_descriptor_reference\" : " + "\"testRef\"\n}}");
-    SecurityConfiguration blueprintSecurityConfiguration = new SecurityConfiguration(SecurityType.KERBEROS, "testRef",
-      null);
-    SecurityConfiguration securityConfiguration = new SecurityConfiguration(SecurityType.NONE, null, null);
+    SecurityConfiguration blueprintSecurityConfiguration = SecurityConfiguration.withReference("testRef");
+    SecurityConfiguration securityConfiguration = SecurityConfiguration.NONE;
 
     // set expectations
     expect(request.getProperties()).andReturn(requestProperties).anyTimes();
@@ -201,7 +200,7 @@ public class ClusterResourceProviderTest {
   public void testCreateResource_blueprint_withSecurityConfiguration() throws Exception {
     Set<Map<String, Object>> requestProperties = createBlueprintRequestProperties(CLUSTER_NAME, BLUEPRINT_NAME);
     Map<String, Object> properties = requestProperties.iterator().next();
-    SecurityConfiguration securityConfiguration = new SecurityConfiguration(SecurityType.KERBEROS, "testRef", null);
+    SecurityConfiguration securityConfiguration = SecurityConfiguration.withReference("testRef");
 
     Map<String, String> requestInfoProperties = new HashMap<>();
     requestInfoProperties.put(Request.REQUEST_INFO_BODY_PROPERTY, "{\"security\" : {\n\"type\" : \"KERBEROS\",\n\"kerberos_descriptor_reference\" : " +
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintImplTest.java
index d34526a..a513caf 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintImplTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintImplTest.java
@@ -123,7 +123,7 @@ public class BlueprintImplTest {
     properties.put("category2", category2Props);
     category2Props.put("prop2", "val");
 
-    SecurityConfiguration securityConfiguration = new SecurityConfiguration(SecurityType.KERBEROS, "testRef", null);
+    SecurityConfiguration securityConfiguration = SecurityConfiguration.withReference("testRef");
     Blueprint blueprint = new BlueprintImpl("test", hostGroups, stack, configuration, securityConfiguration);
     blueprint.validateRequiredProperties();
     BlueprintEntity entity = blueprint.toEntity();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/RequestValidatorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/RequestValidatorTest.java
index 9b313e9..6b285dc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/RequestValidatorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/RequestValidatorTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.ambari.server.topology.addservice;
 
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -24,9 +27,11 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
@@ -39,8 +44,12 @@ import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptorFactory;
+import org.apache.ambari.server.state.kerberos.KerberosServiceDescriptor;
 import org.apache.ambari.server.topology.Configuration;
 import org.apache.ambari.server.topology.SecurityConfiguration;
+import org.apache.ambari.server.topology.SecurityConfigurationFactory;
 import org.apache.ambari.server.topology.StackFactory;
 import org.easymock.EasyMockSupport;
 import org.junit.After;
@@ -53,20 +62,27 @@ import com.google.common.collect.Sets;
 
 public class RequestValidatorTest extends EasyMockSupport {
 
+  private static final Map<String, ?> FAKE_DESCRIPTOR = ImmutableMap.of("kerberos", "descriptor");
+  private static final String FAKE_DESCRIPTOR_REFERENCE = "ref";
+
   private final AddServiceRequest request = createNiceMock(AddServiceRequest.class);
   private final Cluster cluster = createMock(Cluster.class);
   private final AmbariManagementController controller = createNiceMock(AmbariManagementController.class);
   private final ConfigHelper configHelper = createMock(ConfigHelper.class);
   private final StackFactory stackFactory = createNiceMock(StackFactory.class);
-  private final RequestValidator validator = new RequestValidator(request, cluster, controller, configHelper, stackFactory);
+  private final KerberosDescriptorFactory kerberosDescriptorFactory = createNiceMock(KerberosDescriptorFactory.class);
+  private final SecurityConfigurationFactory securityConfigurationFactory = createStrictMock(SecurityConfigurationFactory.class);
+  private final RequestValidator validator = new RequestValidator(request, cluster, controller, configHelper, stackFactory, kerberosDescriptorFactory, securityConfigurationFactory);
 
   @Before
   public void setUp() {
     validator.setState(RequestValidator.State.INITIAL);
     expect(cluster.getClusterName()).andReturn("TEST").anyTimes();
     expect(cluster.getServices()).andStubReturn(ImmutableMap.of());
+    expect(cluster.getSecurityType()).andStubReturn(SecurityType.NONE);
     expect(request.getServices()).andStubReturn(ImmutableSet.of());
     expect(request.getComponents()).andStubReturn(ImmutableSet.of());
+    expect(request.getSecurity()).andStubReturn(Optional.empty());
   }
 
   @After
@@ -176,21 +192,22 @@ public class RequestValidatorTest extends EasyMockSupport {
 
   @Test
   public void acceptsKnownServices() {
-    expect(request.getServices()).andReturn(ImmutableSet.of(AddServiceRequest.Service.of("KAFKA")));
+    String newService = "KAFKA";
+    requestServices(false, newService);
     validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack()));
     replayAll();
 
     validator.validateServicesAndComponents();
 
     Map<String, Map<String, Set<String>>> expectedNewServices = ImmutableMap.of(
-      "KAFKA", ImmutableMap.of()
+      newService, ImmutableMap.of()
     );
     assertEquals(expectedNewServices, validator.getState().getNewServices());
   }
 
   @Test
   public void acceptsKnownComponents() {
-    expect(request.getComponents()).andReturn(ImmutableSet.of(AddServiceRequest.Component.of("KAFKA_BROKER", "c7401.ambari.apache.org")));
+    requestComponents("KAFKA_BROKER");
     validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack()));
     replayAll();
 
@@ -205,7 +222,7 @@ public class RequestValidatorTest extends EasyMockSupport {
   @Test
   public void rejectsUnknownService() {
     String serviceName = "UNKNOWN_SERVICE";
-    expect(request.getServices()).andReturn(ImmutableSet.of(AddServiceRequest.Service.of(serviceName)));
+    requestServices(false, serviceName);
     validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack()));
     replayAll();
 
@@ -217,7 +234,7 @@ public class RequestValidatorTest extends EasyMockSupport {
   @Test
   public void rejectsUnknownComponent() {
     String componentName = "UNKNOWN_COMPONENT";
-    expect(request.getComponents()).andReturn(ImmutableSet.of(AddServiceRequest.Component.of(componentName, "c7401.ambari.apache.org")));
+    requestComponents(componentName);
     validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack()));
     replayAll();
 
@@ -229,8 +246,8 @@ public class RequestValidatorTest extends EasyMockSupport {
   @Test
   public void rejectsExistingServiceForService() {
     String serviceName = "KAFKA";
-    expect(cluster.getServices()).andReturn(ImmutableMap.of(serviceName, createNiceMock(Service.class))).anyTimes();
-    expect(request.getServices()).andReturn(ImmutableSet.of(AddServiceRequest.Service.of(serviceName)));
+    requestServices(false, serviceName);
+    clusterAlreadyHasServices(serviceName);
     validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack()));
     replayAll();
 
@@ -239,12 +256,16 @@ public class RequestValidatorTest extends EasyMockSupport {
     assertNull(validator.getState().getNewServices());
   }
 
+  private void clusterAlreadyHasServices(String serviceName) {
+    expect(cluster.getServices()).andReturn(ImmutableMap.of(serviceName, createNiceMock(Service.class))).anyTimes();
+  }
+
   @Test
   public void rejectsExistingServiceForComponent() {
     String serviceName = "KAFKA";
     String componentName = "KAFKA_BROKER";
-    expect(cluster.getServices()).andReturn(ImmutableMap.of(serviceName, createNiceMock(Service.class))).anyTimes();
-    expect(request.getComponents()).andReturn(ImmutableSet.of(AddServiceRequest.Component.of(componentName, "c7401.ambari.apache.org")));
+    clusterAlreadyHasServices(serviceName);
+    requestComponents(componentName);
     validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack()));
     replayAll();
 
@@ -293,8 +314,7 @@ public class RequestValidatorTest extends EasyMockSupport {
 
   @Test
   public void acceptsAbsentSecurityWhenClusterHasKerberos() {
-    expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).anyTimes();
-    expect(request.getSecurity()).andReturn(Optional.empty()).anyTimes();
+    secureCluster();
     replayAll();
 
     validator.validateSecurity();
@@ -302,8 +322,6 @@ public class RequestValidatorTest extends EasyMockSupport {
 
   @Test
   public void acceptsAbsentSecurityWhenClusterHasNone() {
-    expect(cluster.getSecurityType()).andReturn(SecurityType.NONE).anyTimes();
-    expect(request.getSecurity()).andReturn(Optional.empty()).anyTimes();
     replayAll();
 
     validator.validateSecurity();
@@ -311,8 +329,8 @@ public class RequestValidatorTest extends EasyMockSupport {
 
   @Test
   public void acceptsMatchingKerberosSecurity() {
-    expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).anyTimes();
-    expect(request.getSecurity()).andReturn(Optional.of(new SecurityConfiguration(SecurityType.KERBEROS))).anyTimes();
+    secureCluster();
+    requestSpecifiesSecurity();
     replayAll();
 
     validator.validateSecurity();
@@ -320,7 +338,6 @@ public class RequestValidatorTest extends EasyMockSupport {
 
   @Test
   public void acceptsMatchingNoneSecurity() {
-    expect(cluster.getSecurityType()).andReturn(SecurityType.NONE).anyTimes();
     expect(request.getSecurity()).andReturn(Optional.of(SecurityConfiguration.NONE)).anyTimes();
     replayAll();
 
@@ -329,7 +346,7 @@ public class RequestValidatorTest extends EasyMockSupport {
 
   @Test
   public void rejectsNoneSecurityWhenClusterHasKerberos() {
-    expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).anyTimes();
+    secureCluster();
     expect(request.getSecurity()).andReturn(Optional.of(SecurityConfiguration.NONE)).anyTimes();
     replayAll();
 
@@ -339,8 +356,7 @@ public class RequestValidatorTest extends EasyMockSupport {
 
   @Test
   public void rejectsKerberosSecurityWhenClusterHasNone() {
-    expect(cluster.getSecurityType()).andReturn(SecurityType.NONE).anyTimes();
-    expect(request.getSecurity()).andReturn(Optional.of(new SecurityConfiguration(SecurityType.KERBEROS))).anyTimes();
+    requestSpecifiesSecurity();
     replayAll();
 
     assertThrows(IllegalArgumentException.class, validator::validateSecurity);
@@ -349,6 +365,94 @@ public class RequestValidatorTest extends EasyMockSupport {
   }
 
   @Test
+  public void rejectsKerberosDescriptorForNoSecurity() {
+    SecurityConfiguration requestSecurity = SecurityConfiguration.forTest(SecurityType.NONE, null, ImmutableMap.of("kerberos", "descriptor"));
+    expect(request.getSecurity()).andReturn(Optional.of(requestSecurity)).anyTimes();
+    replayAll();
+
+    assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    assertTrue(e.getMessage().contains("Kerberos descriptor"));
+  }
+
+  @Test
+  public void rejectsKerberosDescriptorReferenceForNoSecurity() {
+    SecurityConfiguration requestSecurity = SecurityConfiguration.forTest(SecurityType.NONE, "ref", null);
+    expect(request.getSecurity()).andReturn(Optional.of(requestSecurity)).anyTimes();
+    replayAll();
+
+    assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    assertTrue(e.getMessage().contains("Kerberos descriptor reference"));
+  }
+
+  @Test
+  public void rejectsRequestWithBothKerberosDescriptorAndReference() {
+    secureCluster();
+    SecurityConfiguration invalidConfig = SecurityConfiguration.forTest(SecurityType.KERBEROS, "ref", ImmutableMap.of());
+    expect(request.getSecurity()).andReturn(Optional.of(invalidConfig)).anyTimes();
+    replayAll();
+
+    assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    assertTrue(e.getMessage().contains("Kerberos descriptor and reference"));
+  }
+
+  @Test
+  public void loadsKerberosDescriptorByReference() {
+    String newService = "KAFKA";
+    secureCluster();
+    requestServices(true, newService);
+    KerberosDescriptor kerberosDescriptor = requestHasKerberosDescriptorFor(true, newService);
+    replayAll();
+
+    validator.validateSecurity();
+
+    assertEquals(kerberosDescriptor, validator.getState().getKerberosDescriptor());
+    verifyAll();
+  }
+
+  @Test
+  public void reportsDanglingKerberosDescriptorReference() {
+    String newService = "KAFKA";
+    secureCluster();
+    requestServices(true, newService);
+    SecurityConfiguration requestSecurity = SecurityConfiguration.withReference(FAKE_DESCRIPTOR_REFERENCE);
+    expect(request.getSecurity()).andReturn(Optional.of(requestSecurity)).anyTimes();
+    expect(securityConfigurationFactory.loadSecurityConfigurationByReference(FAKE_DESCRIPTOR_REFERENCE))
+      .andThrow(new IllegalArgumentException("No security configuration found for the reference: " + FAKE_DESCRIPTOR_REFERENCE));
+    replayAll();
+
+    assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    verifyAll();
+  }
+
+  @Test
+  public void acceptsDescriptorWithOnlyNewServices() {
+    String newService = "KAFKA";
+    secureCluster();
+    requestServices(true, newService);
+    KerberosDescriptor kerberosDescriptor = requestHasKerberosDescriptorFor(false, newService);
+    replayAll();
+
+    validator.validateSecurity();
+
+    assertEquals(kerberosDescriptor, validator.getState().getKerberosDescriptor());
+  }
+
+  @Test
+  public void rejectsDescriptorWithAdditionalServices() {
+    String newService = "KAFKA", otherService = "ZOOKEEPER";
+    secureCluster();
+    requestServices(true, newService);
+    requestHasKerberosDescriptorFor(false, newService, otherService);
+    replayAll();
+
+    IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateSecurity);
+    assertTrue(e.getMessage().contains("only for new services"));
+  }
+
+  @Test
   public void combinesRequestConfigWithClusterAndStack() throws AmbariException {
     Configuration requestConfig = Configuration.newEmpty();
     requestConfig.setProperty("kafka-broker", "zookeeper.connect", "zookeeper.connect:request");
@@ -437,6 +541,69 @@ public class RequestValidatorTest extends EasyMockSupport {
     );
   }
 
+  private void requestServices(boolean validated, String... services) {
+    expect(request.getServices()).andReturn(
+      Arrays.stream(services)
+        .map(AddServiceRequest.Service::of)
+        .collect(toSet())
+    ).anyTimes();
+    if (validated) {
+      validatedServices(services);
+    }
+  }
+
+  private void validatedServices(String... services) {
+    validator.setState(
+      RequestValidator.State.INITIAL
+        .with(simpleMockStack())
+        .withNewServices(
+          Arrays.stream(services)
+            .collect(toMap(Function.identity(), __ -> ImmutableMap.of()))
+        )
+    );
+  }
+
+  private void requestComponents(String... components) {
+    expect(request.getComponents()).andReturn(
+      Arrays.stream(components)
+        .map(componentName -> AddServiceRequest.Component.of(componentName, "c7401.ambari.apache.org"))
+        .collect(toSet())
+    );
+  }
+
+  private void secureCluster() {
+    expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).anyTimes();
+  }
+
+  private void requestSpecifiesSecurity() {
+    expect(request.getSecurity()).andReturn(Optional.of(SecurityConfiguration.KERBEROS)).anyTimes();
+  }
+
+  private KerberosDescriptor requestHasKerberosDescriptorFor(boolean byReference, String... services) {
+    SecurityConfiguration requestSecurity = byReference
+      ? SecurityConfiguration.withReference(FAKE_DESCRIPTOR_REFERENCE)
+      : SecurityConfiguration.withDescriptor(FAKE_DESCRIPTOR);
+    expect(request.getSecurity()).andReturn(Optional.of(requestSecurity)).anyTimes();
+
+    if (byReference) {
+      expect(securityConfigurationFactory.loadSecurityConfigurationByReference(FAKE_DESCRIPTOR_REFERENCE))
+        .andReturn(SecurityConfiguration.forTest(SecurityType.KERBEROS, FAKE_DESCRIPTOR_REFERENCE, FAKE_DESCRIPTOR));
+    }
+
+    KerberosDescriptor kerberosDescriptor = kerberosDescriptorForServices(services);
+    expect(kerberosDescriptorFactory.createInstance(FAKE_DESCRIPTOR)).andReturn(kerberosDescriptor).anyTimes();
+
+    return kerberosDescriptor;
+  }
+
+  private static KerberosDescriptor kerberosDescriptorForServices(String... newServices) {
+    return new KerberosDescriptorFactory().createInstance(ImmutableMap.of(
+      KerberosDescriptor.KEY_SERVICES, Arrays.stream(newServices)
+        .map(each -> ImmutableMap.of(KerberosServiceDescriptor.KEY_NAME, each))
+        .collect(toList())
+    ));
+  }
+
   private static <T extends Throwable> T assertThrows(Class<T> expectedException, Runnable code) {
     try {
       code.run();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
index e652f78..abd570a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/StackAdvisorAdapterTest.java
@@ -252,7 +252,7 @@ public class StackAdvisorAdapterTest {
       "KAFKA",
       ImmutableMap.of("KAFKA_BROKER", emptySet()));
 
-    AddServiceInfo info = new AddServiceInfo(null, "c1", stack, org.apache.ambari.server.topology.Configuration.newEmpty(), null, newServices);
+    AddServiceInfo info = new AddServiceInfo(null, "c1", stack, org.apache.ambari.server.topology.Configuration.newEmpty(), null, null, newServices);
     AddServiceInfo infoWithRecommendations = adapter.recommendLayout(info);
 
     Map<String, Map<String, Set<String>>> expectedNewLayout = ImmutableMap.of(
diff --git a/ambari-server/src/test/resources/add_service_api/request1.json b/ambari-server/src/test/resources/add_service_api/request1.json
index dbd12b6..0b4d64b 100644
--- a/ambari-server/src/test/resources/add_service_api/request1.json
+++ b/ambari-server/src/test/resources/add_service_api/request1.json
@@ -23,6 +23,9 @@
 
   "security": {
     "type": "KERBEROS",
+    "kerberos_descriptor": {
+      "services": [ { "name": "ZOOKEEPER" } ]
+    },
     "kerberos_descriptor_reference": "ref_to_kerb_desc"
   },