You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2018/11/21 16:34:33 UTC
[ambari] branch trunk updated: AMBARI-24926. Apply user-defined
configuration for Add Service request. (#2639)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new fe80b39 AMBARI-24926. Apply user-defined configuration for Add Service request. (#2639)
fe80b39 is described below
commit fe80b390b03a47bda87dde95bc9feb11a49818a0
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Nov 21 17:34:28 2018 +0100
AMBARI-24926. Apply user-defined configuration for Add Service request. (#2639)
---
.../addservice/AddServiceOrchestrator.java | 5 +-
.../addservice/ResourceProviderAdapter.java | 182 +++++++++++++++++----
2 files changed, 149 insertions(+), 38 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
index e137d4a..d30ab09 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
@@ -113,8 +113,8 @@ public class AddServiceOrchestrator {
throw new IllegalArgumentException("No new services to be added");
}
- Configuration config = stack.getValidDefaultConfig();
- // TODO add user-defined config
+ Configuration config = request.getConfiguration();
+ config.setParentConfiguration(stack.getValidDefaultConfig());
RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager);
AddServiceInfo validatedRequest = new AddServiceInfo(cluster.getClusterName(), stack, config, stages, newServices);
@@ -149,6 +149,7 @@ public class AddServiceOrchestrator {
*/
private void createResources(AddServiceInfo request) {
LOG.info("Creating resources for {}", request);
+ resourceProviders.updateExistingConfigs(request);
resourceProviders.createServices(request);
resourceProviders.createComponents(request);
resourceProviders.createConfigs(request);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
index 722d5f6..ee52c7f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
@@ -20,12 +20,12 @@ package org.apache.ambari.server.topology.addservice;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -56,13 +56,16 @@ import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
import org.apache.ambari.server.security.authorization.AuthorizationException;
+import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.topology.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
/**
* Creates resources using the resource providers.
@@ -76,6 +79,9 @@ public class ResourceProviderAdapter {
@Inject
private AmbariManagementController controller;
+ @Inject
+ private ConfigHelper configHelper;
+
public void createServices(AddServiceInfo request) {
LOG.info("Creating service resources for {}", request);
@@ -83,7 +89,7 @@ public class ResourceProviderAdapter {
.map(service -> createServiceRequestProperties(request, service))
.collect(toSet());
- createResources(properties, Resource.Type.Service);
+ createResources(request, properties, Resource.Type.Service);
}
public void createComponents(AddServiceInfo request) {
@@ -94,7 +100,7 @@ public class ResourceProviderAdapter {
.map(component -> createComponentRequestProperties(request, componentsOfService.getKey(), component)))
.collect(toSet());
- createResources(properties, Resource.Type.Component);
+ createResources(request, properties, Resource.Type.Component);
}
public void createHostComponents(AddServiceInfo request) {
@@ -106,30 +112,19 @@ public class ResourceProviderAdapter {
.map(host -> createHostComponentRequestProperties(request, componentsOfService.getKey(), hostsOfComponent.getKey(), host))))
.collect(toSet());
- createResources(properties, Resource.Type.HostComponent);
+ createResources(request, properties, Resource.Type.HostComponent);
}
public void createConfigs(AddServiceInfo request) {
LOG.info("Creating configurations for {}", request);
+ Set<ClusterRequest> requests = createConfigRequestsForNewServices(request);
+ updateCluster(request, requests, "Error creating configurations for %s");
+ }
- Set<ClusterRequest> requests = new HashSet<>();
- for (String service : request.newServices().keySet()) {
- List<ConfigurationRequest> configRequests = request.getStack().getConfigurationTypes(service).stream()
- .filter(configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV))
- .map(configType -> createClusterConfigRequestProperties(request, service, configType))
- .collect(toList());
- ClusterRequest internalRequest = new ClusterRequest(null, request.clusterName(), null, null);
- internalRequest.setDesiredConfig(configRequests);
- requests.add(internalRequest);
- }
-
- try {
- controller.updateClusters(requests, null);
- } catch (AmbariException | AuthorizationException e) {
- String msg = String.format("Error creating configurations for %s", request);
- LOG.error(msg, e);
- throw new RuntimeException(msg, e);
- }
+ public void updateExistingConfigs(AddServiceInfo request) {
+ LOG.info("Updating existing configurations for {}", request);
+ Set<ClusterRequest> requests = createConfigRequestsForExistingServices(request);
+ updateCluster(request, requests, "Error updating configurations for %s");
}
public void updateServiceDesiredState(AddServiceInfo request, State desiredState) {
@@ -149,7 +144,7 @@ public class ResourceProviderAdapter {
"context", String.format("Put new components to %s state", desiredState)
));
HostComponentResourceProvider rp = (HostComponentResourceProvider) getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
- Request internalRequest = createRequest(request, properties, Resource.Type.HostComponent);
+ Request internalRequest = createRequest(request.clusterName(), properties, Resource.Type.HostComponent);
try {
rp.doUpdateResources(request.getStages(), internalRequest, predicateForNewServices(request, HostComponentResourceProvider.HOST_ROLES), false, false, false);
} catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
@@ -159,35 +154,44 @@ public class ResourceProviderAdapter {
}
}
- private static void createResources(Set<Map<String, Object>> properties, Resource.Type resourceType) {
+ private static void createResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) {
Request internalRequest = new RequestImpl(null, properties, null, null);
ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
try {
rp.createResources(internalRequest);
} catch (UnsupportedPropertyException | SystemException | ResourceAlreadyExistsException | NoSuchParentResourceException e) {
- String msg = String.format("Error creating resources: %s", resourceType);
+ String msg = String.format("Error creating resources %s for %s", resourceType, request);
LOG.error(msg, e);
throw new RuntimeException(msg, e);
}
}
private static void updateResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, Predicate predicate) {
- Request internalRequest = createRequest(request, properties, resourceType);
+ Request internalRequest = createRequest(request.clusterName(), properties, resourceType);
ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
try {
rp.updateResources(internalRequest, predicate);
} catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
- String msg = String.format("Error updating resources: %s", resourceType);
+ String msg = String.format("Error updating resources %s for %s", resourceType, request);
LOG.error(msg, e);
throw new RuntimeException(msg, e);
}
}
- private static Request createRequest(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) {
- Map<String, String> requestInfoProperties = ImmutableMap.of(
+ private void updateCluster(AddServiceInfo addServiceRequest, Set<ClusterRequest> requests, String errorMessageFormat) {
+ try {
+ controller.updateClusters(requests, null);
+ } catch (AmbariException | AuthorizationException e) {
+ String msg = String.format(errorMessageFormat, addServiceRequest);
+ LOG.error(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+ }
+ private static Request createRequest(String clusterName, Set<Map<String, Object>> properties, Resource.Type resourceType) {
+ Map<String, String> requestInfoProperties = ImmutableMap.of(
RequestOperationLevel.OPERATION_LEVEL_ID, RequestOperationLevel.getExternalLevelName(resourceType.name()),
- RequestOperationLevel.OPERATION_CLUSTER_ID, request.clusterName()
+ RequestOperationLevel.OPERATION_CLUSTER_ID, clusterName
);
return new RequestImpl(null, properties, requestInfoProperties, null);
}
@@ -225,12 +229,118 @@ public class ResourceProviderAdapter {
return properties.build();
}
- private static ConfigurationRequest createClusterConfigRequestProperties(AddServiceInfo request, String service, String configType) {
- LOG.debug("Creating config type {} for service {}", configType, service);
+ private static Set<ClusterRequest> createConfigRequestsForNewServices(AddServiceInfo request) {
+ Map<String, Map<String, String>> fullProperties = request.getConfig().getFullProperties();
+ Map<String, Map<String, Map<String, String>>> fullAttributes = request.getConfig().getFullAttributes();
+
+ return createConfigRequestsForServices(
+ request.newServices().keySet(),
+ configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV),
+ request, fullProperties, fullAttributes
+ );
+ }
+
+ private Set<ClusterRequest> createConfigRequestsForExistingServices(AddServiceInfo request) {
+ Cluster cluster = getCluster(request.clusterName());
+ Map<String, Map<String, String>> desiredConfigTags = getDesiredTags(cluster);
+ Configuration mergedConfig = new Configuration(
+ request.getConfig().getProperties(), request.getConfig().getAttributes(),
+ new Configuration(
+ configHelper.getEffectiveConfigProperties(cluster, desiredConfigTags),
+ configHelper.getEffectiveConfigAttributes(cluster, desiredConfigTags)
+ )
+ );
+
+ Set<String> configTypesInRequest = ImmutableSet.copyOf(
+ Sets.difference(
+ Sets.union(
+ request.getConfig().getProperties().keySet(),
+ request.getConfig().getAttributes().keySet()),
+ ImmutableSet.of(ConfigHelper.CLUSTER_ENV))
+ );
+
+ Map<String, Map<String, String>> fullProperties = mergedConfig.getFullProperties();
+ Map<String, Map<String, Map<String, String>>> fullAttributes = mergedConfig.getFullAttributes();
+
+ Set<ClusterRequest> clusterRequests = createConfigRequestsForServices(
+ cluster.getServices().keySet(),
+ configTypesInRequest::contains,
+ request, fullProperties, fullAttributes
+ );
+
+ if (request.getConfig().getProperties().containsKey(ConfigHelper.CLUSTER_ENV)) {
+ Optional<ClusterRequest> clusterEnvRequest = createConfigRequestForConfigTypes(Stream.of(ConfigHelper.CLUSTER_ENV),
+ request, fullProperties, fullAttributes);
+ clusterEnvRequest.ifPresent(clusterRequests::add);
+ }
+
+ return clusterRequests;
+ }
+
+ private static Set<ClusterRequest> createConfigRequestsForServices(
+ Set<String> services,
+ java.util.function.Predicate<String> predicate,
+ AddServiceInfo request,
+ Map<String, Map<String, String>> fullProperties,
+ Map<String, Map<String, Map<String, String>>> fullAttributes
+ ) {
+ return services.stream()
+ .map(service -> createConfigRequestForConfigTypes(
+ request.getStack().getConfigurationTypes(service).stream()
+ .filter(predicate),
+ request, fullProperties, fullAttributes
+ ))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(toSet());
+ }
+
+ /**
+ * Creates a {@link ConfigurationRequest} for each config type in the {@code configTypes} stream.
+ *
+ * @return an {@code Optional} {@link ClusterRequest} with desired configs set to all the {@code ConfigurationRequests},
+ * or an empty {@code Optional} if the incoming {@code configTypes} stream is empty
+ */
+ private static Optional<ClusterRequest> createConfigRequestForConfigTypes(
+ Stream<String> configTypes,
+ AddServiceInfo addServiceRequest,
+ Map<String, Map<String, String>> fullProperties,
+ Map<String, Map<String, Map<String, String>>> fullAttributes
+ ) {
+ List<ConfigurationRequest> configRequests = configTypes
+ .peek(configType -> LOG.info("Creating request for config type {} for {}", configType, addServiceRequest))
+ .map(configType -> new ConfigurationRequest(addServiceRequest.clusterName(), configType, "ADD_SERVICE",
+ fullProperties.getOrDefault(configType, ImmutableMap.of()),
+ fullAttributes.getOrDefault(configType, ImmutableMap.of())))
+ .collect(toList());
+
+ if (configRequests.isEmpty()) {
+ return Optional.empty();
+ }
+
+ ClusterRequest clusterRequest = new ClusterRequest(null, addServiceRequest.clusterName(), null, null);
+ clusterRequest.setDesiredConfig(configRequests);
+ return Optional.of(clusterRequest);
+ }
- return new ConfigurationRequest(request.clusterName(), configType, "ADD_SERVICE",
- request.getConfig().getProperties().getOrDefault(configType, new HashMap<>(0)),
- request.getConfig().getAttributes().getOrDefault(configType, new HashMap<>(0)));
+ private Map<String, Map<String, String>> getDesiredTags(Cluster cluster) {
+ try {
+ return configHelper.getEffectiveDesiredTags(cluster, null);
+ } catch (AmbariException e) {
+ String msg = String.format("Error getting tags for desired config of cluster %s", cluster.getClusterName());
+ LOG.error(msg);
+ throw new IllegalStateException(msg, e);
+ }
+ }
+
+ private Cluster getCluster(String clusterName) {
+ try {
+ return controller.getClusters().getCluster(clusterName);
+ } catch (AmbariException e) {
+ String msg = String.format("Cannot find cluster %s", clusterName);
+ LOG.error(msg);
+ throw new IllegalStateException(msg, e);
+ }
}
private static Predicate predicateForNewServices(AddServiceInfo request, String category) {