You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2018/11/20 07:14:50 UTC
[ambari] branch trunk updated: AMBARI-24917. Implement complex Add
Service request using default configs (#2631)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 94d1860 AMBARI-24917. Implement complex Add Service request using default configs (#2631)
94d1860 is described below
commit 94d18601e8adef6bd43d2ce1e70d3c1e4f14ae3c
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Nov 20 08:14:45 2018 +0100
AMBARI-24917. Implement complex Add Service request using default configs (#2631)
---
.../ambari/server/api/handlers/CreateHandler.java | 2 +-
.../internal/HostComponentResourceProvider.java | 2 +-
.../internal/ServiceResourceProvider.java | 8 +-
.../ambari/server/controller/internal/Stack.java | 24 +++
.../server/controller/internal/UnitUpdater.java | 23 ++-
.../server/topology/addservice/AddServiceInfo.java | 46 +++++-
.../addservice/AddServiceOrchestrator.java | 61 ++++++-
.../addservice/ResourceProviderAdapter.java | 183 +++++++++++++++++++--
8 files changed, 312 insertions(+), 37 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java
index 484547a..b5dddf8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java
@@ -77,7 +77,7 @@ public class CreateHandler extends BaseManagementHandler {
} catch (ResourceAlreadyExistsException e) {
result = new ResultImpl(new ResultStatus(ResultStatus.STATUS.CONFLICT, e.getMessage()));
} catch(IllegalArgumentException e) {
- LOG.error("Bad request received: " + e.getMessage());
+ LOG.error("Bad request received: " + e.getMessage(), e);
result = new ResultImpl(new ResultStatus(ResultStatus.STATUS.BAD_REQUEST, e.getMessage()));
} catch (RuntimeException e) {
if (LOG.isErrorEnabled()) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
index d6f52a4..79441d5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
@@ -799,7 +799,7 @@ public class HostComponentResourceProvider extends AbstractControllerResourcePro
* @throws NoSuchResourceException the query didn't match any resources
* @throws NoSuchParentResourceException a specified parent resource doesn't exist
*/
- private RequestStageContainer doUpdateResources(final RequestStageContainer stages, final Request request,
+ public RequestStageContainer doUpdateResources(final RequestStageContainer stages, final Request request,
Predicate predicate, boolean performQueryEvaluation,
boolean useGeneratedConfigs, boolean useClusterHostInfo)
throws UnsupportedPropertyException,
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
index a4aaf0b..eaa8cb1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
@@ -254,9 +254,9 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider
if (request.getProperties().size() == 1) {
Map<String, Object> requestProperties = request.getProperties().iterator().next();
if (isAddServiceRequest(requestProperties)) {
- processAddServiceRequest(requestProperties, request.getRequestInfoProperties());
+ RequestStatusResponse response = processAddServiceRequest(requestProperties, request.getRequestInfoProperties());
notifyCreate(Resource.Type.Service, request);
- return getRequestStatus(null);
+ return getRequestStatus(response);
}
}
@@ -1228,11 +1228,11 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider
return OperationType.ADD_SERVICE.name().equals(properties.get(OPERATION_TYPE));
}
- private void processAddServiceRequest(Map<String, Object> requestProperties, Map<String, String> requestInfoProperties) throws NoSuchParentResourceException {
+ private RequestStatusResponse processAddServiceRequest(Map<String, Object> requestProperties, Map<String, String> requestInfoProperties) throws NoSuchParentResourceException {
AddServiceRequest request = createAddServiceRequest(requestProperties, requestInfoProperties);
String clusterName = String.valueOf(requestProperties.get(SERVICE_CLUSTER_NAME_PROPERTY_ID));
try {
- addServiceOrchestrator.processAddServiceRequest(getManagementController().getClusters().getCluster(clusterName), request);
+ return addServiceOrchestrator.processAddServiceRequest(getManagementController().getClusters().getCluster(clusterName), request);
} catch (AmbariException e) {
throw new NoSuchParentResourceException(e.getMessage(), e);
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
index 1c85d88..02e8b4e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -48,6 +49,7 @@ import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.ValueAttributesInfo;
import org.apache.ambari.server.topology.Cardinality;
import org.apache.ambari.server.topology.Configuration;
+import org.apache.ambari.server.topology.validators.UnitValidatedProperty;
/**
* Encapsulates stack information.
@@ -643,6 +645,28 @@ public class Stack {
}
/**
+ * @return default configuration of the stack, with some updates necessary so that the config can be applied
+ * (eg. some properties need a unit to be appended)
+ */
+ public Configuration getValidDefaultConfig() {
+ Configuration config = getConfiguration();
+
+ for (UnitValidatedProperty p : UnitValidatedProperty.ALL) {
+ if (config.isPropertySet(p.getConfigType(), p.getPropertyName())) {
+ String value = config.getPropertyValue(p.getConfigType(), p.getPropertyName());
+ String updatedValue = UnitUpdater.updateForClusterCreate(this, p.getServiceName(), p.getConfigType(), p.getPropertyName(), value);
+ config.setProperty(p.getConfigType(), p.getPropertyName(), updatedValue);
+ }
+ }
+
+ config.getProperties().values().forEach(
+ each -> each.values().removeIf(Objects::isNull)
+ );
+
+ return config;
+ }
+
+ /**
* Parse components for the specified service from the stack definition.
*
* @param service service name
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java
index 501e16a..6691519 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java
@@ -49,15 +49,20 @@ public class UnitUpdater implements BlueprintConfigurationProcessor.PropertyUpda
String origValue,
Map<String, Map<String, String>> properties,
ClusterTopology topology) {
- PropertyUnit stackUnit = PropertyUnit.of(topology.getBlueprint().getStack(), serviceName, configType, propertyName);
- PropertyValue value = PropertyValue.of(propertyName, origValue);
- if (value.hasUnit(stackUnit)) {
- return value.toString();
- } else if (!value.hasAnyUnit()) {
- return value.withUnit(stackUnit);
- } else { // should not happen because of pre-validation in UnitValidator
- throw new IllegalArgumentException("Property " + propertyName + "=" + origValue + " has an unsupported unit. Stack supported unit is: " + stackUnit + " or no unit");
- }
+ Stack stack = topology.getBlueprint().getStack();
+ return updateForClusterCreate(stack, serviceName, configType, propertyName, origValue);
+ }
+
+ public static String updateForClusterCreate(Stack stack, String serviceName, String configType, String propertyName, String origValue) {
+ PropertyUnit stackUnit = PropertyUnit.of(stack, serviceName, configType, propertyName);
+ PropertyValue value = PropertyValue.of(propertyName, origValue);
+ if (value.hasUnit(stackUnit)) {
+ return value.toString();
+ } else if (!value.hasAnyUnit()) {
+ return value.withUnit(stackUnit);
+ } else { // should not happen because of pre-validation in UnitValidator
+ throw new IllegalArgumentException("Property " + propertyName + "=" + origValue + " has an unsupported unit. Stack supported unit is: " + stackUnit + " or no unit");
+ }
}
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
index 24e530d..8662746 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java
@@ -17,26 +17,47 @@
*/
package org.apache.ambari.server.topology.addservice;
+import static java.util.stream.Collectors.joining;
+
import java.util.Map;
import java.util.Set;
+import org.apache.ambari.server.controller.internal.RequestStageContainer;
+import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.topology.Configuration;
+
/**
* Processed info for adding new services/components to an existing cluster.
*/
public final class AddServiceInfo {
private final String clusterName;
+ private final Stack stack;
private final Map<String, Map<String, Set<String>>> newServices;
+ private final RequestStageContainer stages;
+ private final Configuration config;
- public AddServiceInfo(String clusterName, Map<String, Map<String, Set<String>>> newServices) {
+ public AddServiceInfo(String clusterName, Stack stack, Configuration config, RequestStageContainer stages, Map<String, Map<String, Set<String>>> newServices) {
this.clusterName = clusterName;
+ this.stack = stack;
this.newServices = newServices;
+ this.stages = stages;
+ this.config = config;
+ }
+
+ @Override
+ public String toString() {
+ return "AddServiceRequest(" + stages.getId() + ")";
}
public String clusterName() {
return clusterName;
}
+ public RequestStageContainer getStages() {
+ return stages;
+ }
+
/**
* New services to be added to the cluster: service -> component -> host
* This should include both explicitly requested services, and services of the requested components.
@@ -44,4 +65,27 @@ public final class AddServiceInfo {
public Map<String, Map<String, Set<String>>> newServices() {
return newServices;
}
+
+ public Stack getStack() {
+ return stack;
+ }
+
+ public Configuration getConfig() {
+ return config;
+ }
+
+ /**
+ * Creates a descriptive label to be displayed in the UI.
+ */
+ public String describe() {
+ int maxServicesToShow = 3;
+ StringBuilder sb = new StringBuilder("Add Services: ")
+ .append(newServices.keySet().stream().sorted().limit(maxServicesToShow).collect(joining(", ")));
+ if (newServices.size() > maxServicesToShow) {
+ sb.append(" and ").append(newServices.size() - maxServicesToShow).append(" more");
+ }
+ sb.append(" to cluster ").append(clusterName);
+ return sb.toString();
+ }
+
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
index 426c833..e137d4a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java
@@ -27,12 +27,17 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.RequestFactory;
import org.apache.ambari.server.controller.AddServiceRequest;
import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.internal.RequestStageContainer;
import org.apache.ambari.server.controller.internal.Stack;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.topology.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,13 +52,23 @@ public class AddServiceOrchestrator {
@Inject
private AmbariManagementController controller;
- public void processAddServiceRequest(Cluster cluster, AddServiceRequest request) {
+ @Inject
+ private ActionManager actionManager;
+
+ @Inject
+ private RequestFactory requestFactory;
+
+ public RequestStatusResponse processAddServiceRequest(Cluster cluster, AddServiceRequest request) {
LOG.info("Received {} request for {}: {}", request.getOperationType(), cluster.getClusterName(), request);
AddServiceInfo validatedRequest = validate(cluster, request);
AddServiceInfo requestWithLayout = recommendLayout(validatedRequest);
- createResources(requestWithLayout);
- createHostTasks(requestWithLayout);
+ AddServiceInfo requestWithConfig = recommendConfiguration(requestWithLayout);
+
+ createResources(requestWithConfig);
+ createHostTasks(requestWithConfig);
+
+ return requestWithConfig.getStages().getRequestStatusResponse();
}
/**
@@ -68,8 +83,9 @@ public class AddServiceOrchestrator {
Map<String, Map<String, Set<String>>> newServices = new LinkedHashMap<>();
StackId stackId = new StackId(request.getStackName(), request.getStackVersion());
+ Stack stack;
try {
- Stack stack = new Stack(stackId, controller);
+ stack = new Stack(stackId, controller);
Set<String> existingServices = cluster.getServices().keySet();
for (AddServiceRequest.Component requestedComponent : request.getComponents()) {
String serviceName = stack.getServiceForComponent(requestedComponent.getName());
@@ -93,7 +109,17 @@ public class AddServiceOrchestrator {
throw new IllegalArgumentException(e);
}
- return new AddServiceInfo(cluster.getClusterName(), newServices);
+ if (newServices.isEmpty()) {
+ throw new IllegalArgumentException("No new services to be added");
+ }
+
+ Configuration config = stack.getValidDefaultConfig();
+ // TODO add user-defined config
+
+ RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager);
+ AddServiceInfo validatedRequest = new AddServiceInfo(cluster.getClusterName(), stack, config, stages, newServices);
+ stages.setRequestContext(validatedRequest.describe());
+ return validatedRequest;
}
/**
@@ -108,20 +134,41 @@ public class AddServiceOrchestrator {
}
/**
+ * Requests config recommendation from the stack advisor.
+ * @return new request, updated with the recommended config
+ * @throws IllegalArgumentException if the request cannot be satisfied
+ */
+ private AddServiceInfo recommendConfiguration(AddServiceInfo request) {
+ LOG.info("Recommending configuration for {}", request);
+ // TODO implement
+ return request;
+ }
+
+ /**
* Creates the service, component and host component resources for the request.
*/
private void createResources(AddServiceInfo request) {
LOG.info("Creating resources for {}", request);
resourceProviders.createServices(request);
resourceProviders.createComponents(request);
- resourceProviders.createHostComponents(request);
+ resourceProviders.createConfigs(request);
resourceProviders.updateServiceDesiredState(request, State.INSTALLED);
resourceProviders.updateServiceDesiredState(request, State.STARTED);
+ resourceProviders.createHostComponents(request);
}
private void createHostTasks(AddServiceInfo request) {
LOG.info("Creating host tasks for {}", request);
- // TODO implement
+
+ resourceProviders.updateHostComponentDesiredState(request, State.INSTALLED);
+ resourceProviders.updateHostComponentDesiredState(request, State.STARTED);
+ try {
+ request.getStages().persist();
+ } catch (AmbariException e) {
+ String msg = String.format("Error creating host tasks for %s", request);
+ LOG.error(msg, e);
+ throw new IllegalStateException(msg, e);
+ }
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
index 70b730e..722d5f6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java
@@ -17,15 +17,36 @@
*/
package org.apache.ambari.server.topology.addservice;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.ClusterRequest;
+import org.apache.ambari.server.controller.ConfigurationRequest;
+import org.apache.ambari.server.controller.internal.ClusterResourceProvider;
+import org.apache.ambari.server.controller.internal.ComponentResourceProvider;
+import org.apache.ambari.server.controller.internal.HostComponentResourceProvider;
import org.apache.ambari.server.controller.internal.RequestImpl;
+import org.apache.ambari.server.controller.internal.RequestOperationLevel;
import org.apache.ambari.server.controller.internal.ServiceResourceProvider;
+import org.apache.ambari.server.controller.predicate.AndPredicate;
+import org.apache.ambari.server.controller.predicate.EqualsPredicate;
+import org.apache.ambari.server.controller.predicate.OrPredicate;
import org.apache.ambari.server.controller.spi.ClusterController;
import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
@@ -33,20 +54,28 @@ import org.apache.ambari.server.controller.spi.ResourceProvider;
import org.apache.ambari.server.controller.spi.SystemException;
import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.security.authorization.AuthorizationException;
+import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
/**
* Creates resources using the resource providers.
* Translates {@link AddServiceInfo} to internal requests accepted by those.
*/
+@Singleton
public class ResourceProviderAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ResourceProviderAdapter.class);
+ @Inject
+ private AmbariManagementController controller;
+
public void createServices(AddServiceInfo request) {
LOG.info("Creating service resources for {}", request);
@@ -54,16 +83,115 @@ public class ResourceProviderAdapter {
.map(service -> createServiceRequestProperties(request, service))
.collect(toSet());
+ createResources(properties, Resource.Type.Service);
+ }
+
+ public void createComponents(AddServiceInfo request) {
+ LOG.info("Creating component resources for {}", request);
+
+ Set<Map<String, Object>> properties = request.newServices().entrySet().stream()
+ .flatMap(componentsOfService -> componentsOfService.getValue().keySet().stream()
+ .map(component -> createComponentRequestProperties(request, componentsOfService.getKey(), component)))
+ .collect(toSet());
+
+ createResources(properties, Resource.Type.Component);
+ }
+
+ public void createHostComponents(AddServiceInfo request) {
+ LOG.info("Creating host component resources for {}", request);
+
+ Set<Map<String, Object>> properties = request.newServices().entrySet().stream()
+ .flatMap(componentsOfService -> componentsOfService.getValue().entrySet().stream()
+ .flatMap(hostsOfComponent -> hostsOfComponent.getValue().stream()
+ .map(host -> createHostComponentRequestProperties(request, componentsOfService.getKey(), hostsOfComponent.getKey(), host))))
+ .collect(toSet());
+
+ createResources(properties, Resource.Type.HostComponent);
+ }
+
+ public void createConfigs(AddServiceInfo request) {
+ LOG.info("Creating configurations for {}", request);
+
+ Set<ClusterRequest> requests = new HashSet<>();
+ for (String service : request.newServices().keySet()) {
+ List<ConfigurationRequest> configRequests = request.getStack().getConfigurationTypes(service).stream()
+ .filter(configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV))
+ .map(configType -> createClusterConfigRequestProperties(request, service, configType))
+ .collect(toList());
+ ClusterRequest internalRequest = new ClusterRequest(null, request.clusterName(), null, null);
+ internalRequest.setDesiredConfig(configRequests);
+ requests.add(internalRequest);
+ }
+
+ try {
+ controller.updateClusters(requests, null);
+ } catch (AmbariException | AuthorizationException e) {
+ String msg = String.format("Error creating configurations for %s", request);
+ LOG.error(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ public void updateServiceDesiredState(AddServiceInfo request, State desiredState) {
+ LOG.info("Updating service desired state to {} for {}", desiredState, request);
+
+ Set<Map<String, Object>> properties = ImmutableSet.of(ImmutableMap.of(
+ ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID, desiredState.name()
+ ));
+ updateResources(request, properties, Resource.Type.Service, predicateForNewServices(request, "ServiceInfo"));
+ }
+
+ public void updateHostComponentDesiredState(AddServiceInfo request, State desiredState) {
+ LOG.info("Updating host component desired state to {} for {}", desiredState, request);
+
+ Set<Map<String, Object>> properties = ImmutableSet.of(ImmutableMap.of(
+ HostComponentResourceProvider.STATE, desiredState.name(),
+ "context", String.format("Put new components to %s state", desiredState)
+ ));
+ HostComponentResourceProvider rp = (HostComponentResourceProvider) getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
+ Request internalRequest = createRequest(request, properties, Resource.Type.HostComponent);
+ try {
+ rp.doUpdateResources(request.getStages(), internalRequest, predicateForNewServices(request, HostComponentResourceProvider.HOST_ROLES), false, false, false);
+ } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
+ String msg = String.format("Error updating host component desired state for %s", request);
+ LOG.error(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ private static void createResources(Set<Map<String, Object>> properties, Resource.Type resourceType) {
Request internalRequest = new RequestImpl(null, properties, null, null);
- ResourceProvider rp = getClusterController().ensureResourceProvider(Resource.Type.Service);
+ ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
try {
rp.createResources(internalRequest);
} catch (UnsupportedPropertyException | SystemException | ResourceAlreadyExistsException | NoSuchParentResourceException e) {
- LOG.error("Error creating services", e);
- throw new RuntimeException("Error creating services", e);
+ String msg = String.format("Error creating resources: %s", resourceType);
+ LOG.error(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ private static void updateResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, Predicate predicate) {
+ Request internalRequest = createRequest(request, properties, resourceType);
+ ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType);
+ try {
+ rp.updateResources(internalRequest, predicate);
+ } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) {
+ String msg = String.format("Error updating resources: %s", resourceType);
+ LOG.error(msg, e);
+ throw new RuntimeException(msg, e);
}
}
+ private static Request createRequest(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) {
+ Map<String, String> requestInfoProperties = ImmutableMap.of(
+
+ RequestOperationLevel.OPERATION_LEVEL_ID, RequestOperationLevel.getExternalLevelName(resourceType.name()),
+ RequestOperationLevel.OPERATION_CLUSTER_ID, request.clusterName()
+ );
+ return new RequestImpl(null, properties, requestInfoProperties, null);
+ }
+
private static Map<String, Object> createServiceRequestProperties(AddServiceInfo request, String service) {
ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
@@ -74,22 +202,49 @@ public class ResourceProviderAdapter {
return properties.build();
}
- private ClusterController getClusterController() {
- return ClusterControllerHelper.getClusterController();
+ private static Map<String, Object> createComponentRequestProperties(AddServiceInfo request, String service, String component) {
+ ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
+
+ properties.put(ComponentResourceProvider.CLUSTER_NAME, request.clusterName());
+ properties.put(ComponentResourceProvider.SERVICE_NAME, service);
+ properties.put(ComponentResourceProvider.COMPONENT_NAME, component);
+ properties.put(ComponentResourceProvider.STATE, State.INIT.name());
+
+ return properties.build();
}
- public void createComponents(AddServiceInfo request) {
- LOG.info("Creating component resources for {}", request);
- // TODO implement
+ private static Map<String, Object> createHostComponentRequestProperties(AddServiceInfo request, String service, String component, String host) {
+ ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
+
+ properties.put(HostComponentResourceProvider.CLUSTER_NAME, request.clusterName());
+ properties.put(HostComponentResourceProvider.SERVICE_NAME, service);
+ properties.put(HostComponentResourceProvider.COMPONENT_NAME, component);
+ properties.put(HostComponentResourceProvider.HOST_NAME, host);
+ properties.put(HostComponentResourceProvider.STATE, State.INIT.name());
+
+ return properties.build();
}
- public void createHostComponents(AddServiceInfo request) {
- LOG.info("Creating host component resources for {}", request);
- // TODO implement
+ private static ConfigurationRequest createClusterConfigRequestProperties(AddServiceInfo request, String service, String configType) {
+ LOG.debug("Creating config type {} for service {}", configType, service);
+
+ return new ConfigurationRequest(request.clusterName(), configType, "ADD_SERVICE",
+ request.getConfig().getProperties().getOrDefault(configType, new HashMap<>(0)),
+ request.getConfig().getAttributes().getOrDefault(configType, new HashMap<>(0)));
}
- public void updateServiceDesiredState(AddServiceInfo request, State desiredState) {
- LOG.info("Updating service desired state to {} for {}", desiredState, request);
- // TODO implement, reuse parts of AmbariContext#createAmbariServiceAndComponentResources
+ private static Predicate predicateForNewServices(AddServiceInfo request, String category) {
+ return new AndPredicate(
+ new EqualsPredicate<>(PropertyHelper.getPropertyId(category, ClusterResourceProvider.CLUSTER_NAME), request.clusterName()),
+ new OrPredicate(
+ request.newServices().keySet().stream()
+ .map(service -> new EqualsPredicate<>(PropertyHelper.getPropertyId(category, "service_name"), service))
+ .toArray(Predicate[]::new)
+ )
+ );
+ }
+
+ private static ClusterController getClusterController() {
+ return ClusterControllerHelper.getClusterController();
}
}