You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by be...@apache.org on 2018/06/19 15:24:47 UTC

[ambari] branch branch-feature-AMBARI-14714 updated: [Ambari 23959] Save service Id with cluster configuration during blueprint cluster creation (benyoka) (#1519)

This is an automated email from the ASF dual-hosted git repository.

benyoka pushed a commit to branch branch-feature-AMBARI-14714
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-feature-AMBARI-14714 by this push:
     new 0a7a6f7  [Ambari 23959] Save service Id with cluster configuration during blueprint cluster creation (benyoka) (#1519)
0a7a6f7 is described below

commit 0a7a6f77be3c9475519ebe00d1257fa224a3aea6
Author: benyoka <be...@users.noreply.github.com>
AuthorDate: Tue Jun 19 17:24:45 2018 +0200

    [Ambari 23959] Save service Id with cluster configuration during blueprint cluster creation (benyoka) (#1519)
    
    * AMBARI-23959 service level configs with bp install, initial version (benyoka)
    
    * AMBARI-23959 fix ClusterImpl config issue (benyoka)
    
    * AMBARI-23959 fixed unit tests (benyoka)
    
    * AMBARI-23959 write new unit tests (benyoka)
    
    * AMBARI-23959 fix review comments (benyoka)
---
 .../controller/AmbariManagementControllerImpl.java | 238 +++++++------------
 .../ambari/server/controller/ServiceResponse.java  |  23 ++
 .../internal/BlueprintConfigurationProcessor.java  |   2 +-
 .../server/controller/internal/CompositeStack.java |   2 +-
 .../internal/ProvisionClusterRequest.java          |   2 +-
 .../internal/ServiceGroupResourceProvider.java     |   2 +-
 .../internal/ServiceResourceProvider.java          |   2 +-
 .../ambari/server/controller/internal/Stack.java   |   2 +-
 .../controller/internal/StackDefinition.java       |   4 +-
 .../ambari/server/state/cluster/ClusterImpl.java   |  12 +-
 .../ambari/server/topology/AmbariContext.java      |  18 +-
 .../server/topology/AsyncCallableService.java      |   2 +-
 .../BlueprintBasedClusterProvisionRequest.java     |   4 +
 .../topology/ClusterConfigurationRequest.java      | 257 +++++++--------------
 .../ambari/server/topology/ClusterTopology.java    |   8 +
 .../server/topology/ClusterTopologyImpl.java       |  22 +-
 .../topology/ConfigRecommendationStrategy.java     |  24 +-
 .../ambari/server/topology/HostGroupInfo.java      |   5 +-
 .../ambari/server/topology/TopologyManager.java    |  18 +-
 .../topology/tasks/ConfigureClusterTask.java       |  10 +-
 .../BlueprintConfigurationProcessorTest.java       |   1 +
 .../server/state/cluster/ClusterImplTest.java      |  46 ++++
 .../ambari/server/topology/AmbariContextTest.java  |   5 +-
 .../topology/ClusterConfigurationRequestTest.java  | 138 +++++++++--
 .../topology/ClusterDeployWithStartOnlyTest.java   |   4 +-
 ...terInstallWithoutStartOnComponentLevelTest.java |   4 +-
 .../topology/ClusterInstallWithoutStartTest.java   |   5 +-
 .../server/topology/ConfigureClusterTaskTest.java  |   7 +
 .../server/topology/TopologyManagerTest.java       |  18 +-
 29 files changed, 502 insertions(+), 383 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 127d8b3..aa92bac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -1842,82 +1842,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     }
 
     //check if desired configs are available in request and they were changed
-    boolean isConfigurationCreationNeeded = false;
-    if (desiredConfigs != null) {
-      for (ConfigurationRequest desiredConfig : desiredConfigs) {
-        Map<String, String> requestConfigProperties = desiredConfig.getProperties();
-        Map<String,Map<String,String>> requestConfigAttributes = desiredConfig.getPropertiesAttributes();
-
-        // processing password properties
-        if(requestConfigProperties != null && !requestConfigProperties.isEmpty()) {
-          Map<PropertyInfo.PropertyType, Set<String>> propertiesTypes = cluster.getConfigPropertiesTypes(
-              desiredConfig.getType()
-          );
-          for (Entry<String, String> property : requestConfigProperties.entrySet()) {
-            String propertyName = property.getKey();
-            String propertyValue = property.getValue();
-            if ((propertiesTypes.containsKey(PropertyType.PASSWORD) &&
-                propertiesTypes.get(PropertyType.PASSWORD).contains(propertyName)) ||
-                (requestConfigAttributes != null && requestConfigAttributes.containsKey(PASSWORD) &&
-                requestConfigAttributes.get(PASSWORD).containsKey(propertyName) &&
-                requestConfigAttributes.get(PASSWORD).get(propertyName).equals("true"))) {
-              if (SecretReference.isSecret(propertyValue)) {
-                SecretReference ref = new SecretReference(propertyValue, cluster);
-                requestConfigProperties.put(propertyName, ref.getValue());
-              }
-            }
-          }
-        }
-
-        Config clusterConfig = cluster.getDesiredConfigByType(desiredConfig.getType());
-        Map<String, String> clusterConfigProperties = null;
-        Map<String,Map<String,String>> clusterConfigAttributes = null;
-        if (clusterConfig != null) {
-          clusterConfigProperties = clusterConfig.getProperties();
-          clusterConfigAttributes = clusterConfig.getPropertiesAttributes();
-          if (!isAttributeMapsEqual(requestConfigAttributes, clusterConfigAttributes)){
-            isConfigurationCreationNeeded = true;
-            break;
-          }
-        } else {
-          isConfigurationCreationNeeded = true;
-          break;
-        }
-
-        if (requestConfigProperties == null || requestConfigProperties.isEmpty()) {
-          Config existingConfig = cluster.getConfig(desiredConfig.getType(), desiredConfig.getVersionTag());
-          if (existingConfig != null) {
-            if (!StringUtils.equals(existingConfig.getTag(), clusterConfig.getTag())) {
-              isConfigurationCreationNeeded = true;
-              break;
-            }
-          }
-        }
-        if (requestConfigProperties != null && clusterConfigProperties != null) {
-          if (requestConfigProperties.size() != clusterConfigProperties.size()) {
-            isConfigurationCreationNeeded = true;
-            break;
-          } else {
-            if ( cluster.getServiceByConfigType(clusterConfig.getType()) != null &&  clusterConfig.getServiceConfigVersions().isEmpty() ) {
-
-              //If there's no service config versions containing this config (except cluster configs), recreate it even if exactly equal
-              LOG.warn("Existing desired config doesn't belong to any service config version, " +
-                  "forcing config recreation, " +
-                  "clusterName={}, type = {}, tag={}", cluster.getClusterName(), clusterConfig.getType(),
-                  clusterConfig.getTag());
-              isConfigurationCreationNeeded = true;
-              break;
-            }
-            for (Entry<String, String> property : requestConfigProperties.entrySet()) {
-              if (!StringUtils.equals(property.getValue(), clusterConfigProperties.get(property.getKey()))) {
-                isConfigurationCreationNeeded = true;
-                break;
-              }
-            }
-          }
-        }
-      }
-    }
+    boolean isConfigurationCreationNeeded = updateClusterConfiguration(cluster, desiredConfigs);
 
     // set or create configuration mapping (and optionally create the map of properties)
     if (isConfigurationCreationNeeded) {
@@ -1953,7 +1878,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
           }
         }
         if (!configs.isEmpty()) {
-          Map<String, Config> existingConfigTypeToConfig = new HashMap();
+          Map<String, Config> existingConfigTypeToConfig = new HashMap<>();
           for (Config config : configs) {
             Config existingConfig = cluster.getDesiredConfigByType(config.getType());
             existingConfigTypeToConfig.put(config.getType(), existingConfig);
@@ -2142,6 +2067,88 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     }
   }
 
+
+  private boolean updateClusterConfiguration(Cluster cluster, List<ConfigurationRequest> desiredConfigs) throws AmbariException {
+    //check if desired configs are available in request and they were changed
+    boolean isConfigurationCreationNeeded = false;
+    if (desiredConfigs != null) {
+      for (ConfigurationRequest configurationRequest: desiredConfigs) {
+        Map<String, String> requestConfigProperties = configurationRequest.getProperties();
+        Map<String,Map<String,String>> requestConfigAttributes = configurationRequest.getPropertiesAttributes();
+
+        // processing password properties
+        if(requestConfigProperties != null && !requestConfigProperties.isEmpty()) {
+          Map<PropertyInfo.PropertyType, Set<String>> propertiesTypes = cluster.getConfigPropertiesTypes(
+            configurationRequest.getType()
+          );
+          for (Entry<String, String> property : requestConfigProperties.entrySet()) {
+            String propertyName = property.getKey();
+            String propertyValue = property.getValue();
+            if ((propertiesTypes.containsKey(PropertyType.PASSWORD) &&
+              propertiesTypes.get(PropertyType.PASSWORD).contains(propertyName)) ||
+              (requestConfigAttributes != null && requestConfigAttributes.containsKey(PASSWORD) &&
+                requestConfigAttributes.get(PASSWORD).containsKey(propertyName) &&
+                requestConfigAttributes.get(PASSWORD).get(propertyName).equals("true"))) {
+              if (SecretReference.isSecret(propertyValue)) {
+                SecretReference ref = new SecretReference(propertyValue, cluster);
+                requestConfigProperties.put(propertyName, ref.getValue());
+              }
+            }
+          }
+        }
+
+        Config clusterConfig = cluster.getDesiredConfigByType(configurationRequest.getType());
+        Map<String, String> clusterConfigProperties = null;
+        Map<String,Map<String,String>> clusterConfigAttributes = null;
+        if (clusterConfig != null) {
+          clusterConfigProperties = clusterConfig.getProperties();
+          clusterConfigAttributes = clusterConfig.getPropertiesAttributes();
+          if (!isAttributeMapsEqual(requestConfigAttributes, clusterConfigAttributes)){
+            isConfigurationCreationNeeded = true;
+            break;
+          }
+        } else {
+          isConfigurationCreationNeeded = true;
+          break;
+        }
+
+        if (requestConfigProperties == null || requestConfigProperties.isEmpty()) {
+          Config existingConfig = cluster.getConfig(configurationRequest.getType(), configurationRequest.getVersionTag());
+          if (existingConfig != null) {
+            if (!StringUtils.equals(existingConfig.getTag(), clusterConfig.getTag())) {
+              isConfigurationCreationNeeded = true;
+              break;
+            }
+          }
+        }
+        if (requestConfigProperties != null && clusterConfigProperties != null) {
+          if (requestConfigProperties.size() != clusterConfigProperties.size()) {
+            isConfigurationCreationNeeded = true;
+            break;
+          } else {
+            if ( cluster.getServiceByConfigType(clusterConfig.getType()) != null &&  clusterConfig.getServiceConfigVersions().isEmpty() ) {
+
+              // If no service config version contains this config (cluster-level configs excepted), recreate it even if it is exactly equal
+              LOG.warn("Existing desired config doesn't belong to any service config version, " +
+                  "forcing config recreation, " +
+                  "clusterName={}, type = {}, tag={}", cluster.getClusterName(), clusterConfig.getType(),
+                clusterConfig.getTag());
+              isConfigurationCreationNeeded = true;
+              break;
+            }
+            for (Entry<String, String> property : requestConfigProperties.entrySet()) {
+              if (!StringUtils.equals(property.getValue(), clusterConfigProperties.get(property.getKey()))) {
+                isConfigurationCreationNeeded = true;
+                break;
+              }
+            }
+          }
+        }
+      }
+    }
+    return isConfigurationCreationNeeded;
+  }
+
   /**
    * Given a configuration request, compares the requested properties to the current set of desired
    * properties for the same configuration type and returns a map of property names to an array of
@@ -5448,6 +5455,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
   }
 
 
+  // TODO: synchronization?
   @Override
   public Set<ServiceConfigVersionResponse> createServiceConfigVersion(Set<ServiceConfigVersionRequest> requests) throws AmbariException, AuthorizationException {
 
@@ -5468,81 +5476,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
       }
 
       //check if desired configs are available in request and they were changed
-      boolean isConfigurationCreationNeeded = false;
-      if (desiredConfigs != null) {
-        for (ConfigurationRequest configurationRequest : desiredConfigs) {
-          Map<String, String> requestConfigProperties = configurationRequest.getProperties();
-          Map<String, Map<String, String>> requestConfigAttributes = configurationRequest.getPropertiesAttributes();
-
-          // processing password properties
-          if (requestConfigProperties != null && !requestConfigProperties.isEmpty()) {
-            Map<PropertyInfo.PropertyType, Set<String>> propertiesTypes = cluster.getConfigPropertiesTypes(
-                    configurationRequest.getType()
-            );
-            for (Entry<String, String> property : requestConfigProperties.entrySet()) {
-              String propertyName = property.getKey();
-              String propertyValue = property.getValue();
-              if ((propertiesTypes.containsKey(PropertyType.PASSWORD) &&
-                      propertiesTypes.get(PropertyType.PASSWORD).contains(propertyName)) ||
-                      (requestConfigAttributes != null && requestConfigAttributes.containsKey(PASSWORD) &&
-                              requestConfigAttributes.get(PASSWORD).containsKey(propertyName) &&
-                              requestConfigAttributes.get(PASSWORD).get(propertyName).equals("true"))) {
-                if (SecretReference.isSecret(propertyValue)) {
-                  SecretReference ref = new SecretReference(propertyValue, cluster);
-                  requestConfigProperties.put(propertyName, ref.getValue());
-                }
-              }
-            }
-          }
-
-          Config clusterConfig = cluster.getDesiredConfigByType(configurationRequest.getType());
-          Map<String, String> clusterConfigProperties = null;
-          Map<String, Map<String, String>> clusterConfigAttributes = null;
-          if (clusterConfig != null) {
-            clusterConfigProperties = clusterConfig.getProperties();
-            clusterConfigAttributes = clusterConfig.getPropertiesAttributes();
-            if (!isAttributeMapsEqual(requestConfigAttributes, clusterConfigAttributes)) {
-              isConfigurationCreationNeeded = true;
-              break;
-            }
-          } else {
-            isConfigurationCreationNeeded = true;
-            break;
-          }
-
-          if (requestConfigProperties == null || requestConfigProperties.isEmpty()) {
-            Config existingConfig = cluster.getConfig(configurationRequest.getType(), configurationRequest.getVersionTag());
-            if (existingConfig != null) {
-              if (!StringUtils.equals(existingConfig.getTag(), clusterConfig.getTag())) {
-                isConfigurationCreationNeeded = true;
-                break;
-              }
-            }
-          }
-          if (requestConfigProperties != null && clusterConfigProperties != null) {
-            if (requestConfigProperties.size() != clusterConfigProperties.size()) {
-              isConfigurationCreationNeeded = true;
-              break;
-            } else {
-              if (cluster.getServiceByConfigType(clusterConfig.getType()) != null && clusterConfig.getServiceConfigVersions().isEmpty()) {
-                //If there's no service config versions containing this config (except cluster configs), recreate it even if exactly equal
-                LOG.warn("Existing desired config doesn't belong to any service config version, " +
-                                "forcing config recreation, " +
-                                "clusterName={}, type = {}, tag={}", cluster.getClusterName(), clusterConfig.getType(),
-                        clusterConfig.getTag());
-                isConfigurationCreationNeeded = true;
-                break;
-              }
-              for (Entry<String, String> property : requestConfigProperties.entrySet()) {
-                if (!StringUtils.equals(property.getValue(), clusterConfigProperties.get(property.getKey()))) {
-                  isConfigurationCreationNeeded = true;
-                  break;
-                }
-              }
-            }
-          }
-        }
-      }
+      boolean isConfigurationCreationNeeded = updateClusterConfiguration(cluster, desiredConfigs);
 
       // set or create configuration mapping (and optionally create the map of properties)
       if (isConfigurationCreationNeeded) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceResponse.java
index 7067eee..0fbbb3c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceResponse.java
@@ -20,6 +20,8 @@ package org.apache.ambari.server.controller;
 
 import org.apache.ambari.server.state.StackId;
 
+import com.google.common.base.MoreObjects;
+
 import io.swagger.annotations.ApiModelProperty;
 
 public class ServiceResponse {
@@ -309,4 +311,25 @@ public class ServiceResponse {
     @ApiModelProperty(name = "ServiceInfo")
     ServiceResponse getServiceResponse();
   }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+      .add("clusterId", clusterId)
+      .add("clusterName", clusterName)
+      .add("serviceGroupId", serviceGroupId)
+      .add("serviceGroupName", serviceGroupName)
+      .add("serviceId", serviceId)
+      .add("serviceName", serviceName)
+      .add("serviceType", serviceType)
+      .add("desiredStackId", desiredStackId)
+      .add("desiredState", desiredState)
+      .add("maintenanceState", maintenanceState)
+      .add("credentialStoreSupported", credentialStoreSupported)
+      .add("credentialStoreEnabled", credentialStoreEnabled)
+      .add("ssoIntegrationSupported", ssoIntegrationSupported)
+      .add("ssoIntegrationDesired", ssoIntegrationDesired)
+      .add("ssoIntegrationEnabled", ssoIntegrationEnabled)
+      .toString();
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
index 4340973..8275cfc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
@@ -356,7 +356,7 @@ public class BlueprintConfigurationProcessor {
    * @return Set of config type names that were updated by this update call
    */
   public Set<String> doUpdateForClusterCreate() throws ConfigurationTopologyException {
-      Set<String> configTypesUpdated = new HashSet<>();
+    Set<String> configTypesUpdated = new HashSet<>();
     Configuration clusterConfig = clusterTopology.getConfiguration();
     Map<String, HostGroupInfo> groupInfoMap = clusterTopology.getHostGroupInfo();
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CompositeStack.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CompositeStack.java
index 8839857..07fb3d6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CompositeStack.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CompositeStack.java
@@ -119,7 +119,7 @@ public class CompositeStack implements StackDefinition {
   }
 
   @Override
-  public Collection<String> getAllConfigurationTypes(String service) {
+  public Set<String> getAllConfigurationTypes(String service) {
     return stacks.stream()
       .flatMap(m -> m.getAllConfigurationTypes(service).stream())
       .collect(toSet());
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java
index 4dd0e68..085cc97 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ProvisionClusterRequest.java
@@ -211,7 +211,7 @@ public class ProvisionClusterRequest extends BaseClusterRequest implements Provi
 
   // for tests
   public ProvisionClusterRequest(Blueprint blueprint, Configuration configuration) {
-    configRecommendationStrategy = ConfigRecommendationStrategy.NEVER_APPLY;
+    configRecommendationStrategy = ConfigRecommendationStrategy.getDefault();
     quickLinksProfileJson = null;
     mpackInstances = ImmutableList.of();
     stackIds = ImmutableSet.of();
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceGroupResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceGroupResourceProvider.java
index 94def4e..496a998 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceGroupResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceGroupResourceProvider.java
@@ -322,7 +322,7 @@ public class ServiceGroupResourceProvider extends AbstractControllerResourceProv
   }
 
   // Get services from the given set of requests.
-  protected Set<ServiceGroupResponse> getServiceGroups(Set<ServiceGroupRequest> requests)
+  public Set<ServiceGroupResponse> getServiceGroups(Set<ServiceGroupRequest> requests)
     throws AmbariException {
     Set<ServiceGroupResponse> response = new HashSet<>();
     for (ServiceGroupRequest request : requests) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
index 2404ed2..ddd5d49 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
@@ -509,7 +509,7 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider
   }
 
   // Get services from the given set of requests.
-  protected Set<ServiceResponse> getServices(Set<ServiceRequest> requests)
+  public Set<ServiceResponse> getServices(Set<ServiceRequest> requests)
       throws AmbariException {
     Set<ServiceResponse> response = new HashSet<>();
     for (ServiceRequest request : requests) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
index 2911cea..9489505 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
@@ -232,7 +232,7 @@ public class Stack implements StackDefinition {
   }
 
   @Override
-  public Collection<String> getAllConfigurationTypes(String service) {
+  public Set<String> getAllConfigurationTypes(String service) {
     Map<String, Map<String, ConfigProperty>> serviceConfigs = getServiceConfigurations(service);
     return serviceConfigs != null ? serviceConfigs.keySet() : ImmutableSet.of();
   }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinition.java
index ae07ad2..5ac438b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinition.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinition.java
@@ -100,9 +100,9 @@ public interface StackDefinition {
    *
    * @param service  service name
    *
-   * @return collection of all configuration types for the specified service
+   * @return set of all configuration types for the specified service
    */
-  Collection<String> getAllConfigurationTypes(String service);
+  Set<String> getAllConfigurationTypes(String service);
 
   /**
    * Get configuration types for the specified service.
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 81e65b6..6698e59 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -1610,14 +1610,10 @@ public class ClusterImpl implements Cluster {
     }
     clusterGlobalLock.writeLock().lock();
     try {
-      if (!serviceConfigs.containsKey(serviceId)) {
-        ConcurrentMap<String, ConcurrentMap<String, Config>> allServiceConfigs = new ConcurrentHashMap<>();
-        serviceConfigs.put(serviceId, allServiceConfigs);
-      }
-      ConcurrentMap<String, ConcurrentMap<String, Config>> allServiceConfigs = serviceConfigs.get(serviceId);
-      allServiceConfigs.put(config.getType(), new ConcurrentHashMap<>());
-      allServiceConfigs.get(config.getType()).put(config.getTag(), config);
-      serviceConfigs.put(serviceId,allServiceConfigs);
+      serviceConfigs
+        .computeIfAbsent(serviceId, __ -> new ConcurrentHashMap<>())
+        .computeIfAbsent(config.getType(), __ -> new ConcurrentHashMap<>())
+        .put(config.getTag(), config);
     } finally {
         clusterGlobalLock.writeLock().unlock();
     }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
index 96fc3d3..9a0595a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
@@ -56,7 +56,9 @@ import org.apache.ambari.server.controller.RootComponent;
 import org.apache.ambari.server.controller.ServiceComponentHostRequest;
 import org.apache.ambari.server.controller.ServiceComponentRequest;
 import org.apache.ambari.server.controller.ServiceGroupRequest;
+import org.apache.ambari.server.controller.ServiceGroupResponse;
 import org.apache.ambari.server.controller.ServiceRequest;
+import org.apache.ambari.server.controller.ServiceResponse;
 import org.apache.ambari.server.controller.internal.AbstractResourceProvider;
 import org.apache.ambari.server.controller.internal.ComponentResourceProvider;
 import org.apache.ambari.server.controller.internal.ConfigGroupResourceProvider;
@@ -91,6 +93,7 @@ import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Striped;
@@ -205,7 +208,6 @@ public class AmbariContext {
   }
 
   public void createAmbariResources(ClusterTopology topology, String clusterName, SecurityType securityType) {
-
     Set<StackId> stackIds = topology.getStackIds();
     createAmbariClusterResource(clusterName, stackIds, securityType);
     createAmbariServiceAndComponentResources(topology, clusterName);
@@ -289,6 +291,20 @@ public class AmbariContext {
     }
   }
 
+  public Set<ServiceResponse> getServices(String clusterName) {
+    try {
+      ServiceGroupRequest serviceGroupRequest = new ServiceGroupRequest(clusterName, null, null);
+      Set<ServiceGroupResponse> serviceGroups = getServiceGroupResourceProvider().getServiceGroups(ImmutableSet.of(serviceGroupRequest));
+      Set<ServiceRequest> serviceRequests = serviceGroups.stream().
+        map(sg -> new ServiceRequest(clusterName, sg.getServiceGroupName(), null, null, null, null)).
+        collect(toSet());
+      return getServiceResourceProvider().getServices(serviceRequests);
+    }
+    catch (AmbariException ex) {
+      throw new RuntimeException("Failed to load service groups and services", ex);
+    }
+  }
+
   public void createAmbariHostResources(long  clusterId, String hostName, Stream<ResolvedComponent> components)  {
     Host host;
     try {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
index 7142e49..2b2c5e9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
@@ -97,7 +97,7 @@ public class AsyncCallableService<T> implements Callable<T> {
       } catch (ExecutionException e) {
         Throwable cause = Throwables.getRootCause(e);
         if (!(cause instanceof RetryTaskSilently)) {
-          LOG.info(String.format("Task %s exception during execution", taskName), cause);
+          LOG.error(String.format("Task %s exception during execution", taskName), cause);
         }
         lastError = cause;
         timeLeft = timeout - (System.currentTimeMillis() - startTime) - retryDelay;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintBasedClusterProvisionRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintBasedClusterProvisionRequest.java
index 58ce5d7..1e6f020 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintBasedClusterProvisionRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintBasedClusterProvisionRequest.java
@@ -125,6 +125,10 @@ public class BlueprintBasedClusterProvisionRequest implements Blueprint, Provisi
     return null;
   }
 
+  public String getClusterName() {
+    return request.getClusterName();
+  }
+
   @Override
   public Type getType() {
     return Type.PROVISION;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
index ae4fe6b..49eb890 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
@@ -22,13 +22,14 @@ import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toSet;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -37,19 +38,23 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor;
 import org.apache.ambari.server.controller.ClusterRequest;
 import org.apache.ambari.server.controller.ConfigurationRequest;
+import org.apache.ambari.server.controller.ServiceResponse;
 import org.apache.ambari.server.controller.internal.BlueprintConfigurationProcessor;
-import org.apache.ambari.server.controller.internal.ClusterResourceProvider;
 import org.apache.ambari.server.controller.internal.ConfigurationTopologyException;
 import org.apache.ambari.server.controller.internal.StackDefinition;
 import org.apache.ambari.server.serveraction.kerberos.KerberosInvalidConfigurationException;
 import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 /**
  * Responsible for cluster configuration.
  */
@@ -70,7 +75,8 @@ public class ClusterConfigurationRequest {
   private StackDefinition stack;
   private boolean configureSecurity = false;
 
-  public ClusterConfigurationRequest(AmbariContext ambariContext, ClusterTopology topology, boolean setInitial, StackAdvisorBlueprintProcessor stackAdvisorBlueprintProcessor, boolean configureSecurity) {
+  public ClusterConfigurationRequest(AmbariContext ambariContext, ClusterTopology topology, boolean setInitial,
+                                     StackAdvisorBlueprintProcessor stackAdvisorBlueprintProcessor, boolean configureSecurity) {
     this(ambariContext, topology, setInitial, stackAdvisorBlueprintProcessor);
     this.configureSecurity = configureSecurity;
   }
@@ -145,7 +151,7 @@ public class ClusterConfigurationRequest {
       }
 
       // obtain recommended configurations before config updates
-      if (!ConfigRecommendationStrategy.NEVER_APPLY.equals(this.clusterTopology.getConfigRecommendationStrategy())) {
+      if (clusterTopology.getConfigRecommendationStrategy().shouldUseAdvisor()) {
         // get merged properties form Blueprint & cluster template (this doesn't contains stack default values)
         stackAdvisorBlueprintProcessor.adviseConfiguration(this.clusterTopology, userProvidedConfigurations);
       }
@@ -334,46 +340,87 @@ public class ClusterConfigurationRequest {
    * @param clusterTopology  cluster topology
    * @param tag              config tag
    */
-  public void setConfigurationsOnCluster(ClusterTopology clusterTopology, String tag, Set<String> updatedConfigTypes)  {
+  public void setConfigurationsOnCluster(ClusterTopology clusterTopology, String tag, Set<String> updatedConfigTypes) {
+    // TODO: This version works with Ambari 3.0, where it is assumed that any service with a configuration can be identified
+    //   by its name. Even though the cluster is multi-stack (multi-mpack), service names should not conflict across mpacks,
+    //   except for client services, which have no configuration. In 3.1, mpacks may have conflicting service names.
     //todo: also handle setting of host group scoped configuration which is updated by config processor
-    List<BlueprintServiceConfigRequest> configurationRequests = new LinkedList<>();
+    List<Pair<String, ClusterRequest>> serviceNamesAndConfigurationRequests = new ArrayList<>();
 
     Configuration clusterConfiguration = clusterTopology.getConfiguration();
 
-    for (String service : clusterTopology.getServices()) {
-      //todo: remove intermediate request type
-      // one bp config request per service
-      BlueprintServiceConfigRequest blueprintConfigRequest = new BlueprintServiceConfigRequest(service);
-
-      for (String serviceConfigType : stack.getAllConfigurationTypes(service)) {
-        Set<String> excludedConfigTypes = stack.getExcludedConfigurationTypes(service);
-        if (!excludedConfigTypes.contains(serviceConfigType)) {
-          // skip handling of cluster-env here
-          if (! serviceConfigType.equals("cluster-env")) {
-            if (clusterConfiguration.getFullProperties().containsKey(serviceConfigType)) {
-              blueprintConfigRequest.addConfigElement(serviceConfigType,
-                  clusterConfiguration.getFullProperties().get(serviceConfigType),
-                  clusterConfiguration.getFullAttributes().get(serviceConfigType));
-            }
-          }
-        }
+    final Map<String, Map<String, String>> clusterProperties = clusterConfiguration.getFullProperties();
+    final Map<String, Map<String, Map<String, String>>> clusterAttributes = clusterConfiguration.getFullAttributes();
+    final Set<String> clusterConfigTypes = clusterProperties.keySet();
+    final Set<String> globalConfigTypes = ImmutableSet.of("cluster-env");
+
+    // TODO: do we need to handle security type? In the previous version it was handled, but in a broken way
+
+    for (ServiceResponse service : ambariContext.getServices(clusterTopology.getClusterName())) {
+      ClusterRequest clusterRequest =
+        new ClusterRequest(clusterTopology.getClusterId(), clusterTopology.getClusterName(), null, null, null, null);
+      clusterRequest.setDesiredConfig(new ArrayList<>());
+
+      Set<String> configTypes =
+        Sets.difference(
+          Sets.intersection(stack.getAllConfigurationTypes(service.getServiceName()), clusterConfigTypes),
+          Sets.union(stack.getExcludedConfigurationTypes(service.getServiceName()), globalConfigTypes)
+        );
+
+      for (String serviceConfigType: configTypes) {
+        Map<String, String> properties = clusterProperties.get(serviceConfigType);
+        Map<String, Map<String, String>> attributes = clusterAttributes.get(serviceConfigType);
+
+        removeNullValues(properties, attributes);
+
+        ConfigurationRequest configurationRequest = new ConfigurationRequest(clusterTopology.getClusterName(),
+          serviceConfigType,
+          tag,
+          properties,
+          attributes,
+          service.getServiceId(),
+          service.getServiceGroupId());
+        clusterRequest.getDesiredConfig().add(configurationRequest);
       }
-
-      configurationRequests.add(blueprintConfigRequest);
+      serviceNamesAndConfigurationRequests.add(Pair.of(service.getServiceName(), clusterRequest));
     }
 
     // since the stack returns "cluster-env" with each service's config ensure that only one
     // ClusterRequest occurs for the global cluster-env configuration
-    BlueprintServiceConfigRequest globalConfigRequest = new BlueprintServiceConfigRequest("GLOBAL-CONFIG");
-    Map<String, String> clusterEnvProps = clusterConfiguration.getFullProperties().get("cluster-env");
-    Map<String, Map<String, String>> clusterEnvAttributes = clusterConfiguration.getFullAttributes().get("cluster-env");
-
-    globalConfigRequest.addConfigElement("cluster-env", clusterEnvProps,clusterEnvAttributes);
-    configurationRequests.add(globalConfigRequest);
+    ClusterRequest globalConfigClusterRequest =
+      new ClusterRequest(clusterTopology.getClusterId(), clusterTopology.getClusterName(), null, null, null, null);
+
+    Map<String, String> clusterEnvProps = clusterProperties.get("cluster-env");
+    Map<String, Map<String, String>> clusterEnvAttributes = clusterAttributes.get("cluster-env");
+
+    removeNullValues(clusterEnvProps, clusterEnvAttributes);
+
+    ConfigurationRequest globalConfigurationRequest = new ConfigurationRequest(clusterTopology.getClusterName(),
+      "cluster-env",
+      tag,
+      clusterEnvProps,
+      clusterEnvAttributes,
+      null,
+      null);
+    globalConfigClusterRequest.setDesiredConfig(Lists.newArrayList(globalConfigurationRequest));
+    serviceNamesAndConfigurationRequests.add(Pair.of("GLOBAL-CONFIG", globalConfigClusterRequest));
+
+    // send configurations
+    setConfigurationsOnCluster(serviceNamesAndConfigurationRequests, tag, updatedConfigTypes);
+  }
 
-    setConfigurationsOnCluster(configurationRequests, tag, updatedConfigTypes);
+  private void removeNullValues(Map<String, String> configProperties, Map<String, Map<String, String>> configAttributes) {
+    if (null != configProperties) {
+      configProperties.values().removeIf(Objects::isNull);
+    }
+    if (null != configAttributes) {
+      configAttributes.values().removeIf(Objects::isNull);
+      configAttributes.values().forEach(map -> map.values().removeIf(Objects::isNull));
+      configAttributes.values().removeIf(v -> v.isEmpty());
+    }
   }
 
+
   /**
    * Creates a ClusterRequest for each service that
    *   includes any associated config types and configuration. The Blueprints
@@ -382,157 +429,25 @@ public class ClusterConfigurationRequest {
    *
    * This method will also send these requests to the management controller.
    *
-   * @param configurationRequests a list of requests to send to the AmbariManagementController.
+   * @param serviceNamesAndRequests a list of (service name, cluster request) pairs; each request is sent to the AmbariManagementController.
    */
-  private void setConfigurationsOnCluster(List<BlueprintServiceConfigRequest> configurationRequests,
+  private void  setConfigurationsOnCluster(List<Pair<String, ClusterRequest>> serviceNamesAndRequests,
                                          String tag, Set<String> updatedConfigTypes)  {
-    String clusterName = null;
-    try {
-      clusterName = ambariContext.getClusterName(clusterTopology.getClusterId());
-    } catch (AmbariException e) {
-      LOG.error("Cannot get cluster name for clusterId = " + clusterTopology.getClusterId(), e);
-      throw new RuntimeException(e);
-    }
     // iterate over services to deploy
-    for (BlueprintServiceConfigRequest blueprintConfigRequest : configurationRequests) {
-      ClusterRequest clusterRequest = null;
-      // iterate over the config types associated with this service
-      List<ConfigurationRequest> requestsPerService = new LinkedList<>();
-      for (BlueprintServiceConfigElement blueprintElement : blueprintConfigRequest.getConfigElements()) {
-        Map<String, Object> clusterProperties = new HashMap<>();
-        clusterProperties.put(ClusterResourceProvider.CLUSTER_NAME_PROPERTY_ID, clusterName);
-        clusterProperties.put(ClusterResourceProvider.CLUSTER_DESIRED_CONFIGS_PROPERTY_ID + "/type", blueprintElement.getTypeName());
-        clusterProperties.put(ClusterResourceProvider.CLUSTER_DESIRED_CONFIGS_PROPERTY_ID + "/tag", tag);
-        for (Map.Entry<String, String> entry : blueprintElement.getConfiguration().entrySet()) {
-          clusterProperties.put(ClusterResourceProvider.CLUSTER_DESIRED_CONFIGS_PROPERTY_ID +
-              "/properties/" + entry.getKey(), entry.getValue());
-        }
-        if (blueprintElement.getAttributes() != null) {
-          for (Map.Entry<String, Map<String, String>> attribute : blueprintElement.getAttributes().entrySet()) {
-            String attributeName = attribute.getKey();
-            for (Map.Entry<String, String> attributeOccurrence : attribute.getValue().entrySet()) {
-              clusterProperties.put(ClusterResourceProvider.CLUSTER_DESIRED_CONFIGS_PROPERTY_ID + "/properties_attributes/"
-                  + attributeName + "/" + attributeOccurrence.getKey(), attributeOccurrence.getValue());
-            }
-          }
-        }
-
-        // only create one cluster request per service, which includes
-        // all the configuration types for that service
-        if (clusterRequest == null) {
-          SecurityType securityType;
-          String requestedSecurityType = (String) clusterProperties.get(
-              ClusterResourceProvider.CLUSTER_SECURITY_TYPE_PROPERTY_ID);
-          if(requestedSecurityType == null)
-            securityType = null;
-          else {
-            try {
-              securityType = SecurityType.valueOf(requestedSecurityType.toUpperCase());
-            } catch (IllegalArgumentException e) {
-              throw new IllegalArgumentException(String.format(
-                  "Cannot set cluster security type to invalid value: %s", requestedSecurityType));
-            }
-          }
-
-          clusterRequest = new ClusterRequest(
-              (Long) clusterProperties.get(ClusterResourceProvider.CLUSTER_ID_PROPERTY_ID),
-              (String) clusterProperties.get(ClusterResourceProvider.CLUSTER_NAME_PROPERTY_ID),
-              (String) clusterProperties.get(ClusterResourceProvider.CLUSTER_PROVISIONING_STATE_PROPERTY_ID),
-              securityType,
-              (String) clusterProperties.get(ClusterResourceProvider.CLUSTER_VERSION_PROPERTY_ID),
-              null);
-        }
-
-        List<ConfigurationRequest> listOfRequests = ambariContext.createConfigurationRequests(clusterProperties);
-        requestsPerService.addAll(listOfRequests);
-      }
-
-      // set total list of config requests, including all config types for this service
-      if (clusterRequest != null) {
-        clusterRequest.setDesiredConfig(requestsPerService);
-        LOG.info("Sending cluster config update request for service = " + blueprintConfigRequest.getServiceName());
-        ambariContext.setConfigurationOnCluster(clusterRequest);
-      } else {
-        LOG.error("ClusterRequest should not be null for service = " + blueprintConfigRequest.getServiceName());
-      }
+    for (Pair<String, ClusterRequest> serviceNameAndRequest: serviceNamesAndRequests) {
+      LOG.info("Sending cluster config update request for service = " + serviceNameAndRequest.getLeft());
+      ambariContext.setConfigurationOnCluster(serviceNameAndRequest.getRight());
     }
 
     if (tag.equals(TopologyManager.TOPOLOGY_RESOLVED_TAG)) {
       // if this is a request to resolve config, then wait until resolution is completed
       try {
         // wait until the cluster topology configuration is set/resolved
-        ambariContext.waitForConfigurationResolution(clusterName, updatedConfigTypes);
+        ambariContext.waitForConfigurationResolution(clusterTopology.getClusterName(), updatedConfigTypes);
       } catch (AmbariException e) {
         LOG.error("Error while attempting to wait for the cluster configuration to reach TOPOLOGY_RESOLVED state.", e);
       }
     }
   }
 
-  /**
-   * Internal class meant to represent the collection of configuration
-   * items and configuration attributes that are associated with a given service.
-   *
-   * This class is used to support proper configuration versioning when
-   * Ambari Blueprints is used to deploy a cluster.
-   */
-  private static class BlueprintServiceConfigRequest {
-
-    private final String serviceName;
-
-    private List<BlueprintServiceConfigElement> configElements =
-      new LinkedList<>();
-
-    BlueprintServiceConfigRequest(String serviceName) {
-      this.serviceName = serviceName;
-    }
-
-    void addConfigElement(String type, Map<String, String> props, Map<String, Map<String, String>> attributes) {
-      if (props == null) {
-        props = Collections.emptyMap();
-      }
-
-      if (attributes == null) {
-        attributes = Collections.emptyMap();
-      }
-      configElements.add(new BlueprintServiceConfigElement(type, props, attributes));
-    }
-
-    public String getServiceName() {
-      return serviceName;
-    }
-
-    List<BlueprintServiceConfigElement> getConfigElements() {
-      return configElements;
-    }
-  }
-
-  /**
-   * Internal class that represents the configuration
-   *  and attributes for a given configuration type.
-   */
-  private static class BlueprintServiceConfigElement {
-    private final String typeName;
-
-    private final Map<String, String> configuration;
-
-    private final Map<String, Map<String, String>> attributes;
-
-    BlueprintServiceConfigElement(String type, Map<String, String> props, Map<String, Map<String, String>> attributes) {
-      this.typeName = type;
-      this.configuration = props;
-      this.attributes = attributes;
-    }
-
-    public String getTypeName() {
-      return typeName;
-    }
-
-    public Map<String, String> getConfiguration() {
-      return configuration;
-    }
-
-    public Map<String, Map<String, String>> getAttributes() {
-      return attributes;
-    }
-  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java
index 5b45161..fe618a0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopology.java
@@ -44,6 +44,11 @@ public interface ClusterTopology {
   Long getClusterId();
 
   /**
+   * @return the cluster name
+   */
+  String getClusterName();
+
+  /**
    * Get the blueprint associated with the cluster.
    *
    * @return associated blueprint
@@ -201,6 +206,7 @@ public interface ClusterTopology {
    */
   RequestStatusResponse startHost(String hostName, boolean skipFailure);
 
+  @Nonnull
   ConfigRecommendationStrategy getConfigRecommendationStrategy();
 
   ProvisionAction getProvisionAction();
@@ -231,4 +237,6 @@ public interface ClusterTopology {
    * @return true if the given component belongs to a service that has serviceType=HCFS
    */
   boolean isComponentHadoopCompatible(String component);
+
+  Set<String> getHostNames();
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java
index c7297e2..e2e5ff9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterTopologyImpl.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Stream;
 
@@ -60,6 +61,7 @@ public class ClusterTopologyImpl implements ClusterTopology {
   private final Set<StackId> stackIds;
   private final StackDefinition stack;
   private Long clusterId;
+  private String clusterName;
   private final Blueprint blueprint;
   private final Configuration configuration;
   private final ConfigRecommendationStrategy configRecommendationStrategy;
@@ -78,7 +80,7 @@ public class ClusterTopologyImpl implements ClusterTopology {
     this.blueprint = topologyRequest.getBlueprint();
     this.setting = blueprint.getSetting();
     this.configuration = topologyRequest.getConfiguration();
-    configRecommendationStrategy = ConfigRecommendationStrategy.NEVER_APPLY;
+    configRecommendationStrategy = ConfigRecommendationStrategy.getDefault();
     provisionAction = topologyRequest instanceof BaseClusterRequest ? ((BaseClusterRequest) topologyRequest).getProvisionAction() : INSTALL_AND_START; // FIXME
 
     provisionRequest = null;
@@ -103,7 +105,9 @@ public class ClusterTopologyImpl implements ClusterTopology {
     this.configuration = request.getConfiguration();
     this.provisionRequest = request;
     this.resolvedComponents = resolvedComponents;
-    configRecommendationStrategy = request.getConfigRecommendationStrategy();
+    clusterName = request.getClusterName();
+    configRecommendationStrategy =
+      Optional.ofNullable(request.getConfigRecommendationStrategy()).orElse(ConfigRecommendationStrategy.getDefault());
     provisionAction = request.getProvisionAction();
 
     defaultPassword = provisionRequest.getDefaultPassword();
@@ -124,6 +128,11 @@ public class ClusterTopologyImpl implements ClusterTopology {
     return clusterId;
   }
 
+  @Override
+  public String getClusterName() {
+    return clusterName;
+  }
+
   public void setClusterId(Long clusterId) {
     this.clusterId = clusterId;
   }
@@ -376,6 +385,15 @@ public class ClusterTopologyImpl implements ClusterTopology {
       .anyMatch(stackIdService -> HADOOP_COMPATIBLE_FS.equals(stackIdService.getRight().getServiceType()));
   }
 
+  @Override
+  public Set<String> getHostNames() {
+    synchronized(hostGroupInfoMap) {
+      return hostGroupInfoMap.values().stream().flatMap(
+        hg -> hg.getHostNames().stream()
+      ).collect(toSet());
+    }
+  }
+
   private void registerHostGroupInfo(Map<String, HostGroupInfo> requestedHostGroupInfoMap) throws InvalidTopologyException {
     LOG.debug("Registering requested host group information for {} hostgroups", requestedHostGroupInfoMap.size());
     checkForDuplicateHosts(requestedHostGroupInfoMap);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ConfigRecommendationStrategy.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ConfigRecommendationStrategy.java
index bf3eacc..8d18962 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ConfigRecommendationStrategy.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ConfigRecommendationStrategy.java
@@ -19,24 +19,40 @@
 package org.apache.ambari.server.topology;
 
 public enum ConfigRecommendationStrategy {
+
   /**
    *  Configuration recommendations are always applied, overriding stack defaults and
    *  configuration defined by the user in the Blueprint and/or Cluster Creation Template.
    */
-  ALWAYS_APPLY,
+  ALWAYS_APPLY(true),
   /**
    * Configuration recommendations are ignored with this option, both for stack defaults
    * and configuration defined by the user in the Blueprint and/or Cluster Creation Template.
    */
-  NEVER_APPLY,
+  NEVER_APPLY(false),
   /**
    *  Configuration recommendations are always applied for properties listed as stack defaults,
    *  but not for configurations defined by the user in the Blueprint and/or Cluster Creation Template.
    */
-  ONLY_STACK_DEFAULTS_APPLY,
+  ONLY_STACK_DEFAULTS_APPLY(true),
   /**
    *  Configuration recommendations are always applied, overriding stack defaults but they don't
    *  override configuration defined by the user in the Blueprint and/or Cluster Creation Template.
    */
-  ALWAYS_APPLY_DONT_OVERRIDE_CUSTOM_VALUES;
+  ALWAYS_APPLY_DONT_OVERRIDE_CUSTOM_VALUES(true);
+
+  private final boolean shouldUseAdvisor;
+
+  ConfigRecommendationStrategy(boolean shouldUseAdvisor) {
+    this.shouldUseAdvisor = shouldUseAdvisor;
+  }
+
+  public boolean shouldUseAdvisor() {
+    return shouldUseAdvisor;
+  }
+
+  public static final ConfigRecommendationStrategy getDefault() {
+    return NEVER_APPLY;
+  }
+
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostGroupInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostGroupInfo.java
index b00faad..ee650e6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostGroupInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostGroupInfo.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.ambari.server.api.predicate.InvalidQueryException;
 import org.apache.ambari.server.api.predicate.PredicateCompiler;
@@ -49,7 +50,7 @@ public class HostGroupInfo {
   /**
    * hosts contained associated with the host group
    */
-  private final Collection<String> hostNames = new HashSet<>();
+  private final Set<String> hostNames = new HashSet<>();
 
   /**
    * maps host names to rack information
@@ -105,7 +106,7 @@ public class HostGroupInfo {
    *
    * @return collection of user specified host names; will never be null
    */
-  public Collection<String> getHostNames() {
+  public Set<String> getHostNames() {
     // needs to be an exclusive lock, not a read lock because collection
     // shouldn't change while copying elements into the new set instance
     synchronized (hostNames) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 634595e..c39b2ff 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -1085,13 +1085,19 @@ public class TopologyManager {
    */
   private void addClusterConfigRequest(final LogicalRequest logicalRequest, ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
     ConfigureClusterTask task = configureClusterTaskFactory.createConfigureClusterTask(topology, configurationRequest, ambariEventPublisher);
-    executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask", throwable -> {
-      HostRoleStatus status = throwable instanceof TimeoutException ? HostRoleStatus.TIMEDOUT : HostRoleStatus.FAILED;
-      LOG.info("ConfigureClusterTask failed, marking host requests {}", status);
-      for (HostRequest hostRequest : logicalRequest.getHostRequests()) {
-        hostRequest.markHostRequestFailed(status, throwable, persistedState);
+    executor.submit(new AsyncCallableService<>(
+      task,
+      task.getTimeout(),
+      task.getRepeatDelay(),
+      "ConfigureClusterTask",
+      throwable -> {
+        HostRoleStatus status = throwable instanceof TimeoutException ? HostRoleStatus.TIMEDOUT : HostRoleStatus.FAILED;
+        LOG.info("ConfigureClusterTask failed, marking host requests {}", status);
+        for (HostRequest hostRequest : logicalRequest.getHostRequests()) {
+          hostRequest.markHostRequestFailed(status, throwable, persistedState);
+        }
       }
-    }));
+    ));
   }
 
   /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
index ed1c451..0f92b3b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
@@ -70,12 +70,19 @@ public class ConfigureClusterTask implements Callable<Boolean> {
 
     Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
 
+    String msg = null;
     if (!areHostGroupsResolved(requiredHostGroups)) {
-      String msg = "Some host groups require more hosts, cluster configuration cannot begin";
+      msg = "Some host groups require more hosts, cluster configuration cannot begin";
+    }
+      else if (topology.getConfigRecommendationStrategy().shouldUseAdvisor() && topology.getHostNames().isEmpty()) {
+      msg = "Getting config recommendations requires at least one host, cluster configuration cannot begin";
+    }
+    if (null != msg) {
       LOG.info(msg);
       throw new AsyncCallableService.RetryTaskSilently(msg);
     }
 
+
     LOG.info("All required host groups are complete, cluster configuration can now begin");
     configRequest.process();
     LOG.info("Cluster configuration finished successfully");
@@ -155,7 +162,6 @@ public class ConfigureClusterTask implements Callable<Boolean> {
         }
       }
     }
-
     return allHostGroupsResolved;
   }
 
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index 7b42468..4bd1455 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -8255,6 +8255,7 @@ public class BlueprintConfigurationProcessorTest extends EasyMockSupport {
 
     expect(topologyRequestMock.getBlueprint()).andReturn(blueprint).anyTimes();
     expect(topologyRequestMock.getClusterId()).andReturn(1L).anyTimes();
+    expect(topologyRequestMock.getClusterName()).andReturn("testCluster").anyTimes();
     expect(topologyRequestMock.getConfigRecommendationStrategy()).andReturn(recommendation).anyTimes();
     expect(topologyRequestMock.getConfiguration()).andReturn(configuration).anyTimes();
     expect(topologyRequestMock.getDefaultPassword()).andReturn("secret").anyTimes();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java
index 7092526..daafcbd 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterImplTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.ambari.server.state.cluster;
 
+import static java.util.stream.Collectors.toSet;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
@@ -34,6 +36,7 @@ import static org.junit.Assert.fail;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.ambari.server.AmbariException;
@@ -46,6 +49,7 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
@@ -56,7 +60,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -341,6 +347,46 @@ public class ClusterImplTest {
 
     // Then
     assertEquals(2, clusterSize);
+  }
+
+  @Test
+  public void testAddConfigWithServiceId() throws Exception {
+    // Given: a newly added cluster on its own stack
+    String clusterName = "TEST_ADD_CONFIG_WITH_SERVICE_ID";
+    StackId stackId = new StackId("HDPCORE-1.0.0.0");
 
+    ormTestHelper.createMpack(stackId);
+    clusters.addCluster(clusterName, stackId);
+
+    Cluster cluster = clusters.getCluster(clusterName);
+    final Long serviceId = 1L;
+    final String configType = "zoo.cfg";
+
+    // When: two configs of the same type but with different tags are added for the same service id
+    List<Object> initialVersion = ImmutableList.of(configType, "INITIAL", ImmutableMap.of("prop", "value1"));
+    List<Object> topologyResolved = ImmutableList.of(configType, "TOPOLOGY_RESOLVED", ImmutableMap.of("prop", "value2"));
+    cluster.addConfig(createConfig(initialVersion), serviceId);
+    cluster.addConfig(createConfig(topologyResolved), serviceId);
+
+    // Then: both configs are returned for the service id (the second add must not replace the first)
+    List<Config> zooCfgVersions = cluster.getConfigsByServiceId(serviceId);
+    assertEquals(2, zooCfgVersions.size());
+    assertEquals(
+      ImmutableSet.of(initialVersion, topologyResolved),
+      zooCfgVersions.stream().map(this::configToList).collect(toSet()));
+  }
+
+  private Config createConfig(List<Object> data) { // data is read positionally: [0] type, [1] tag, [2] properties
+    Config config = mock(Config.class);
+    expect(config.getType()).andReturn((String)data.get(0)).anyTimes();
+    expect(config.getTag()).andReturn((String)data.get(1)).anyTimes();
+    expect(config.getProperties()).andReturn((Map)data.get(2)).anyTimes(); // raw Map cast: entry types are not checked here
+    replay(config); // the mock is returned already in replay state
+    return config;
   }
+
+  private List<Object> configToList(Config config) { // flattens a Config to [type, tag, properties] for comparison
+    return ImmutableList.of(config.getType(), config.getTag(), config.getProperties());
+  }
+
 }
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java
index 5fd7d8a..ffac4f0 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java
@@ -82,7 +82,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
@@ -134,7 +133,7 @@ public class AmbariContextTest {
   private static final Map<Long, ConfigGroup> configGroups = new HashMap<>();
   private Configuration bpConfiguration = null;
   private Configuration group1Configuration = null;
-  private static final Collection<String> group1Hosts = Arrays.asList(HOST1, HOST2);
+  private static final Set<String> group1Hosts = ImmutableSet.of(HOST1, HOST2);
 
   private Capture<Set<ConfigGroupRequest>> configGroupRequestCapture = EasyMock.newCapture();
   private Setting setting = createNiceMock(Setting.class);
@@ -465,7 +464,7 @@ public class AmbariContextTest {
 
     reset(group1Info);
     expect(group1Info.getConfiguration()).andReturn(group1Configuration).anyTimes();
-    Collection<String> groupHosts = ImmutableList.of(HOST1, HOST2, "pending_host"); // pending_host is not registered with the cluster
+    Set<String> groupHosts = ImmutableSet.of(HOST1, HOST2, "pending_host"); // pending_host is not registered with the cluster
     expect(group1Info.getHostNames()).andReturn(groupHosts).anyTimes(); // there are 3 hosts for the host group
     // replay all mocks
     replayAll();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterConfigurationRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterConfigurationRequestTest.java
index 7d51906..b2786cf 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterConfigurationRequestTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterConfigurationRequestTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.topology;
 
+import static java.util.stream.Collectors.toSet;
 import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
@@ -34,14 +35,20 @@ import static org.junit.Assert.assertTrue;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor;
 import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.ClusterRequest;
 import org.apache.ambari.server.controller.KerberosHelper;
+import org.apache.ambari.server.controller.ServiceResponse;
 import org.apache.ambari.server.controller.internal.ConfigurationTopologyException;
 import org.apache.ambari.server.controller.internal.Stack;
 import org.apache.ambari.server.serveraction.kerberos.KerberosInvalidConfigurationException;
@@ -63,6 +70,8 @@ import org.powermock.api.easymock.PowerMock;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 
@@ -74,6 +83,11 @@ import com.google.common.collect.Maps;
 @PrepareForTest({AmbariContext.class})
 public class ClusterConfigurationRequestTest {
 
+  public static final Long CLUSTER_ID = Long.valueOf(1);
+  public static final String CLUSTER_NAME = "testCluster";
+  public static final List<String> SERVICE_NAMES = ImmutableList.of("HDFS", "KERBEROS", "ZOOKEEPER");
+
+
   @Rule
   public EasyMockRule mocks = new EasyMockRule(this);
 
@@ -233,21 +247,20 @@ public class ClusterConfigurationRequestTest {
     expect(controller.getClusters()).andReturn(clusters).anyTimes();
     expect(controller.getKerberosHelper()).andReturn(kerberosHelper).times(2);
 
-    expect(clusters.getCluster("testCluster")).andReturn(cluster).anyTimes();
+    expect(clusters.getCluster(CLUSTER_NAME)).andReturn(cluster).anyTimes();
 
     expect(topology.getStack()).andReturn(stack).anyTimes();
     expect(topology.getStackIds()).andReturn(ImmutableSet.of(STACK_ID)).anyTimes();
     expect(stack.getName()).andReturn(STACK_NAME).anyTimes();
     expect(stack.getVersion()).andReturn(STACK_VERSION).anyTimes();
     expect(stack.getServiceForConfigType("testConfigType")).andReturn("KERBEROS").anyTimes();
-    expect(stack.getAllConfigurationTypes(anyString())).andReturn(Collections.singletonList("testConfigType")
+    expect(stack.getAllConfigurationTypes(anyString())).andReturn(ImmutableSet.of("testConfigType")
     ).anyTimes();
     expect(stack.getExcludedConfigurationTypes(anyString())).andReturn(Collections.emptySet()).anyTimes();
     expect(stack.getConfigurationPropertiesWithMetadata(anyString(), anyString())).andReturn(Collections.emptyMap()).anyTimes();
 
-    Set<String> services = ImmutableSet.of("HDFS", "KERBEROS", "ZOOKEEPER");
-    expect(topology.getServices()).andReturn(services).anyTimes();
-    expect(stack.getConfiguration(services)).andReturn(stackDefaultConfig).once();
+    expect(topology.getServices()).andReturn(SERVICE_NAMES).anyTimes();
+    expect(stack.getConfiguration(SERVICE_NAMES)).andReturn(stackDefaultConfig).once();
 
     expect(topology.getComponents()).andAnswer(() -> Stream.of(
       ResolvedComponent.builder(new Component("NAMENODE")).serviceType("HDFS").buildPartial(),
@@ -262,14 +275,20 @@ public class ClusterConfigurationRequestTest {
 
     expect(topology.getConfiguration()).andReturn(blueprintConfig).anyTimes();
     expect(topology.getHostGroupInfo()).andReturn(Collections.emptyMap()).anyTimes();
-    expect(topology.getClusterId()).andReturn(Long.valueOf(1)).anyTimes();
+    expect(topology.getClusterId()).andReturn(CLUSTER_ID).anyTimes();
+    expect(topology.getClusterName()).andReturn(CLUSTER_NAME).anyTimes();
     expect(topology.getHostGroupsForComponent(anyString())).andReturn(Collections.emptyList())
       .anyTimes();
 
     expect(ambariContext.getConfigHelper()).andReturn(configHelper).anyTimes();
-    expect(ambariContext.getClusterName(Long.valueOf(1))).andReturn("testCluster").anyTimes();
+    expect(ambariContext.getClusterName(Long.valueOf(1))).andReturn(CLUSTER_NAME).anyTimes();
     expect(ambariContext.createConfigurationRequests(EasyMock.anyObject())).andReturn(Collections
       .emptyList()).anyTimes();
+   Set<ServiceResponse> services = IntStream.range(0, SERVICE_NAMES.size()).boxed().map(
+      serviceId -> new ServiceResponse(CLUSTER_ID, CLUSTER_NAME, 1L, "CORE", (long)serviceId, SERVICE_NAMES.get(serviceId),
+          null, null, null, null, false, false, false, false, false)
+    ).collect(toSet());
+    expect(ambariContext.getServices(anyString())).andReturn(services).anyTimes();
 
     expect(configHelper.getDefaultStackProperties(
         EasyMock.eq(STACK_ID))).andReturn(stackProperties).anyTimes();
@@ -302,7 +321,7 @@ public class ClusterConfigurationRequestTest {
 
 
     String clusterName = captureClusterName.getValue();
-    assertEquals("testCluster", clusterName);
+    assertEquals(CLUSTER_NAME, clusterName);
     return captureUpdatedConfigTypes;
   }
 
@@ -318,29 +337,38 @@ public class ClusterConfigurationRequestTest {
     expectLastCall().andReturn(controller).anyTimes();
 
     expect(controller.getClusters()).andReturn(clusters).anyTimes();
-    expect(clusters.getCluster("testCluster")).andReturn(cluster).anyTimes();
+    expect(clusters.getCluster(CLUSTER_NAME)).andReturn(cluster).anyTimes();
 
     expect(topology.getStack()).andReturn(stack).anyTimes();
     expect(topology.getStackIds()).andReturn(ImmutableSet.of(STACK_ID)).anyTimes();
     expect(stack.getName()).andReturn(STACK_NAME).anyTimes();
     expect(stack.getVersion()).andReturn(STACK_VERSION).anyTimes();
-    expect(stack.getAllConfigurationTypes(anyString())).andReturn(Collections.singletonList("testConfigType")).anyTimes();
+    expect(stack.getAllConfigurationTypes(anyString())).andReturn(ImmutableSet.of("testConfigType")).anyTimes();
     expect(stack.getExcludedConfigurationTypes(anyString())).andReturn(Collections.emptySet()).anyTimes();
     expect(stack.getConfigurationPropertiesWithMetadata(anyString(), anyString())).andReturn(Collections.emptyMap()).anyTimes();
-
-    expect(topology.getServices()).andReturn(ImmutableSet.of("HDFS", "KERBEROS", "ZOOKEEPER")).anyTimes();
+    Set<String> serviceNames = ImmutableSet.of("HDFS", "KERBEROS", "ZOOKEEPER");
+    expect(topology.getServices()).andReturn(serviceNames).anyTimes();
     expect(topology.getAmbariContext()).andReturn(ambariContext).anyTimes();
     expect(topology.getComponents()).andAnswer(Stream::empty).anyTimes();
     expect(topology.getConfigRecommendationStrategy()).andReturn(ConfigRecommendationStrategy.NEVER_APPLY).anyTimes();
     expect(topology.getBlueprint()).andReturn(blueprint).anyTimes();
     expect(topology.getConfiguration()).andReturn(stackConfig).anyTimes();
     expect(topology.getHostGroupInfo()).andReturn(Collections.emptyMap()).anyTimes();
-    expect(topology.getClusterId()).andReturn(1L).anyTimes();
+    expect(topology.getClusterId()).andReturn(CLUSTER_ID).anyTimes();
 
     expect(ambariContext.getConfigHelper()).andReturn(configHelper).anyTimes();
-    expect(ambariContext.getClusterName(1L)).andReturn("testCluster").anyTimes();
+    expect(ambariContext.getClusterName(1L)).andReturn(CLUSTER_NAME).anyTimes();
     expect(ambariContext.createConfigurationRequests(EasyMock.anyObject())).andReturn(Collections
       .emptyList()).anyTimes();
+    Set<ServiceResponse> services = IntStream.range(0, serviceNames.size()).boxed().
+      map(
+        serviceId -> new ServiceResponse(CLUSTER_ID, CLUSTER_NAME, 1L, "CORE", (long)serviceId, SERVICE_NAMES.get(serviceId),
+            null, null, null, null, false, false, false, false, false)
+      ).
+      collect(toSet());
+  expect(ambariContext.getServices(anyString())).andReturn(services).anyTimes();
+
+
 
     expect(configHelper.getDefaultStackProperties(
         EasyMock.eq(STACK_ID))).andReturn(stackProperties).anyTimes();
@@ -490,5 +518,87 @@ public class ClusterConfigurationRequestTest {
     return new Configuration(firstLevelConfig, firstLevelAttributes, secondLevelConf);
   }
 
+  @Test
+  public void testProcessClusterConfigRequest_SaveServiceIdAndServiceGroupId() throws Exception {
+    // GIVEN a topology with two services, each mapped to two config types by the mocked stack
+    Configuration configuration = new Configuration(
+      Maps.newHashMap(ImmutableMap.of(
+        "zoo.cfg", ImmutableMap.of("testKey", "testValue"),
+        "zookeeper-env", ImmutableMap.of("testKey", "testValue"),
+        "hdfs-site", ImmutableMap.of("testKey", "testValue"),
+        "hadoop-env", ImmutableMap.of("testKey", "testValue"))),
+      Maps.newHashMap()
+    );
+    List<String> serviceNames = ImmutableList.of("ZOOKEEPER", "HDFS");
+    StackId stackId = new StackId("HDPCORE", "1.0.0");
+    AtomicLong serviceIds = new AtomicLong(0);
+    Set<ServiceResponse> serviceResponses = serviceNames.stream().
+      map(
+        sName -> new ServiceResponse(CLUSTER_ID, CLUSTER_NAME, 0L, "HDPCORE", serviceIds.getAndIncrement(),
+          sName, sName, stackId, "1.0.0.0-b292", "INSTALLED", true, false, true, false, false)).
+      collect(toSet());
+
+    Map<String, HostGroupInfo> hostGroupInfoMap = Maps.newHashMap();
+    HostGroupInfo hg1 = new HostGroupInfo("hg1");
+    hg1.setConfiguration(new Configuration());
+    hostGroupInfoMap.put("hg1", hg1);
+
+    expect(topology.getAmbariContext()).andReturn(ambariContext).anyTimes();
+    expect(topology.getConfiguration()).andReturn(configuration).anyTimes();
+    expect(topology.getBlueprint()).andReturn(blueprint).anyTimes();
+    expect(topology.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes();
+    expect(topology.getStack()).andReturn(stack).anyTimes();
+    expect(topology.getServices()).andReturn(serviceNames).anyTimes();
+    expect(topology.getConfigRecommendationStrategy()).andReturn(ConfigRecommendationStrategy.NEVER_APPLY).anyTimes();
+
+    expect(topology.isValidConfigType("zoo.cfg")).andReturn(true).anyTimes();
+    expect(topology.isValidConfigType("zookeeper-env")).andReturn(true).anyTimes();
+    expect(topology.isValidConfigType("hdfs-site")).andReturn(true).anyTimes();
+    expect(topology.isValidConfigType("hadoop-env")).andReturn(true).anyTimes();
+    expect(topology.isValidConfigType("cluster-env")).andReturn(true).anyTimes();
+    expect(topology.getStackIds()).andReturn(ImmutableSet.of(stackId));
+
+    expect(stack.getExcludedConfigurationTypes(anyString())).andReturn(ImmutableSet.of()).anyTimes();
+    expect(stack.getAllConfigurationTypes("ZOOKEEPER")).andReturn(ImmutableSet.of("zoo.cfg", "zookeeper-env"));
+    expect(stack.getAllConfigurationTypes("HDFS")).andReturn(ImmutableSet.of("hdfs-site", "hadoop-env"));
+
+    expect(ambariContext.getConfigHelper()).andReturn(configHelper).anyTimes();
+    expect(ambariContext.getServices(anyString())).andReturn(serviceResponses).anyTimes();
+    Capture<ClusterRequest> clusterRequestCapture = Capture.newInstance(CaptureType.ALL);
+    ambariContext.setConfigurationOnCluster(capture(clusterRequestCapture));
+    expectLastCall().anyTimes();
+
+    expect(configHelper.getDefaultStackProperties(anyObject())).andReturn(stackProperties).anyTimes();
+
+    EasyMock.replay(stack, blueprint, topology, ambariContext, configHelper);
+
+    // WHEN the cluster configuration request is processed
+    ClusterConfigurationRequest request =
+      new ClusterConfigurationRequest(ambariContext, topology, false, stackAdvisorBlueprintProcessor);
+    request.process();
+
+    // THEN each captured config request carries the expected service id for its type (none for cluster-env)
+    Map<String, Optional<Long>> expectedServiceIds = ImmutableMap.of(
+      "zoo.cfg", Optional.of(0L),
+      "zookeeper-env", Optional.of(0L),
+      "hadoop-env", Optional.of(1L),
+      "hdfs-site", Optional.of(1L),
+      "cluster-env", Optional.<Long>empty()
+    );
+    Set<String> clusterConfigTypes = new HashSet<>();
+    clusterRequestCapture.getValues().forEach(
+      clusterRequest -> clusterRequest.getDesiredConfig().forEach(
+        configRequest -> {
+          String configType = configRequest.getType();
+          clusterConfigTypes.add(configType);
+          assertTrue("Service group ID should have been set except for global configs.",
+            configType.equals("cluster-env") || Long.valueOf(0L).equals(configRequest.getServiceGroupId()));
+          assertEquals("Unexpected service id.",
+            expectedServiceIds.get(configType).orElse(null), configRequest.getServiceId());
+        }
+      )
+    );
+    assertEquals("Expected and actual cluster config types differ.", expectedServiceIds.keySet(), clusterConfigTypes);
+  }
 
 }
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
index ab7e9e5..110ca91 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
@@ -279,8 +279,8 @@ public class ClusterDeployWithStartOnlyTest extends EasyMockSupport {
     expect(blueprint.getMpacks()).andReturn(ImmutableSet.of()).anyTimes();
     // don't expect toEntity()
 
-    expect(stack.getAllConfigurationTypes("service1")).andReturn(Arrays.asList("service1-site", "service1-env")).anyTimes();
-    expect(stack.getAllConfigurationTypes("service2")).andReturn(Arrays.asList("service2-site", "service2-env")).anyTimes();
+    expect(stack.getAllConfigurationTypes("service1")).andReturn(ImmutableSet.of("service1-site", "service1-env")).anyTimes();
+    expect(stack.getAllConfigurationTypes("service2")).andReturn(ImmutableSet.of("service2-site", "service2-env")).anyTimes();
     expect(stack.getAutoDeployInfo("component1")).andReturn(null).anyTimes();
     expect(stack.getAutoDeployInfo("component2")).andReturn(null).anyTimes();
     expect(stack.getAutoDeployInfo("component3")).andReturn(null).anyTimes();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
index 7d1ef83..c752fb4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
@@ -277,8 +277,8 @@ public class ClusterInstallWithoutStartOnComponentLevelTest extends EasyMockSupp
     expect(blueprint.getMpacks()).andReturn(ImmutableSet.of()).anyTimes();
     // don't expect toEntity()
 
-    expect(stack.getAllConfigurationTypes("service1")).andReturn(Arrays.asList("service1-site", "service1-env")).anyTimes();
-    expect(stack.getAllConfigurationTypes("service2")).andReturn(Arrays.asList("service2-site", "service2-env")).anyTimes();
+    expect(stack.getAllConfigurationTypes("service1")).andReturn(ImmutableSet.of("service1-site", "service1-env")).anyTimes();
+    expect(stack.getAllConfigurationTypes("service2")).andReturn(ImmutableSet.of("service2-site", "service2-env")).anyTimes();
     expect(stack.getAutoDeployInfo("component1")).andReturn(null).anyTimes();
     expect(stack.getAutoDeployInfo("component2")).andReturn(null).anyTimes();
     expect(stack.getAutoDeployInfo("component3")).andReturn(null).anyTimes();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
index cbdd860..759210e 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
@@ -39,6 +39,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -280,8 +281,8 @@ public class ClusterInstallWithoutStartTest extends EasyMockSupport {
     expect(blueprint.getMpacks()).andReturn(ImmutableSet.of()).anyTimes();
     // don't expect toEntity()
 
-    List<String> configTypes1 = Arrays.asList("service1-site", "service1-env");
-    List<String> configTypes2 = Arrays.asList("service2-site", "service2-env");
+    Set<String> configTypes1 = ImmutableSet.of("service1-site", "service1-env");
+    Set<String> configTypes2 = ImmutableSet.of("service2-site", "service2-env");
     expect(stack.getConfigurationTypes("service1")).andReturn(configTypes1).anyTimes();
     expect(stack.getConfigurationTypes("service2")).andReturn(configTypes2).anyTimes();
     expect(stack.getAllConfigurationTypes("service1")).andReturn(configTypes1).anyTimes();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
index 925e6cb..9690022 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.topology;
 
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 
 import java.util.Collections;
 
@@ -70,11 +71,15 @@ public class ConfigureClusterTaskTest extends EasyMockSupport {
     // GIVEN
     expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.emptyList());
     expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.emptyMap());
+    expect(clusterTopology.getConfigRecommendationStrategy()).andReturn(ConfigRecommendationStrategy.NEVER_APPLY).anyTimes();
     expect(clusterTopology.getClusterId()).andReturn(1L).anyTimes();
     expect(clusterTopology.getAmbariContext()).andReturn(ambariContext);
     expect(ambariContext.getClusterName(1L)).andReturn("testCluster");
     clusterConfigurationRequest.process();
+    expectLastCall();
     ambariEventPublisher.publish(anyObject(AmbariEvent.class));
+    expectLastCall();
+
     replayAll();
 
     // WHEN
@@ -90,7 +95,9 @@ public class ConfigureClusterTaskTest extends EasyMockSupport {
     // GIVEN
     expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.emptyList());
     expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.emptyMap());
+    expect(clusterTopology.getConfigRecommendationStrategy()).andReturn(ConfigRecommendationStrategy.NEVER_APPLY).anyTimes();
     clusterConfigurationRequest.process();
+    expectLastCall();
     replayAll();
 
     AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test", t -> {});
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 5f7c854..c6507ff 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -47,6 +47,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.ambari.server.AmbariException;
@@ -58,6 +59,7 @@ import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.ClusterRequest;
 import org.apache.ambari.server.controller.ConfigurationRequest;
 import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ServiceResponse;
 import org.apache.ambari.server.controller.ShortTaskStatus;
 import org.apache.ambari.server.controller.internal.HostResourceProvider;
 import org.apache.ambari.server.controller.internal.MpackResourceProvider;
@@ -99,6 +101,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
@@ -118,6 +121,7 @@ public class TopologyManagerTest {
   private static final String SAMPLE_QUICKLINKS_PROFILE_1 = "{\"filters\":[{\"visible\":true}],\"services\":[]}";
   private static final String SAMPLE_QUICKLINKS_PROFILE_2 =
       "{\"filters\":[],\"services\":[{\"name\":\"HDFS\",\"components\":[],\"filters\":[{\"visible\":true}]}]}";
+  private static final List<String> SERVICE_NAMES = ImmutableList.of("service1", "service2"); // names behind the mocked services
 
   @Rule
   public EasyMockRule mocks = new EasyMockRule(this);
@@ -211,6 +215,13 @@ public class TopologyManagerTest {
   private final Configuration topoGroup2Config = new Configuration(new HashMap<>(),
     new HashMap<>(), bpGroup2Config);
 
+  private final Set<ServiceResponse> services = IntStream.range(0, SERVICE_NAMES.size()).boxed().
+    map(
+      serviceId -> new ServiceResponse(CLUSTER_ID, CLUSTER_NAME, 1L, "CORE", (long)serviceId, SERVICE_NAMES.get(serviceId),
+          null, null, null, null, false, false, false, false, false)
+    ).
+    collect(toSet());
+
   private HostGroupInfo group1Info = new HostGroupInfo("group1");
   private HostGroupInfo group2Info = new HostGroupInfo("group2");
   private Map<String, HostGroupInfo> groupInfoMap = new HashMap<>();
@@ -295,15 +306,15 @@ public class TopologyManagerTest {
     expect(blueprint.getHostGroupsForComponent("component3")).andReturn(Arrays.asList(group1, group2)).anyTimes();
     expect(blueprint.getHostGroupsForComponent("component4")).andReturn(Collections.singleton(group2)).anyTimes();
     expect(blueprint.getName()).andReturn(BLUEPRINT_NAME).anyTimes();
-    expect(clusterTopologyMock.getServices()).andReturn(Arrays.asList("service1", "service2")).anyTimes();
+    expect(clusterTopologyMock.getServices()).andReturn(SERVICE_NAMES).anyTimes();
     expect(clusterTopologyMock.getStack()).andReturn(stack).anyTimes();
     expect(blueprint.getStackIds()).andReturn(ImmutableSet.of(STACK_ID)).anyTimes();
     expect(blueprint.getSecurity()).andReturn(SecurityConfiguration.NONE).anyTimes();
     expect(blueprint.getMpacks()).andReturn(ImmutableSet.of()).anyTimes();
     // don't expect toEntity()
 
-    expect(stack.getAllConfigurationTypes("service1")).andReturn(Arrays.asList("service1-site", "service1-env")).anyTimes();
-    expect(stack.getAllConfigurationTypes("service2")).andReturn(Arrays.asList("service2-site", "service2-env")).anyTimes();
+    expect(stack.getAllConfigurationTypes("service1")).andReturn(ImmutableSet.of("service1-site", "service1-env")).anyTimes();
+    expect(stack.getAllConfigurationTypes("service2")).andReturn(ImmutableSet.of("service2-site", "service2-env")).anyTimes();
     expect(stack.getAutoDeployInfo("component1")).andReturn(null).anyTimes();
     expect(stack.getAutoDeployInfo("component2")).andReturn(null).anyTimes();
     expect(stack.getAutoDeployInfo("component3")).andReturn(null).anyTimes();
@@ -386,6 +397,7 @@ public class TopologyManagerTest {
     expectLastCall().anyTimes();
     ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_ID);
     expectLastCall().anyTimes();
+    expect(ambariContext.getServices(anyString())).andReturn(services).anyTimes();
 
     expect(clusterController.ensureResourceProvider(Resource.Type.Mpack)).andReturn(mpackResourceProvider).anyTimes();
     expect(resourceProvider.createResources((anyObject()))).andReturn(new RequestStatusImpl(null, null, null)).anyTimes(); // persist raw request