You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2018/11/21 16:34:33 UTC

[ambari] branch trunk updated: AMBARI-24926. Apply user-defined configuration for Add Service request. (#2639)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe80b39  AMBARI-24926. Apply user-defined configuration for Add Service request. (#2639)
fe80b39 is described below

commit fe80b390b03a47bda87dde95bc9feb11a49818a0
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Nov 21 17:34:28 2018 +0100

    AMBARI-24926. Apply user-defined configuration for Add Service request. (#2639)
---
 .../addservice/AddServiceOrchestrator.java         |   5 +-
 .../addservice/ResourceProviderAdapter.java        | 182 +++++++++++++++++----
 2 files changed, 149 insertions(+), 38 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
index e137d4a..d30ab09 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
@@ -113,8 +113,8 @@ public class AddServiceOrchestrator {
       throw new IllegalArgumentException("No new services to be added");
     }
 
-    Configuration config = stack.getValidDefaultConfig();
-    // TODO add user-defined config
+    Configuration config = request.getConfiguration();
+    config.setParentConfiguration(stack.getValidDefaultConfig());
 
     RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager);
     AddServiceInfo validatedRequest = new AddServiceInfo(cluster.getClusterName(), stack, config, stages, newServices);
@@ -149,6 +149,7 @@ public class AddServiceOrchestrator {
    */
   private void createResources(AddServiceInfo request) {
     LOG.info("Creating resources for {}", request);
+    resourceProviders.updateExistingConfigs(request);
     resourceProviders.createServices(request);
     resourceProviders.createComponents(request);
     resourceProviders.createConfigs(request);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
index 722d5f6..ee52c7f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
@@ -20,12 +20,12 @@ package org.apache.ambari.server.topology.addservice;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -56,13 +56,16 @@ import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
 import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
 import org.apache.ambari.server.security.authorization.AuthorizationException;
+import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.topology.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 /**
  * Creates resources using the resource providers.
@@ -76,6 +79,9 @@ public class ResourceProviderAdapter {
   @Inject
   private AmbariManagementController controller;
 
+  @Inject
+  private ConfigHelper configHelper;
+
   public void createServices(AddServiceInfo request) {
     LOG.info("Creating service resources for {}", request);
 
@@ -83,7 +89,7 @@ public class ResourceProviderAdapter {
       .map(service -> createServiceRequestProperties(request, service))
       .collect(toSet());
 
-    createResources(properties, Resource.Type.Service);
+    createResources(request, properties, Resource.Type.Service);
   }
 
   public void createComponents(AddServiceInfo request) {
@@ -94,7 +100,7 @@ public class ResourceProviderAdapter {
         .map(component -> createComponentRequestProperties(request, componentsOfService.getKey(), component)))
       .collect(toSet());
 
-    createResources(properties, Resource.Type.Component);
+    createResources(request, properties, Resource.Type.Component);
   }
 
   public void createHostComponents(AddServiceInfo request) {
@@ -106,30 +112,19 @@ public class ResourceProviderAdapter {
           .map(host -> createHostComponentRequestProperties(request, componentsOfService.getKey(), hostsOfComponent.getKey(), host))))
       .collect(toSet());
 
-    createResources(properties, Resource.Type.HostComponent);
+    createResources(request, properties, Resource.Type.HostComponent);
   }
 
   public void createConfigs(AddServiceInfo request) {
     LOG.info("Creating configurations for {}", request);
+    Set<ClusterRequest> requests = createConfigRequestsForNewServices(request);
+    updateCluster(request, requests, "Error creating configurations for %s");
+  }
 
-    Set<ClusterRequest> requests = new HashSet<>();
-    for (String service : request.newServices().keySet()) {
-      List<ConfigurationRequest> configRequests = request.getStack().getConfigurationTypes(service).stream()
-        .filter(configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV))
-        .map(configType -> createClusterConfigRequestProperties(request, service, configType))
-        .collect(toList());
-      ClusterRequest internalRequest = new ClusterRequest(null, request.clusterName(), null, null);
-      internalRequest.setDesiredConfig(configRequests);
-      requests.add(internalRequest);
-    }
-
-    try {
-      controller.updateClusters(requests, null);
-    } catch (AmbariException | AuthorizationException e) {
-      String msg = String.format("Error creating configurations for %s", request);
-      LOG.error(msg, e);
-      throw new RuntimeException(msg, e);
-    }
+  public void updateExistingConfigs(AddServiceInfo request) {
+    LOG.info("Updating existing configurations for {}", request);
+    Set<ClusterRequest> requests = createConfigRequestsForExistingServices(request);
+    updateCluster(request, requests, "Error updating configurations for %s");
   }
 
   public void updateServiceDesiredState(AddServiceInfo request, State desiredState) {
@@ -149,7 +144,7 @@ public class ResourceProviderAdapter {
       "context", String.format("Put new components to %s state", desiredState)
     ));
     HostComponentResourceProvider rp = (HostComponentResourceProvider) getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
-    Request internalRequest = createRequest(request, properties, Resource.Type.HostComponent);
+    Request internalRequest = createRequest(request.clusterName(), properties, Resource.Type.HostComponent);
     try {
       rp.doUpdateResources(request.getStages(), internalRequest, predicateForNewServices(request, HostComponentResourceProvider.HOST_ROLES), false, false, false);
     } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
@@ -159,35 +154,44 @@ public class ResourceProviderAdapter {
     }
   }
 
-  private static void createResources(Set<Map<String, Object>> properties, Resource.Type resourceType) {
+  private static void createResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) {
     Request internalRequest = new RequestImpl(null, properties, null, null);
     ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
     try {
       rp.createResources(internalRequest);
     } catch (UnsupportedPropertyException | SystemException | ResourceAlreadyExistsException | NoSuchParentResourceException e) {
-      String msg = String.format("Error creating resources: %s", resourceType);
+      String msg = String.format("Error creating resources %s for %s", resourceType, request);
       LOG.error(msg, e);
       throw new RuntimeException(msg, e);
     }
   }
 
   private static void updateResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, Predicate predicate) {
-    Request internalRequest = createRequest(request, properties, resourceType);
+    Request internalRequest = createRequest(request.clusterName(), properties, resourceType);
     ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
     try {
       rp.updateResources(internalRequest, predicate);
     } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
-      String msg = String.format("Error updating resources: %s", resourceType);
+      String msg = String.format("Error updating resources %s for %s", resourceType, request);
       LOG.error(msg, e);
       throw new RuntimeException(msg, e);
     }
   }
 
-  private static Request createRequest(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) {
-    Map<String, String> requestInfoProperties = ImmutableMap.of(
+  private void updateCluster(AddServiceInfo addServiceRequest, Set<ClusterRequest> requests, String errorMessageFormat) {
+    try {
+      controller.updateClusters(requests, null);
+    } catch (AmbariException | AuthorizationException e) {
+      String msg = String.format(errorMessageFormat, addServiceRequest);
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
 
+  private static Request createRequest(String clusterName, Set<Map<String, Object>> properties, Resource.Type resourceType) {
+    Map<String, String> requestInfoProperties = ImmutableMap.of(
       RequestOperationLevel.OPERATION_LEVEL_ID, RequestOperationLevel.getExternalLevelName(resourceType.name()),
-      RequestOperationLevel.OPERATION_CLUSTER_ID, request.clusterName()
+      RequestOperationLevel.OPERATION_CLUSTER_ID, clusterName
     );
     return new RequestImpl(null, properties, requestInfoProperties, null);
   }
@@ -225,12 +229,118 @@ public class ResourceProviderAdapter {
     return properties.build();
   }
 
-  private static ConfigurationRequest createClusterConfigRequestProperties(AddServiceInfo request, String service, String configType) {
-    LOG.debug("Creating config type {} for service {}", configType, service);
+  private static Set<ClusterRequest> createConfigRequestsForNewServices(AddServiceInfo request) {
+    Map<String, Map<String, String>> fullProperties = request.getConfig().getFullProperties();
+    Map<String, Map<String, Map<String, String>>> fullAttributes = request.getConfig().getFullAttributes();
+
+    return createConfigRequestsForServices(
+      request.newServices().keySet(),
+      configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV),
+      request, fullProperties, fullAttributes
+    );
+  }
+
+  private Set<ClusterRequest> createConfigRequestsForExistingServices(AddServiceInfo request) {
+    Cluster cluster = getCluster(request.clusterName());
+    Map<String, Map<String, String>> desiredConfigTags = getDesiredTags(cluster);
+    Configuration mergedConfig = new Configuration(
+      request.getConfig().getProperties(), request.getConfig().getAttributes(),
+      new Configuration(
+        configHelper.getEffectiveConfigProperties(cluster, desiredConfigTags),
+        configHelper.getEffectiveConfigAttributes(cluster, desiredConfigTags)
+      )
+    );
+
+    Set<String> configTypesInRequest = ImmutableSet.copyOf(
+      Sets.difference(
+        Sets.union(
+          request.getConfig().getProperties().keySet(),
+          request.getConfig().getAttributes().keySet()),
+        ImmutableSet.of(ConfigHelper.CLUSTER_ENV))
+    );
+
+    Map<String, Map<String, String>> fullProperties = mergedConfig.getFullProperties();
+    Map<String, Map<String, Map<String, String>>> fullAttributes = mergedConfig.getFullAttributes();
+
+    Set<ClusterRequest> clusterRequests = createConfigRequestsForServices(
+      cluster.getServices().keySet(),
+      configTypesInRequest::contains,
+      request, fullProperties, fullAttributes
+    );
+
+    if (request.getConfig().getProperties().containsKey(ConfigHelper.CLUSTER_ENV)) {
+      Optional<ClusterRequest> clusterEnvRequest = createConfigRequestForConfigTypes(Stream.of(ConfigHelper.CLUSTER_ENV),
+        request, fullProperties, fullAttributes);
+      clusterEnvRequest.ifPresent(clusterRequests::add);
+    }
+
+    return clusterRequests;
+  }
+
+  private static Set<ClusterRequest> createConfigRequestsForServices(
+    Set<String> services,
+    java.util.function.Predicate<String> predicate,
+    AddServiceInfo request,
+    Map<String, Map<String, String>> fullProperties,
+    Map<String, Map<String, Map<String, String>>> fullAttributes
+  ) {
+    return services.stream()
+      .map(service -> createConfigRequestForConfigTypes(
+        request.getStack().getConfigurationTypes(service).stream()
+          .filter(predicate),
+        request, fullProperties, fullAttributes
+      ))
+      .filter(Optional::isPresent)
+      .map(Optional::get)
+      .collect(toSet());
+  }
+
+  /**
+   * Creates a {@link ConfigurationRequest} for each config type in the {@code configTypes} stream.
+   *
+   * @return an {@code Optional} {@link ClusterRequest} with desired configs set to all the {@code ConfigurationRequests},
+   * or an empty {@code Optional} if the incoming {@code configTypes} stream is empty
+   */
+  private static Optional<ClusterRequest> createConfigRequestForConfigTypes(
+    Stream<String> configTypes,
+    AddServiceInfo addServiceRequest,
+    Map<String, Map<String, String>> fullProperties,
+    Map<String, Map<String, Map<String, String>>> fullAttributes
+  ) {
+    List<ConfigurationRequest> configRequests = configTypes
+      .peek(configType -> LOG.info("Creating request for config type {} for {}", configType, addServiceRequest))
+      .map(configType -> new ConfigurationRequest(addServiceRequest.clusterName(), configType, "ADD_SERVICE",
+        fullProperties.getOrDefault(configType, ImmutableMap.of()),
+        fullAttributes.getOrDefault(configType, ImmutableMap.of())))
+      .collect(toList());
+
+    if (configRequests.isEmpty()) {
+      return Optional.empty();
+    }
+
+    ClusterRequest clusterRequest = new ClusterRequest(null, addServiceRequest.clusterName(), null, null);
+    clusterRequest.setDesiredConfig(configRequests);
+    return Optional.of(clusterRequest);
+  }
 
-    return new ConfigurationRequest(request.clusterName(), configType, "ADD_SERVICE",
-      request.getConfig().getProperties().getOrDefault(configType, new HashMap<>(0)),
-      request.getConfig().getAttributes().getOrDefault(configType, new HashMap<>(0)));
+  private Map<String, Map<String, String>> getDesiredTags(Cluster cluster) {
+    try {
+      return configHelper.getEffectiveDesiredTags(cluster, null);
+    } catch (AmbariException e) {
+      String msg = String.format("Error getting tags for desired config of cluster %s", cluster.getClusterName());
+      LOG.error(msg);
+      throw new IllegalStateException(msg, e);
+    }
+  }
+
+  private Cluster getCluster(String clusterName) {
+    try {
+      return controller.getClusters().getCluster(clusterName);
+    } catch (AmbariException e) {
+      String msg = String.format("Cannot find cluster %s", clusterName);
+      LOG.error(msg);
+      throw new IllegalStateException(msg, e);
+    }
   }
 
   private static Predicate predicateForNewServices(AddServiceInfo request, String category) {