You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by js...@apache.org on 2015/04/27 07:53:01 UTC
[07/13] ambari git commit: AMBARI-10750. Initial merge of advanced
api provisioning work.
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
new file mode 100644
index 0000000..9f9db5c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
@@ -0,0 +1,814 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.ConfigGroupRequest;
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ServiceComponentHostRequest;
+import org.apache.ambari.server.controller.ShortTaskStatus;
+import org.apache.ambari.server.controller.internal.ConfigGroupResourceProvider;
+import org.apache.ambari.server.controller.internal.HostComponentResourceProvider;
+import org.apache.ambari.server.controller.internal.HostResourceProvider;
+import org.apache.ambari.server.controller.internal.RequestImpl;
+import org.apache.ambari.server.controller.internal.ResourceImpl;
+import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ConfigImpl;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.state.host.HostImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.ambari.server.controller.AmbariServer.getController;
+
+/**
+ * Represents a set of requests to a single host such as install, start, etc.
+ */
+public class HostRequest implements Comparable<HostRequest> {
+
+ private long requestId;
+ private String blueprint;
+ private HostGroup hostGroup;
+ private String hostgroupName;
+ private Predicate predicate;
+ private int cardinality = -1;
+ private String hostname = null;
+ private String cluster;
+ private boolean containsMaster;
+ private long stageId = -1;
+ //todo: should be able to use the presence of hostName for this
+ private boolean outstanding = true;
+
+ //todo: remove
+ private Map<String, Long> logicalInstallTaskIds = new HashMap<String, Long>();
+ //todo: remove
+ private Map<String, Long> logicalStartTaskIds = new HashMap<String, Long>();
+
+ Collection<HostRoleCommand> logicalTasks = new ArrayList<HostRoleCommand>();
+
+ // logical task id -> physical tasks
+ private Map<Long, Collection<Long>> physicalTasks = new HashMap<Long, Collection<Long>>();
+
+ private static HostResourceProvider hostResourceProvider;
+
+ private HostComponentResourceProvider hostComponentResourceProvider;
+
+ private AmbariManagementController controller = getController();
+ private ActionManager actionManager = controller.getActionManager();
+ private ConfigHelper configHelper = controller.getConfigHelper();
+ private AmbariMetaInfo metaInfoManager = controller.getAmbariMetaInfo();
+
+ //todo: temporary refactoring step
+ private TopologyManager.ClusterTopologyContext topologyContext;
+
+ private static HostRoleCommandFactory hostRoleCommandFactory;
+
+ /**
+  * Installs the factory that every {@code HostRequest} uses to create its logical tasks.
+  *
+  * @param factory factory stored in the static {@code hostRoleCommandFactory} field
+  */
+ public static void init(HostRoleCommandFactory factory) {
+   HostRequest.hostRoleCommandFactory = factory;
+ }
+
+ /**
+  * Creates a request that is not yet associated with a host.
+  * The logical tasks for the host group's components are created immediately.
+  *
+  * @param requestId        id of the owning request
+  * @param stageId          stage id assigned to the created tasks
+  * @param cluster          cluster name
+  * @param blueprintName    blueprint name
+  * @param hostGroup        host group this request provisions
+  * @param cardinality      cardinality recorded for this request
+  * @param predicate        predicate used by {@link #matchesHost(HostImpl)}; may be null
+  * @param topologyContext  context that supplies task ids and the cluster topology
+  */
+ public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
+                    int cardinality, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
+   // request identity
+   this.requestId = requestId;
+   this.stageId = stageId;
+   this.cluster = cluster;
+   this.blueprint = blueprintName;
+
+   // host group derived state
+   this.hostGroup = hostGroup;
+   this.hostgroupName = hostGroup.getName();
+   this.containsMaster = hostGroup.containsMasterComponent();
+
+   // host selection criteria
+   this.cardinality = cardinality;
+   this.predicate = predicate;
+
+   this.topologyContext = topologyContext;
+
+   createTasks();
+   System.out.println("HostRequest: Created request: Host Association Pending");
+ }
+
+ /**
+  * Creates a request that targets an explicitly named host.
+  * The logical tasks for the host group's components are created immediately.
+  *
+  * @param requestId        id of the owning request
+  * @param stageId          stage id assigned to the created tasks
+  * @param cluster          cluster name
+  * @param blueprintName    blueprint name
+  * @param hostGroup        host group this request provisions
+  * @param hostname         name of the host this request is for
+  * @param predicate        predicate stored on the request; may be null
+  * @param topologyContext  context that supplies task ids and the cluster topology
+  */
+ public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
+                    String hostname, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
+   // request identity
+   this.requestId = requestId;
+   this.stageId = stageId;
+   this.cluster = cluster;
+   this.blueprint = blueprintName;
+
+   // host group derived state
+   this.hostGroup = hostGroup;
+   this.hostgroupName = hostGroup.getName();
+   this.containsMaster = hostGroup.containsMasterComponent();
+
+   // host selection criteria
+   this.hostname = hostname;
+   this.predicate = predicate;
+
+   this.topologyContext = topologyContext;
+
+   createTasks();
+   System.out.println("HostRequest: Created request for host: " + hostname);
+ }
+
+ //todo: synchronization
+ /**
+  * Offers a host to this request.
+  * The offer is declined if the request has already accepted a host or if the host
+  * does not match; otherwise the host is bound to the request and provisioned.
+  *
+  * @param host the host being offered
+  * @return the response describing whether the host was accepted or why it was declined
+  */
+ public synchronized HostOfferResponse offer(HostImpl host) {
+   if (!outstanding) {
+     return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
+   }
+   if (!matchesHost(host)) {
+     return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE);
+   }
+
+   // the host is a match: bind it to this request and build its provisioning tasks
+   outstanding = false;
+   hostname = host.getHostName();
+   List<TopologyTask> provisioningTasks = provision(host);
+
+   return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, hostGroup.getName(), provisioningTasks);
+ }
+
+ /**
+  * Sets the name of the host associated with this request.
+  *
+  * @param hostName the host name
+  */
+ public void setHostName(String hostName) {
+   hostname = hostName;
+ }
+
+ /**
+  * @return the id of the request this host request belongs to
+  */
+ public long getRequestId() {
+   return this.requestId;
+ }
+
+ /**
+  * @return the cluster name supplied at construction
+  */
+ public String getClusterName() {
+   return this.cluster;
+ }
+
+ /**
+  * @return the blueprint name supplied at construction
+  */
+ public String getBlueprint() {
+   return this.blueprint;
+ }
+
+ /**
+  * @return the host group supplied at construction
+  */
+ public HostGroup getHostGroup() {
+   return this.hostGroup;
+ }
+
+ /**
+  * @return the host group name captured from the host group at construction
+  */
+ public String getHostgroupName() {
+   return this.hostgroupName;
+ }
+
+ /**
+  * @return the cardinality, or -1 if the request was created for a named host
+  */
+ public int getCardinality() {
+   return this.cardinality;
+ }
+
+ /**
+  * @return the predicate supplied at construction; may be null
+  */
+ public Predicate getPredicate() {
+   return this.predicate;
+ }
+
+
+ /**
+  * Builds the ordered list of tasks that provision the given host:
+  * resource creation, config group configuration, install and start.
+  * The host entity is also set on all logical tasks.
+  *
+  * @param host the host that was accepted for this request
+  * @return the provisioning tasks in execution order
+  */
+ private List<TopologyTask> provision(HostImpl host) {
+   List<TopologyTask> provisioningTasks = new ArrayList<TopologyTask>();
+
+   TopologyTask createResources =
+       new CreateHostResourcesTask(topologyContext.getClusterTopology(), host, getHostgroupName());
+   provisioningTasks.add(createResources);
+   setHostOnTasks(host);
+
+   HostGroup group = getHostGroup();
+   String configGroupName = getConfigurationGroupName(group.getBlueprintName(), group.getName());
+   provisioningTasks.add(new ConfigureConfigGroup(configGroupName, getClusterName(), hostname));
+
+   provisioningTasks.add(getInstallTask());
+   provisioningTasks.add(getStartTask());
+
+   return provisioningTasks;
+ }
+
+ /**
+  * Creates the logical tasks for every component of the host group.
+  * Each component gets an INSTALL task; components that are not clients also get a START task.
+  * Null components and AMBARI_SERVER are skipped.
+  *
+  * @throws RuntimeException wrapping the AmbariException if component meta information can't be obtained
+  */
+ private void createTasks() {
+   HostGroup hostGroup = getHostGroup();
+
+   // the host name can't change while tasks are being created, so resolve the label once
+   String hostName = getHostName() != null ?
+       getHostName() :
+       "PENDING HOST ASSIGNMENT : HOSTGROUP=" + getHostgroupName();
+
+   for (String component : hostGroup.getComponents()) {
+     if (component == null || component.equals("AMBARI_SERVER")) {
+       System.out.printf("Skipping component %s when creating request\n", component);
+       continue;
+     }
+
+     HostRoleCommand installTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.INSTALL);
+     installTask.setStatus(HostRoleStatus.PENDING);
+     installTask.setTaskId(topologyContext.getNextTaskId());
+     //todo: had to add requestId to ShortTaskStatus
+     //todo: revert addition of requestId when we are using LogicalTask
+     installTask.setRequestId(getRequestId());
+     installTask.setStageId(stageId);
+
+     logicalTasks.add(installTask);
+     registerLogicalInstallTaskId(component, installTask.getTaskId());
+
+     Stack stack = hostGroup.getStack();
+     try {
+       // if component isn't a client, add a start task
+       if (! metaInfoManager.getComponent(stack.getName(), stack.getVersion(), stack.getServiceForComponent(component), component).isClient()) {
+         HostRoleCommand startTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.START);
+         startTask.setStatus(HostRoleStatus.PENDING);
+         startTask.setRequestId(getRequestId());
+         startTask.setTaskId(topologyContext.getNextTaskId());
+         startTask.setStageId(stageId);
+         logicalTasks.add(startTask);
+         registerLogicalStartTaskId(component, startTask.getTaskId());
+       }
+     } catch (AmbariException e) {
+       //todo: how to handle
+       // the cause is preserved on the rethrown exception, so it isn't printed here as well
+       throw new RuntimeException(e);
+     }
+   }
+ }
+
+ /**
+  * Builds the config group name for a blueprint / host group pair.
+  * The result has the form {@code <blueprint name>:<host group name>}.
+  *
+  * @param bpName blueprint name
+  * @param hostGroupName host group name
+  * @return config group name
+  */
+ protected String getConfigurationGroupName(String bpName, String hostGroupName) {
+   String configGroupName = String.format("%s:%s", bpName, hostGroupName);
+   return configGroupName;
+ }
+
+ /**
+  * Sets the host's entity on every logical task returned by {@link #getTasks()}.
+  *
+  * @param host the host whose entity is assigned to the tasks
+  */
+ private void setHostOnTasks(HostImpl host) {
+   Collection<HostRoleCommand> currentTasks = getTasks();
+   for (HostRoleCommand logicalTask : currentTasks) {
+     logicalTask.setHostEntity(host.getHostEntity());
+   }
+ }
+
+ //todo: analyze all configuration needs for dealing with deprecated properties
+ /**
+  * Global configs have been deprecated since 1.7.0 but are still supported,
+  * so any globals that are used are moved by the config helper.
+  *
+  * @param stack stack whose name and version identify the stack id
+  * @param blueprintConfigurations map of blueprint configurations keyed by type
+  */
+ private void handleGlobalsBackwardsCompability(Stack stack,
+     Map<String, Map<String, String>> blueprintConfigurations) {
+
+   configHelper.moveDeprecatedGlobals(
+       new StackId(stack.getName(), stack.getVersion()), blueprintConfigurations, getClusterName());
+ }
+
+ /**
+  * Returns the logical tasks after copying the current state of their physical tasks onto them.
+  * Logical tasks without a registered physical task, or whose physical task can't be found,
+  * are returned unchanged.
+  *
+  * @return the logical tasks of this request
+  */
+ public Collection<HostRoleCommand> getTasks() {
+   for (HostRoleCommand logical : logicalTasks) {
+     Collection<Long> mappedIds = physicalTasks.get(logical.getTaskId());
+     if (mappedIds == null) {
+       continue;
+     }
+
+     //todo: for now only one physical task per logical task
+     long firstPhysicalId = mappedIds.iterator().next();
+     HostRoleCommand physical = actionManager.getTaskById(firstPhysicalId);
+     if (physical == null) {
+       continue;
+     }
+
+     logical.setStatus(physical.getStatus());
+     logical.setCommandDetail(physical.getCommandDetail());
+     logical.setCustomCommandName(physical.getCustomCommandName());
+     //todo: once we retry on failures, start/end times could span multiple physical tasks
+     logical.setStartTime(physical.getStartTime());
+     logical.setEndTime(physical.getEndTime());
+     logical.setErrorLog(physical.getErrorLog());
+     logical.setExitCode(physical.getExitCode());
+     logical.setExecutionCommandWrapper(physical.getExecutionCommandWrapper());
+     //todo: may be handled at a higher level than physical task
+     logical.setLastAttemptTime(physical.getLastAttemptTime());
+     logical.setOutputLog(physical.getOutputLog());
+     logical.setStderr(physical.getStderr());
+     logical.setStdout(physical.getStdout());
+     logical.setStructuredOut(physical.getStructuredOut());
+   }
+   return logicalTasks;
+ }
+
+ public Collection<HostRoleCommandEntity> getTaskEntities() {
+ Collection<HostRoleCommandEntity> taskEntities = new ArrayList<HostRoleCommandEntity>();
+ for (HostRoleCommand task : logicalTasks) {
+ HostRoleCommandEntity entity = task.constructNewPersistenceEntity();
+ // the above method doesn't set all of the fields for some unknown reason
+ entity.setRequestId(task.getRequestId());
+ entity.setStageId(task.getStageId());
+ entity.setTaskId(task.getTaskId());
+ entity.setOutputLog(task.getOutputLog());
+ entity.setErrorLog(task.errorLog);
+
+ // set state from physical task
+ Collection<Long> physicalTaskIds = physicalTasks.get(task.getTaskId());
+ if (physicalTaskIds != null) {
+ //todo: for now only one physical task per logical task
+ long physicalTaskId = physicalTaskIds.iterator().next();
+ HostRoleCommand physicalTask = actionManager.getTaskById(physicalTaskId);
+ if (physicalTask != null) {
+ entity.setStatus(physicalTask.getStatus());
+ entity.setCommandDetail(physicalTask.getCommandDetail());
+ entity.setCustomCommandName(physicalTask.getCustomCommandName());
+ //todo: once we retry on failures, start/end times could span multiple physical tasks
+ entity.setStartTime(physicalTask.getStartTime());
+ entity.setEndTime(physicalTask.getEndTime());
+ entity.setErrorLog(physicalTask.getErrorLog());
+ entity.setExitcode(physicalTask.getExitCode());
+ //todo: may be handled at a higher level than physical task
+ entity.setLastAttemptTime(physicalTask.getLastAttemptTime());
+ entity.setOutputLog(physicalTask.getOutputLog());
+ entity.setStdError(physicalTask.getStderr().getBytes());
+ entity.setStdOut(physicalTask.getStdout().getBytes());
+ entity.setStructuredOut(physicalTask.getStructuredOut().getBytes());
+ }
+ }
+
+ taskEntities.add(entity);
+ }
+ return taskEntities;
+ }
+
+ /**
+  * @return the value of {@code hostGroup.containsMasterComponent()} captured at construction
+  */
+ public boolean containsMaster() {
+   return this.containsMaster;
+ }
+
+ /**
+  * Determines whether the given host satisfies this request.
+  * A request bound to a host name matches only that host; otherwise the predicate,
+  * if any, decides; a request with neither matches every host.
+  *
+  * @param host the candidate host
+  * @return true if the host matches this request
+  */
+ public boolean matchesHost(HostImpl host) {
+   if (hostname != null) {
+     return host.getHostName().equals(hostname);
+   }
+   if (predicate != null) {
+     return predicate.evaluate(new HostResourceAdapter(host));
+   }
+   return true;
+ }
+
+ /**
+  * @return the name of the host associated with this request, or null if none is set
+  */
+ public String getHostName() {
+   return this.hostname;
+ }
+
+ /**
+  * @return the stage id supplied at construction
+  */
+ public long getStageId() {
+   return this.stageId;
+ }
+
+ //todo: remove
+ /**
+  * Records the id of the logical install task created for a component.
+  *
+  * @param component component name
+  * @param taskId    id of the component's logical install task
+  */
+ private void registerLogicalInstallTaskId(String component, long taskId) {
+   this.logicalInstallTaskIds.put(component, taskId);
+ }
+
+ //todo: remove
+ /**
+  * Records the id of the logical start task created for a component.
+  *
+  * @param component component name
+  * @param taskId    id of the component's logical start task
+  */
+ private void registerLogicalStartTaskId(String component, long taskId) {
+   this.logicalStartTaskIds.put(component, taskId);
+ }
+
+ //todo: remove
+ /**
+  * Looks up the id of the logical install task registered for a component.
+  *
+  * @param component component name
+  * @return id of the component's logical install task
+  * @throws IllegalArgumentException if no install task was registered for the component
+  */
+ private long getLogicalInstallTaskId(String component) {
+   Long taskId = logicalInstallTaskIds.get(component);
+   if (taskId == null) {
+     // fail with a descriptive message instead of a bare NullPointerException from unboxing
+     throw new IllegalArgumentException(
+         "No logical install task is registered for component: " + component);
+   }
+   return taskId;
+ }
+
+ //todo: remove
+ /**
+  * Looks up the id of the logical start task registered for a component.
+  *
+  * @param component component name
+  * @return id of the component's logical start task
+  * @throws IllegalArgumentException if no start task was registered for the component
+  */
+ private long getLogicalStartTaskId(String component) {
+   Long taskId = logicalStartTaskIds.get(component);
+   if (taskId == null) {
+     // fail with a descriptive message instead of a bare NullPointerException from unboxing
+     throw new IllegalArgumentException(
+         "No logical start task is registered for component: " + component);
+   }
+   return taskId;
+ }
+
+ //todo: since this is used to determine equality, using hashCode() isn't safe as it can return the same
+ //todo: value for 2 unequal requests
+ @Override
+ public int compareTo(HostRequest other) {
+ if (containsMaster()) {
+ return other.containsMaster() ? hashCode() - other.hashCode() : -1;
+ } else if (other.containsMaster()) {
+ return 1;
+ } else return hashCode() - other.hashCode();
+ }
+
+ //todo: once we have logical tasks, move tracking of physical tasks there
+ /**
+  * Associates a physical task with a logical task.
+  *
+  * @param logicalTaskId  id of the logical task
+  * @param physicalTaskId id of the physical task to associate with it
+  */
+ public void registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) {
+   Collection<Long> mappedIds = physicalTasks.get(logicalTaskId);
+   if (mappedIds != null) {
+     mappedIds.add(physicalTaskId);
+     return;
+   }
+
+   // first physical task for this logical task
+   Collection<Long> newIds = new HashSet<Long>();
+   physicalTasks.put(logicalTaskId, newIds);
+   newIds.add(physicalTaskId);
+ }
+
+ //todo: temporary step
+ public TopologyTask getInstallTask() {
+ return new InstallHostTask();
+ }
+
+ //todo: temporary step
+ public TopologyTask getStartTask() {
+ return new StartHostTask();
+ }
+
+ //todo: temporary refactoring step
+ /**
+  * Creates a host group info carrying the name and configuration of the given host group.
+  *
+  * @param group the host group to describe
+  * @return the new host group info
+  */
+ public HostGroupInfo createHostGroupInfo(HostGroup group) {
+   HostGroupInfo groupInfo = new HostGroupInfo(group.getName());
+   groupInfo.setConfiguration(group.getConfiguration());
+   return groupInfo;
+ }
+
+ private synchronized HostResourceProvider getHostResourceProvider() {
+ if (hostResourceProvider == null) {
+ hostResourceProvider = (HostResourceProvider)
+ ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.Host);
+
+ }
+ return hostResourceProvider;
+ }
+
+ /**
+  * Returns the host component resource provider, obtaining it from the cluster controller on first use.
+  *
+  * @return the host component resource provider of this request
+  */
+ private synchronized HostComponentResourceProvider getHostComponentResourceProvider() {
+   if (hostComponentResourceProvider != null) {
+     return hostComponentResourceProvider;
+   }
+   hostComponentResourceProvider = (HostComponentResourceProvider)
+       ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
+   return hostComponentResourceProvider;
+ }
+
+ //todo: extract
+ /**
+  * Task that installs the host's components and maps the resulting physical
+  * install tasks to the logical install tasks of the enclosing request.
+  */
+ private class InstallHostTask implements TopologyTask {
+   //todo: use future to obtain returned Response which contains the request id
+   //todo: error handling
+   //todo: monitor status of requests
+
+   @Override
+   public Type getType() {
+     return Type.INSTALL;
+   }
+
+   /**
+    * Issues the install and registers each returned physical task against its logical task.
+    * Any exception is printed and otherwise ignored.
+    */
+   @Override
+   public void run() {
+     try {
+       System.out.println("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname);
+       RequestStatusResponse installResponse = getHostResourceProvider().install(getHostName(), cluster);
+
+       // map logical install tasks to physical install tasks
+       for (ShortTaskStatus physicalTask : installResponse.getTasks()) {
+         Long logicalInstallTaskId = getLogicalInstallTaskId(physicalTask.getRole());
+         //todo: for now only one physical task per component
+         long physicalTaskId = physicalTask.getTaskId();
+         registerPhysicalTaskId(logicalInstallTaskId, physicalTaskId);
+
+         //todo: move this to provision
+         //todo: shouldn't have to iterate over all tasks to find install task
+         //todo: we are doing the same thing in the above registerPhysicalTaskId() call
+         // set attempt count on task
+         for (HostRoleCommand candidate : logicalTasks) {
+           if (candidate.getTaskId() == logicalInstallTaskId) {
+             candidate.incrementAttemptCount();
+           }
+         }
+       }
+     } catch (Exception e) {
+       // every failure, checked or unchecked, is only reported
+       e.printStackTrace();
+     }
+   }
+ }
+
+ //todo: extract
+ /**
+  * Task that starts the host's components and maps the resulting physical
+  * start tasks to the logical start tasks of the enclosing request.
+  */
+ private class StartHostTask implements TopologyTask {
+   //todo: use future to obtain returned Response which contains the request id
+   //todo: error handling
+   //todo: monitor status of requests
+
+   @Override
+   public Type getType() {
+     return Type.START;
+   }
+
+   /**
+    * Issues the start and registers each returned physical task against its logical task.
+    * Any exception is printed and otherwise ignored.
+    */
+   @Override
+   public void run() {
+     try {
+       System.out.println("HostRequest.StartHostTask: Executing START task for host: " + hostname);
+       RequestStatusResponse startResponse = getHostComponentResourceProvider().start(cluster, hostname);
+
+       // map logical start tasks to physical start tasks
+       for (ShortTaskStatus physicalTask : startResponse.getTasks()) {
+         String component = physicalTask.getRole();
+         Long logicalStartTaskId = getLogicalStartTaskId(component);
+         // for now just set on outer map
+         registerPhysicalTaskId(logicalStartTaskId, physicalTask.getTaskId());
+
+         //todo: move this to provision
+         // set attempt count on task
+         for (HostRoleCommand candidate : logicalTasks) {
+           if (candidate.getTaskId() == logicalStartTaskId) {
+             candidate.incrementAttemptCount();
+           }
+         }
+       }
+     } catch (Exception e) {
+       // every failure, checked or unchecked, is only reported
+       e.printStackTrace();
+     }
+   }
+ }
+
+ /**
+  * Task that creates the host resource and the host component resources for a host.
+  */
+ private class CreateHostResourcesTask implements TopologyTask {
+   private ClusterTopology topology;
+   private HostImpl host;
+   private String groupName;
+
+   /**
+    * @param topology  topology providing the blueprint and cluster name
+    * @param host      host for which resources are created
+    * @param groupName name of the host group whose components are created on the host
+    */
+   public CreateHostResourcesTask(ClusterTopology topology, HostImpl host, String groupName) {
+     this.topology = topology;
+     this.host = host;
+     this.groupName = groupName;
+   }
+
+   @Override
+   public Type getType() {
+     return Type.RESOURCE_CREATION;
+   }
+
+   /**
+    * Creates the resources; an AmbariException is printed and otherwise ignored.
+    */
+   @Override
+   public void run() {
+     try {
+       createHostResources();
+     } catch (AmbariException e) {
+       //todo: report error to caller
+       e.printStackTrace();
+       System.out.println("An error occurred when creating host resources: " + e.toString());
+     }
+   }
+
+   /**
+    * Creates the host resource in the cluster and then its host component resources.
+    *
+    * @throws AmbariException if resource creation fails
+    */
+   private void createHostResources() throws AmbariException {
+     Map<String, Object> properties = new HashMap<String, Object>();
+     properties.put(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID, getClusterName());
+     properties.put(HostResourceProvider.HOST_NAME_PROPERTY_ID, host.getHostName());
+     properties.put(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID, host.getRackInfo());
+
+     getHostResourceProvider().createHosts(new RequestImpl(null, Collections.singleton(properties), null, null));
+     createHostComponentResources();
+   }
+
+   /**
+    * Creates a host component for every component of the host group.
+    * Null components and AMBARI_SERVER are skipped, as they are when the request's tasks are created.
+    *
+    * @throws AmbariException if host component creation fails
+    */
+   private void createHostComponentResources() throws AmbariException {
+     Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
+     Stack stack = topology.getBlueprint().getStack();
+     for (String component : topology.getBlueprint().getHostGroup(groupName).getComponents()) {
+       //todo: handle this in a generic manner. These checks are all over the code
+       // null-safe check: a null component previously caused a NullPointerException here
+       if (component == null || "AMBARI_SERVER".equals(component)) {
+         continue;
+       }
+       requests.add(new ServiceComponentHostRequest(topology.getClusterName(),
+           stack.getServiceForComponent(component), component, host.getHostName(), null));
+     }
+
+     controller.createHostComponents(requests);
+   }
+ }
+
+ //todo: extract
+ /**
+  * Task that registers a host with the config group of its host group.
+  * The host is added to existing config groups with the given name; if none exists,
+  * the config groups are created from the host group's configuration.
+  */
+ private class ConfigureConfigGroup implements TopologyTask {
+   private String groupName;
+   private String clusterName;
+   private String hostName;
+
+   /**
+    * @param groupName   name of the config group
+    * @param clusterName name of the cluster that owns the config group
+    * @param hostName    name of the host to register
+    */
+   public ConfigureConfigGroup(String groupName, String clusterName, String hostName) {
+     this.groupName = groupName;
+     this.clusterName = clusterName;
+     this.hostName = hostName;
+   }
+
+   @Override
+   public Type getType() {
+     return Type.CONFIGURE;
+   }
+
+   /**
+    * Registers the host with its config group.
+    *
+    * @throws RuntimeException wrapping the original failure if registration fails
+    */
+   @Override
+   public void run() {
+     try {
+       //todo: add task to offer response
+       if (! addHostToExistingConfigGroups()) {
+         createConfigGroupsAndRegisterHost();
+       }
+     } catch (Exception e) {
+       //todo: handle exceptions
+       // still printed: whoever runs the task may not report the rethrown exception
+       e.printStackTrace();
+       // report the host this task was created for and keep the original failure as the cause
+       throw new RuntimeException("Unable to register config group for host: " + hostName, e);
+     }
+   }
+
+   /**
+    * Add the new host to an existing config group.
+    *
+    * @return true if the host was added to at least one existing config group
+    *
+    * @throws SystemException an unknown exception occurred
+    * @throws UnsupportedPropertyException an unsupported property was specified in the request
+    * @throws NoSuchParentResourceException a parent resource doesn't exist
+    * @throws IllegalArgumentException the cluster can't be obtained
+    */
+   private boolean addHostToExistingConfigGroups()
+       throws SystemException,
+       UnsupportedPropertyException,
+       NoSuchParentResourceException {
+
+     boolean addedHost = false;
+
+     Clusters clusters;
+     Cluster cluster;
+     try {
+       clusters = controller.getClusters();
+       cluster = clusters.getCluster(clusterName);
+     } catch (AmbariException e) {
+       throw new IllegalArgumentException(
+           String.format("Attempt to add hosts to a non-existent cluster: '%s'", clusterName), e);
+     }
+     // I don't know of a method to get config group by name
+     //todo: add a method to get config group by name
+     Map<Long, ConfigGroup> configGroups = cluster.getConfigGroups();
+     for (ConfigGroup group : configGroups.values()) {
+       if (group.getName().equals(groupName)) {
+         try {
+           group.addHost(clusters.getHost(hostName));
+           group.persist();
+           addedHost = true;
+         } catch (AmbariException e) {
+           // shouldn't occur, this host was just added to the cluster
+           throw new SystemException(String.format(
+               "Unable to obtain newly created host '%s' from cluster '%s'", hostName, clusterName), e);
+         }
+       }
+     }
+     return addedHost;
+   }
+
+   /**
+    * Register config groups for host group scoped configuration.
+    * For each host group with configuration specified in the blueprint, a config group is created
+    * and the hosts associated with the host group are assigned to the config group.
+    *
+    * @throws ResourceAlreadyExistsException attempt to create a config group that already exists
+    * @throws SystemException an unexpected exception occurs
+    * @throws UnsupportedPropertyException an invalid property is provided when creating a config group
+    * @throws NoSuchParentResourceException attempt to create a config group for a non-existing cluster
+    */
+   private void createConfigGroupsAndRegisterHost() throws
+       ResourceAlreadyExistsException, SystemException,
+       UnsupportedPropertyException, NoSuchParentResourceException {
+
+     //HostGroupEntity entity = hostGroup.getEntity();
+     HostGroup hostGroup = getHostGroup();
+     Map<String, Map<String, Config>> groupConfigs = new HashMap<String, Map<String, Config>>();
+
+     Stack stack = hostGroup.getStack();
+
+     // get the host-group config with cluster creation template overrides
+     Configuration topologyHostGroupConfig = topologyContext.getClusterTopology().
+         getHostGroupInfo().get(hostGroup.getName()).getConfiguration();
+
+     //handling backwards compatibility for group configs
+     //todo: doesn't belong here
+     handleGlobalsBackwardsCompability(stack, topologyHostGroupConfig.getProperties());
+
+     // iterate over topo host group configs which were defined in CCT/HG and BP/HG only, no parent configs
+     for (Map.Entry<String, Map<String, String>> entry: topologyHostGroupConfig.getProperties().entrySet()) {
+       String type = entry.getKey();
+       String service = stack.getServiceForConfigType(type);
+       Config config = new ConfigImpl(type);
+       config.setTag(hostGroup.getName());
+       config.setProperties(entry.getValue());
+       //todo: attributes
+       Map<String, Config> serviceConfigs = groupConfigs.get(service);
+       if (serviceConfigs == null) {
+         serviceConfigs = new HashMap<String, Config>();
+         groupConfigs.put(service, serviceConfigs);
+       }
+       serviceConfigs.put(type, config);
+     }
+
+     String bpName = topologyContext.getClusterTopology().getBlueprint().getName();
+     for (Map.Entry<String, Map<String, Config>> entry : groupConfigs.entrySet()) {
+       String service = entry.getKey();
+       Map<String, Config> serviceConfigs = entry.getValue();
+       String absoluteGroupName = getConfigurationGroupName(bpName, hostGroup.getName());
+       Collection<String> groupHosts;
+
+       groupHosts = topologyContext.getClusterTopology().getHostGroupInfo().
+           get(hostgroupName).getHostNames();
+
+       ConfigGroupRequest request = new ConfigGroupRequest(
+           null, getClusterName(), absoluteGroupName, service, "Host Group Configuration",
+           new HashSet<String>(groupHosts), serviceConfigs);
+
+       // get the config group provider and create config group resource
+       ConfigGroupResourceProvider configGroupProvider = (ConfigGroupResourceProvider)
+           ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.ConfigGroup);
+       configGroupProvider.createResources(Collections.singleton(request));
+     }
+   }
+ }
+
+ /**
+  * Read-only {@link Resource} view of a host, used to evaluate predicates against it.
+  * The host's properties are copied into a backing resource when the adapter is created.
+  */
+ private class HostResourceAdapter implements Resource {
+   Resource hostResource;
+
+   /**
+    * @param host the host whose properties are exposed
+    */
+   public HostResourceAdapter(HostImpl host) {
+     buildPropertyMap(host);
+   }
+
+   @Override
+   public Object getPropertyValue(String id) {
+     return hostResource.getPropertyValue(id);
+   }
+
+   @Override
+   public Map<String, Map<String, Object>> getPropertiesMap() {
+     return hostResource.getPropertiesMap();
+   }
+
+   @Override
+   public Type getType() {
+     return Type.Host;
+   }
+
+   @Override
+   public void addCategory(String id) {
+     // the adapter is read only, so this is a no-op
+   }
+
+   @Override
+   public void setProperty(String id, Object value) {
+     // the adapter is read only, so this is a no-op
+   }
+
+   /**
+    * Creates the backing host resource and copies the host's properties into it.
+    *
+    * @param host the host to read the properties from
+    */
+   private void buildPropertyMap(HostImpl host) {
+     hostResource = new ResourceImpl(Resource.Type.Host);
+
+     // naming and addressing
+     hostResource.setProperty(HostResourceProvider.HOST_NAME_PROPERTY_ID, host.getHostName());
+     hostResource.setProperty(HostResourceProvider.HOST_PUBLIC_NAME_PROPERTY_ID, host.getPublicHostName());
+     hostResource.setProperty(HostResourceProvider.HOST_IP_PROPERTY_ID, host.getIPv4());
+
+     // hardware
+     hostResource.setProperty(HostResourceProvider.HOST_TOTAL_MEM_PROPERTY_ID, host.getTotalMemBytes());
+     hostResource.setProperty(HostResourceProvider.HOST_CPU_COUNT_PROPERTY_ID, (long) host.getCpuCount());
+     hostResource.setProperty(HostResourceProvider.HOST_PHYSICAL_CPU_COUNT_PROPERTY_ID, (long) host.getPhCpuCount());
+
+     // operating system and placement
+     hostResource.setProperty(HostResourceProvider.HOST_OS_ARCH_PROPERTY_ID, host.getOsArch());
+     hostResource.setProperty(HostResourceProvider.HOST_OS_TYPE_PROPERTY_ID, host.getOsType());
+     hostResource.setProperty(HostResourceProvider.HOST_OS_FAMILY_PROPERTY_ID, host.getOsFamily());
+     hostResource.setProperty(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID, host.getRackInfo());
+
+     // agent communication
+     hostResource.setProperty(HostResourceProvider.HOST_LAST_HEARTBEAT_TIME_PROPERTY_ID, host.getLastHeartbeatTime());
+     hostResource.setProperty(HostResourceProvider.HOST_LAST_AGENT_ENV_PROPERTY_ID, host.getLastAgentEnv());
+     hostResource.setProperty(HostResourceProvider.HOST_LAST_REGISTRATION_TIME_PROPERTY_ID, host.getLastRegistrationTime());
+
+     // status
+     hostResource.setProperty(HostResourceProvider.HOST_HOST_STATUS_PROPERTY_ID, host.getStatus());
+     hostResource.setProperty(HostResourceProvider.HOST_HOST_HEALTH_REPORT_PROPERTY_ID, host.getHealthStatus().getHealthReport());
+     hostResource.setProperty(HostResourceProvider.HOST_DISK_INFO_PROPERTY_ID, host.getDisksInfo());
+     hostResource.setProperty(HostResourceProvider.HOST_STATE_PROPERTY_ID, host.getState());
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java
new file mode 100644
index 0000000..042e9fc
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * Indicates an invalid topology.
+ */
+public class InvalidTopologyException extends Exception {
+  /**
+   * @param message description of why the topology is invalid
+   */
+  public InvalidTopologyException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of why the topology is invalid
+   * @param cause   the underlying cause
+   */
+  public InvalidTopologyException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java
new file mode 100644
index 0000000..85422a0
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * Checked exception signalling that the information provided is invalid for the template.
+ */
+public class InvalidTopologyTemplateException extends Exception {
+
+  /**
+   * Creates the exception with a detail message and no cause.
+   *
+   * @param s  detail message describing the invalid information
+   */
+  public InvalidTopologyTemplateException(final String s) {
+    super(s);
+  }
+
+  /**
+   * Creates the exception with a detail message and an underlying cause.
+   *
+   * @param s          detail message describing the invalid information
+   * @param throwable  the failure that led to this exception
+   */
+  public InvalidTopologyTemplateException(final String s, final Throwable throwable) {
+    super(s, throwable);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
new file mode 100644
index 0000000..5273ff8
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ShortTaskStatus;
+import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.state.host.HostImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.apache.ambari.server.controller.AmbariServer.getController;
+
+/**
+ * Logical Request implementation.
+ * <p>
+ * Creates one {@link HostRequest} for every host asked for by a {@link TopologyRequest} and
+ * matches the hosts that are offered to it against those host requests.
+ */
+public class LogicalRequest extends Request {
+
+  // every host request created for this logical request; filled once by createHostRequests()
+  private final Collection<HostRequest> allHostRequests = new ArrayList<HostRequest>();
+  // sorted set with master host requests given priority
+  private final Collection<HostRequest> outstandingHostRequests = new TreeSet<HostRequest>();
+  // host requests created for explicitly named hosts, keyed by host name
+  private final Map<String, HostRequest> requestsWithReservedHosts = new HashMap<String, HostRequest>();
+
+  private final ClusterTopology topology;
+
+
+  /**
+   * Creates the logical request together with all of its host requests.
+   * The request id comes from the action manager and the cluster id is looked up by the cluster
+   * name of the topology request, both through the statically obtained controller.
+   *
+   * @param requestRequest   topology request being processed
+   * @param topologyContext  supplies the cluster topology and is handed to every host request
+   *
+   * @throws AmbariException  declared by the lookups in the super call; presumably raised when the
+   *                          named cluster cannot be resolved (not visible here, confirm)
+   */
+  //todo: topologyContext is a temporary refactoring step
+  public LogicalRequest(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) throws AmbariException {
+    //todo: abstract usage of controller, etc ...
+    super(getController().getActionManager().getNextRequestId(), getController().getClusters().getCluster(
+        requestRequest.getClusterName()).getClusterId(), getController().getClusters());
+
+    this.topology = topologyContext.getClusterTopology();
+    createHostRequests(requestRequest, topologyContext);
+  }
+
+  /**
+   * Offers a host to this request.
+   * <p>
+   * A host request that explicitly reserved the host's name is tried first and is required to
+   * accept. Otherwise the outstanding host requests are tried in iteration order until one accepts.
+   *
+   * @param host  host being offered
+   *
+   * @return the response of the accepting host request; otherwise a DECLINED_PREDICATE response if
+   *         at least one host request declined on its predicate or reserved hosts are still
+   *         outstanding, else a DECLINED_DONE response
+   *
+   * @throws RuntimeException if the host request which reserved this host does not accept it
+   */
+  public HostOfferResponse offer(HostImpl host) {
+    // attempt to match to a host request with an explicit host reservation first
+    synchronized (requestsWithReservedHosts) {
+      HostRequest hostRequest = requestsWithReservedHosts.remove(host.getHostName());
+      if (hostRequest != null) {
+        HostOfferResponse response = hostRequest.offer(host);
+        if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) {
+          //todo: error handling. This is really a system exception and shouldn't happen
+          throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " +
+              host.getHostName());
+        }
+        return response;
+      }
+    }
+
+    // not explicitly reserved, at least not in this request, so attempt to match to outstanding host requests
+    boolean predicateRejected = false;
+    synchronized (outstandingHostRequests) {
+      //todo: prioritization of master host requests
+      Iterator<HostRequest> hostRequestIterator = outstandingHostRequests.iterator();
+      while (hostRequestIterator.hasNext()) {
+        HostOfferResponse response = hostRequestIterator.next().offer(host);
+        switch (response.getAnswer()) {
+          case ACCEPTED:
+            hostRequestIterator.remove();
+            return response;
+          case DECLINED_DONE:
+            //todo: should have been done on ACCEPT
+            hostRequestIterator.remove();
+            // an already satisfied host request is not a predicate rejection, so it must not
+            // fall through and force a DECLINED_PREDICATE answer for the whole request
+            break;
+          case DECLINED_PREDICATE:
+            predicateRejected = true;
+            break;
+        }
+      }
+    }
+
+    // the reserved host map is guarded by its own monitor everywhere else, so read it under that monitor too
+    boolean reservedHostsOutstanding;
+    synchronized (requestsWithReservedHosts) {
+      reservedHostsOutstanding = ! requestsWithReservedHosts.isEmpty();
+    }
+
+    // if at least one outstanding host request rejected for predicate or we have an outstanding request
+    // with a reserved host decline due to predicate, otherwise decline due to all hosts being resolved
+    //todo: could also check if outstandingHostRequests is empty
+    return predicateRejected || reservedHostsOutstanding ?
+        new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE) :
+        new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
+  }
+
+  //todo
+  @Override
+  public Collection<Stage> getStages() {
+    return super.getStages();
+  }
+
+  /**
+   * Collects the tasks of every host request of this logical request.
+   *
+   * @return new list holding the tasks of all host requests, in host request iteration order
+   */
+  @Override
+  public List<HostRoleCommand> getCommands() {
+    List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
+    for (HostRequest hostRequest : allHostRequests) {
+      commands.addAll(hostRequest.getTasks());
+    }
+    return commands;
+  }
+
+  /**
+   * Get the names of the hosts that are reserved by this request and have not been offered yet.
+   *
+   * @return key set view of the reserved host map; it is not a copy
+   */
+  public Collection<String> getReservedHosts() {
+    return requestsWithReservedHosts.keySet();
+  }
+
+  /**
+   * Determine whether the given host groups have no outstanding host request without a host name.
+   *
+   * @param hostGroupNames  names of the host groups to check
+   *
+   * @return false if an outstanding host request of one of the groups has a null host name,
+   *         otherwise true
+   */
+  //todo: account for blueprint name?
+  //todo: this should probably be done implicitly at a lower level
+  public boolean areGroupsResolved(Collection<String> hostGroupNames) {
+    synchronized (outstandingHostRequests) {
+      // iterate over outstanding host requests
+      for (HostRequest request : outstandingHostRequests) {
+        if (hostGroupNames.contains(request.getHostgroupName()) && request.getHostName() == null) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Build a host name to component names map from the host groups of all host requests.
+   * The host names of a group are read from the topology's host group info.
+   *
+   * @return new map of host name to the components of the host groups that list the host
+   */
+  public Map<String, Collection<String>> getProjectedTopology() {
+    Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>();
+
+    //todo: synchronization
+    for (HostRequest hostRequest : allHostRequests) {
+      HostGroup hostGroup = hostRequest.getHostGroup();
+      for (String host : topology.getHostGroupInfo().get(hostGroup.getName()).getHostNames()) {
+        Collection<String> hostComponents = hostComponentMap.get(host);
+        if (hostComponents == null) {
+          hostComponents = new HashSet<String>();
+          hostComponentMap.put(host, hostComponents);
+        }
+        hostComponents.addAll(hostGroup.getComponents());
+      }
+    }
+    return hostComponentMap;
+  }
+
+  /**
+   * Build one stage entity per host request, carrying that host request's task entities.
+   *
+   * @return new collection with a freshly created stage entity for every host request
+   */
+  //todo: currently we are just returning all stages for all requests
+  //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each
+  //todo: needed to change the name to avoid a name collision.
+  public Collection<StageEntity> getStageEntities() {
+    Collection<StageEntity> stages = new ArrayList<StageEntity>();
+    for (HostRequest hostRequest : allHostRequests) {
+      StageEntity stage = new StageEntity();
+      stage.setStageId(hostRequest.getStageId());
+      stage.setRequestContext(getRequestContext());
+      stage.setRequestId(getRequestId());
+      //todo: not sure what this byte array is???
+      //stage.setClusterHostInfo();
+      stage.setClusterId(getClusterId());
+      stage.setSkippable(false);
+      // getTaskEntities() sync's state with physical tasks
+      stage.setHostRoleCommands(hostRequest.getTaskEntities());
+
+      stages.add(stage);
+    }
+    return stages;
+  }
+
+  /**
+   * Build the status response of this request: its id, its context and a short status per task.
+   *
+   * @return new request status response
+   */
+  public RequestStatusResponse getRequestStatus() {
+    RequestStatusResponse requestStatus = new RequestStatusResponse(getRequestId());
+    requestStatus.setRequestContext(getRequestContext());
+    //todo: other request status fields
+    //todo: ordering of tasks?
+
+    // convert HostRoleCommands to ShortTaskStatus
+    List<ShortTaskStatus> shortTasks = new ArrayList<ShortTaskStatus>();
+    for (HostRoleCommand task : getCommands()) {
+      shortTasks.add(new ShortTaskStatus(task));
+    }
+    requestStatus.setTasks(shortTasks);
+    //todo: null tasks?
+
+    return requestStatus;
+  }
+
+  /**
+   * Count the tasks of every stage by status.
+   *
+   * @return new map of stage id to a summary holding the per status task counts of that stage;
+   *         the two time related summary fields are always passed as 0
+   */
+  public Map<Long, HostRoleCommandStatusSummaryDTO> getStageSummaries() {
+    Map<Long, HostRoleCommandStatusSummaryDTO> summaryMap = new HashMap<Long, HostRoleCommandStatusSummaryDTO>();
+
+    for (StageEntity stage : getStageEntities()) {
+      //Number minStartTime = 0;
+      //Number maxEndTime = 0;
+      int aborted = 0;
+      int completed = 0;
+      int failed = 0;
+      int holding = 0;
+      int holdingFailed = 0;
+      int holdingTimedout = 0;
+      int inProgress = 0;
+      int pending = 0;
+      int queued = 0;
+      int timedout = 0;
+
+      //todo: where does this logic belong?
+      for (HostRoleCommandEntity task : stage.getHostRoleCommands()) {
+        HostRoleStatus taskStatus = task.getStatus();
+
+        switch (taskStatus) {
+          case ABORTED:
+            aborted += 1;
+            break;
+          case COMPLETED:
+            completed += 1;
+            break;
+          case FAILED:
+            failed += 1;
+            break;
+          case HOLDING:
+            holding += 1;
+            break;
+          case HOLDING_FAILED:
+            holdingFailed += 1;
+            break;
+          case HOLDING_TIMEDOUT:
+            holdingTimedout += 1;
+            break;
+          case IN_PROGRESS:
+            inProgress += 1;
+            break;
+          case PENDING:
+            pending += 1;
+            break;
+          case QUEUED:
+            queued += 1;
+            break;
+          case TIMEDOUT:
+            timedout += 1;
+            break;
+          default:
+            //todo: proper log msg
+            System.out.println("Unexpected status when creating stage summaries: " + taskStatus);
+        }
+      }
+
+      //todo: skippable. I only see a skippable field on the stage, not the tasks
+      //todo: time related fields
+      HostRoleCommandStatusSummaryDTO stageSummary = new HostRoleCommandStatusSummaryDTO(stage.isSkippable() ? 1 : 0, 0, 0,
+          stage.getStageId(), aborted, completed, failed, holding, holdingFailed, holdingTimedout, inProgress, pending, queued, timedout);
+      summaryMap.put(stage.getStageId(), stageSummary);
+    }
+    return summaryMap;
+  }
+
+  /**
+   * Create the host requests for every host group of the topology request.
+   * A group that lists host names gets one reserved host request per name, any other group gets
+   * one outstanding host request per requested host. Stage ids are numbered from 0 in creation order.
+   *
+   * @param requestRequest   topology request providing the host group information
+   * @param topologyContext  passed through to every created host request
+   */
+  //todo: context is a temporary refactoring step
+  private void createHostRequests(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) {
+    //todo: consistent stage ordering
+    //todo: confirm that stages don't need to be unique across requests
+    long stageIdCounter = 0;
+    Map<String, HostGroupInfo> hostGroupInfoMap = requestRequest.getHostGroupInfo();
+    for (HostGroupInfo hostGroupInfo : hostGroupInfoMap.values()) {
+      String groupName = hostGroupInfo.getHostGroupName();
+      Blueprint blueprint = topology.getBlueprint();
+      int hostCardinality = hostGroupInfo.getRequestedHostCount();
+      List<String> hostnames = new ArrayList<String>(hostGroupInfo.getHostNames());
+
+      for (int i = 0; i < hostCardinality; ++i) {
+        if (! hostnames.isEmpty()) {
+          // host names are specified
+          // NOTE(review): relies on the requested host count never exceeding the number of
+          // host names, otherwise get(i) throws IndexOutOfBoundsException -- confirm in HostGroupInfo
+          String hostname = hostnames.get(i);
+          //todo: pass in HostGroupInfo
+          HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(),
+              blueprint.getName(), blueprint.getHostGroup(groupName), hostname, hostGroupInfo.getPredicate(),
+              topologyContext);
+          synchronized (requestsWithReservedHosts) {
+            requestsWithReservedHosts.put(hostname, hostRequest);
+          }
+        } else {
+          // host count is specified
+          //todo: pass in HostGroupInfo
+          HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(),
+              blueprint.getName(), blueprint.getHostGroup(groupName), hostCardinality, hostGroupInfo.getPredicate(),
+              topologyContext);
+          outstandingHostRequests.add(hostRequest);
+        }
+      }
+    }
+
+    allHostRequests.addAll(outstandingHostRequests);
+    allHostRequests.addAll(requestsWithReservedHosts.values());
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java
new file mode 100644
index 0000000..5ce2532
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * Checked exception signalling that the requested blueprint doesn't exist.
+ */
+public class NoSuchBlueprintException extends Exception {
+
+  /** Template of the detail message; the placeholder receives the blueprint name. */
+  private static final String MESSAGE_FORMAT = "No blueprint exists with the name '%s'";
+
+  /**
+   * Creates the exception with a detail message that names the missing blueprint.
+   *
+   * @param name  name of the blueprint that was requested
+   */
+  public NoSuchBlueprintException(final String name) {
+    super(String.format(MESSAGE_FORMAT, name));
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java
new file mode 100644
index 0000000..413cb4e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * Checked exception signalling that a requested host group doesn't exist.
+ */
+public class NoSuchHostGroupException extends Exception {
+
+  /**
+   * Creates the exception with a detail message that names the missing host group.
+   *
+   * @param hostgroupName  name of the host group that was requested
+   */
+  public NoSuchHostGroupException(final String hostgroupName) {
+    super("Requested HostGroup doesn't exist: " + hostgroupName);
+  }
+
+  /**
+   * Creates the exception with a caller supplied text placed in front of the standard
+   * explanation that names the missing host group.
+   *
+   * @param hostgroupName  name of the host group that was requested
+   * @param msg            text that starts the detail message
+   */
+  public NoSuchHostGroupException(final String hostgroupName, final String msg) {
+    super(msg + ". Cause: Requested HostGroup doesn't exist: " + hostgroupName);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java
new file mode 100644
index 0000000..870d718
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.apache.ambari.server.controller.internal.Stack;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Topology validator which checks that every required password property has a value,
+ * filling in the configured default password where one is available.
+ */
+public class RequiredPasswordValidator implements TopologyValidator {
+
+  private String defaultPassword;
+
+  /**
+   * @param defaultPassword  password used for missing password properties; may be null
+   */
+  public RequiredPasswordValidator(String defaultPassword) {
+    this.defaultPassword = defaultPassword;
+  }
+
+  /**
+   * Validate that all required password properties have been set or that 'default_password' is specified.
+   *
+   * @param topology  cluster topology to validate
+   *
+   * @throws InvalidTopologyException if required password properties are missing and no
+   *                                  default is specified via 'default_password'
+   */
+  public void validate(ClusterTopology topology) throws InvalidTopologyException {
+    Map<String, Map<String, Collection<String>>> missing = validateRequiredPasswords(topology);
+    if (missing.isEmpty()) {
+      return;
+    }
+
+    throw new InvalidTopologyException("Missing required password properties. Specify a value for these " +
+        "properties in the cluster or host group configurations or include 'default_password' field in request. " +
+        missing);
+  }
+
+  /**
+   * Check the required PASSWORD properties of every service that has a component in a host group,
+   * against the full properties of that host group's configuration.
+   * A missing property is given the default password when one is configured.
+   *
+   * @param topology  cluster topology to inspect
+   *
+   * @return map of host group name to config type to the names of the missing properties.
+   *         Empty map if none are missing.
+   */
+  //todo: this is copied/pasted from Blueprint and is currently only used by validatePasswordProperties()
+  //todo: seems that we should have some common place for this code so it can be used by BP and here?
+  private Map<String, Map<String, Collection<String>>> validateRequiredPasswords(ClusterTopology topology) {
+    Map<String, Map<String, Collection<String>>> missingByGroup =
+        new HashMap<String, Map<String, Collection<String>>>();
+
+    for (Map.Entry<String, HostGroupInfo> entry : topology.getHostGroupInfo().entrySet()) {
+      String groupName = entry.getKey();
+      Map<String, Map<String, String>> groupProps =
+          entry.getValue().getConfiguration().getFullProperties(3);
+
+      // each service is only checked once per host group
+      Collection<String> seenServices = new HashSet<String>();
+      Blueprint blueprint = topology.getBlueprint();
+      Stack stack = blueprint.getStack();
+
+      for (String component : blueprint.getHostGroup(groupName).getComponents()) {
+        //for now, AMBARI is not recognized as a service in Stacks
+        if (component.equals("AMBARI_SERVER")) {
+          continue;
+        }
+
+        String service = stack.getServiceForComponent(component);
+        if (! seenServices.add(service)) {
+          continue;
+        }
+
+        //todo: do I need to subtract excluded configs?
+        for (Stack.ConfigProperty required : stack.getRequiredConfigurationProperties(service, "PASSWORD")) {
+          String configType = required.getType();
+          String propertyName = required.getName();
+          if (propertyExists(topology, groupProps, configType, propertyName)) {
+            continue;
+          }
+          recordMissing(missingByGroup, groupName, configType, propertyName);
+        }
+      }
+    }
+    return missingByGroup;
+  }
+
+  /**
+   * Register a missing property under its host group and config type, creating the
+   * intermediate collections on first use.
+   */
+  private static void recordMissing(Map<String, Map<String, Collection<String>>> missingByGroup,
+                                    String groupName, String configType, String propertyName) {
+    Map<String, Collection<String>> missingByType = missingByGroup.get(groupName);
+    if (missingByType == null) {
+      missingByType = new HashMap<String, Collection<String>>();
+      missingByGroup.put(groupName, missingByType);
+    }
+
+    Collection<String> missingNames = missingByType.get(configType);
+    if (missingNames == null) {
+      missingNames = new HashSet<String>();
+      missingByType.put(configType, missingNames);
+    }
+    missingNames.add(propertyName);
+  }
+
+  /**
+   * Determine whether a property is present, setting the default password for it when it is not.
+   *
+   * @return true if the property is in the given properties or the default password was set for it
+   */
+  private boolean propertyExists(ClusterTopology topology, Map<String, Map<String, String>> props, String type, String property) {
+    Map<String, String> propsOfType = props.get(type);
+    if (propsOfType != null && propsOfType.containsKey(property)) {
+      return true;
+    }
+    // not present: this call has the side effect of writing the default password, if there is one
+    return setDefaultPassword(topology, type, property);
+  }
+
+  /**
+   * Attempt to set the default password in cluster configuration for missing password property.
+   *
+   * @param topology    cluster topology whose configuration receives the password
+   * @param configType  configuration type
+   * @param property    password property name
+   *
+   * @return true if password was set, otherwise false. The password is set unless the
+   *         default password is null or blank
+   */
+  private boolean setDefaultPassword(ClusterTopology topology, String configType, String property) {
+    if (defaultPassword == null || defaultPassword.trim().isEmpty()) {
+      return false;
+    }
+    topology.getConfiguration().setProperty(configType, property, defaultPassword);
+    return true;
+  }
+
+  /**
+   * Two validators are equal when they are of the same class and have equal default passwords.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    RequiredPasswordValidator other = (RequiredPasswordValidator) o;
+    if (defaultPassword == null) {
+      return other.defaultPassword == null;
+    }
+    return defaultPassword.equals(other.defaultPassword);
+  }
+
+  @Override
+  public int hashCode() {
+    return defaultPassword == null ? 0 : defaultPassword.hashCode();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
new file mode 100644
index 0000000..3e1b565
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import com.google.inject.Singleton;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.ClusterRequest;
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ServiceComponentRequest;
+import org.apache.ambari.server.controller.ServiceRequest;
+import org.apache.ambari.server.controller.internal.ComponentResourceProvider;
+import org.apache.ambari.server.controller.internal.ServiceResourceProvider;
+import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
+import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.state.host.HostImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages all cluster provisioning actions on the cluster topology.
+ */
+//todo: cluster isolation
+@Singleton
+public class TopologyManager {
+
+ private final List<HostImpl> availableHosts = new LinkedList<HostImpl>(); // registered hosts that no request has accepted yet
+ private final Map<String, LogicalRequest> reservedHosts = new HashMap<String, LogicalRequest>(); // host name -> request that is offered the host when it registers
+ private final Map<Long, LogicalRequest> allRequests = new HashMap<Long, LogicalRequest>(); // request id -> logical request
+ // priority is given to oldest outstanding requests
+ private final Collection<LogicalRequest> outstandingRequests = new ArrayList<LogicalRequest>();
+ private Map<String, ClusterTopology> clusterTopologyMap = new HashMap<String, ClusterTopology>(); // cluster name -> topology
+ private final Map<TopologyTask.Type, Set<TopologyTask>> pendingTasks = new HashMap<TopologyTask.Type, Set<TopologyTask>>(); // one task set per type, created in the constructor
+
+ //todo: proper wait/notify mechanism
+ private final Object configurationFlagLock = new Object(); // presumably guards configureComplete -- usage is outside this view, confirm
+ private boolean configureComplete = false;
+
+ private AmbariManagementController controller;
+ ExecutorService executor; // assigned from getExecutorService() in the constructor
+ //todo: task id's. Use existing mechanism for getting next task id sequence
+ private final static AtomicLong nextTaskId = new AtomicLong(10000);
+ private final Object serviceResourceLock = new Object();
+
+
+ /**
+  * Creates the manager with an empty pending task set for each of the CONFIGURE, INSTALL and
+  * START task types and takes its executor from {@code getExecutorService()}.
+  */
+ public TopologyManager() {
+   TopologyTask.Type[] trackedTypes = {
+       TopologyTask.Type.CONFIGURE, TopologyTask.Type.INSTALL, TopologyTask.Type.START };
+   for (TopologyTask.Type taskType : trackedTypes) {
+     pendingTasks.put(taskType, new HashSet<TopologyTask>());
+   }
+
+   executor = getExecutorService();
+ }
+
+ /**
+  * Provisions a cluster for the given topology request: registers the topology under its cluster
+  * name, creates the cluster, service and component resources, processes the request and adds
+  * the cluster configuration request.
+  *
+  * @param request  topology request describing the cluster
+  *
+  * @return status of the logical request created for the topology request
+  *
+  * @throws InvalidTopologyException  declared; raised by the called topology code
+  * @throws AmbariException           declared; raised by the called resource or configuration code
+  */
+ public RequestStatusResponse provisionCluster(TopologyRequest request) throws InvalidTopologyException, AmbariException {
+   ClusterTopology clusterTopology = new ClusterTopologyImpl(request);
+   String nameOfCluster = clusterTopology.getClusterName();
+   clusterTopologyMap.put(nameOfCluster, clusterTopology);
+
+   createClusterResource(nameOfCluster);
+   createServiceAndComponentResources(clusterTopology);
+
+   LogicalRequest provisionRequest = processRequest(request, clusterTopology);
+   //todo: an AmbariException raised here is propagated to the caller unchanged
+   addClusterConfigRequest(new ClusterConfigurationRequest(clusterTopology));
+
+   //todo: this should be invoked as part of a generic lifecycle event which could possibly
+   //todo: be tied to cluster state
+   persistInstallStateForUI(nameOfCluster);
+   return getRequestStatus(provisionRequest.getRequestId());
+ }
+
+ /**
+  * Adds hosts to a cluster whose topology is already known to this manager.
+  *
+  * @param request  topology request naming the cluster and the host groups to scale
+  *
+  * @return status of the logical request created for the topology request
+  *
+  * @throws InvalidTopologyException  declared; raised by the called topology code
+  * @throws AmbariException           if no topology is registered for the cluster name
+  */
+ public RequestStatusResponse scaleHosts(TopologyRequest request)
+     throws InvalidTopologyException, AmbariException {
+
+   String nameOfCluster = request.getClusterName();
+   ClusterTopology knownTopology = clusterTopologyMap.get(nameOfCluster);
+   if (knownTopology == null) {
+     throw new AmbariException("TopologyManager: Unable to retrieve cluster topology for cluster: " + nameOfCluster);
+   }
+
+   // this registers/updates all request host groups
+   knownTopology.update(request);
+
+   LogicalRequest scaleRequest = processRequest(request, knownTopology);
+   return getRequestStatus(scaleRequest.getRequestId());
+ }
+
+ //todo: should be synchronized on same lock as onHostRegistered()
+ //todo: HostImpl is what is registered with the HeartbeatHandler and contains more host info than HostInfo so
+ //todo: we should probably change to use HostImpl
+ public void onHostRegistered(HostImpl host, boolean associatedWithCluster) { // offers a newly registered host to the known requests
+ if (associatedWithCluster) { // hosts already associated with a cluster are not offered to any request
+ return;
+ }
+
+ boolean matchedToRequest = false;
+ String hostName = host.getHostName();
+ synchronized(reservedHosts) { // step 1: a request that reserved this host name gets the host
+ if (reservedHosts.containsKey(hostName)) {
+ LogicalRequest request = reservedHosts.remove(hostName);
+ HostOfferResponse response = request.offer(host);
+ if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) {
+ //todo: this is handled explicitly in LogicalRequest so this shouldn't happen here
+ throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + hostName);
+ }
+ processAcceptedHostOffer(getClusterTopology(request.getClusterName()), response, host);
+ matchedToRequest = true;
+ }
+ }
+
+ // matchedToRequest is already true here if the host was reserved
+ if (! matchedToRequest) {
+ synchronized (outstandingRequests) { // step 2: offer the host to the outstanding requests in collection order until one accepts
+ Iterator<LogicalRequest> outstandingRequestIterator = outstandingRequests.iterator();
+ while (! matchedToRequest && outstandingRequestIterator.hasNext()) {
+ LogicalRequest request = outstandingRequestIterator.next();
+ HostOfferResponse hostOfferResponse = request.offer(host);
+ switch (hostOfferResponse.getAnswer()) {
+ case ACCEPTED:
+ matchedToRequest = true;
+ processAcceptedHostOffer(getClusterTopology(request.getClusterName()), hostOfferResponse, host);
+ break;
+ case DECLINED_DONE: // the request is dropped from the outstanding collection and not offered hosts again
+ outstandingRequestIterator.remove();
+ break;
+ case DECLINED_PREDICATE: // the request stays outstanding and the next request is tried
+ break;
+ }
+ }
+ }
+ }
+
+ if (! matchedToRequest) { // step 3: no request took the host, so it is kept in availableHosts
+ synchronized (availableHosts) {
+ System.out.printf("TopologyManager: Queueing available host %s\n", hostName);
+ availableHosts.add(host);
+ }
+ }
+ }
+
+ public void onHostLeft(String hostname) { // currently a no-op
+ //todo: not implemented yet, nothing is done with the given host name
+ }
+
+ public Request getRequest(long requestId) {
+ return allRequests.get(requestId);
+ }
+
+ /**
+  * Get the logical requests for the given ids.
+  *
+  * @param requestIds  ids of the wanted requests; an empty collection selects every request
+  *
+  * @return for an empty id collection the values view of all requests, otherwise a new
+  *         collection with the requests that exist for the given ids
+  */
+ public Collection<LogicalRequest> getRequests(Collection<Long> requestIds) {
+   if (requestIds.isEmpty()) {
+     return allRequests.values();
+   }
+
+   Collection<LogicalRequest> found = new ArrayList<LogicalRequest>();
+   for (long requestId : requestIds) {
+     LogicalRequest candidate = allRequests.get(requestId);
+     if (candidate == null) {
+       // unknown ids are skipped
+       continue;
+     }
+     found.add(candidate);
+   }
+   return found;
+ }
+
+ /**
+  * Get the stage entities of every logical request.
+  *
+  * @return new collection holding the stage entities of all requests
+  */
+ //todo: currently we are just returning all stages for all requests
+ //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each
+ public Collection<StageEntity> getStages() {
+   Collection<StageEntity> collected = new ArrayList<StageEntity>();
+   Iterator<LogicalRequest> requests = allRequests.values().iterator();
+   while (requests.hasNext()) {
+     collected.addAll(requests.next().getStageEntities());
+   }
+   return collected;
+ }
+
+ /**
+  * Get the commands of a single request.
+  *
+  * @param requestId id of the request
+  * @return the commands of the request, or an empty list if no request is registered for the id
+  */
+ public Collection<HostRoleCommand> getTasks(long requestId) {
+   LogicalRequest logicalRequest = allRequests.get(requestId);
+   if (logicalRequest == null) {
+     return Collections.<HostRoleCommand>emptyList();
+   }
+   return logicalRequest.getCommands();
+ }
+
+ /**
+  * Get the commands of several requests, concatenated in the iteration order of the ids.
+  *
+  * @param requestIds ids of the requests
+  * @return a new collection with the commands of every given request; ids without a
+  *         registered request contribute nothing
+  */
+ public Collection<HostRoleCommand> getTasks(Collection<Long> requestIds) {
+   Collection<HostRoleCommand> combined = new ArrayList<HostRoleCommand>();
+   for (long requestId : requestIds) {
+     Collection<HostRoleCommand> requestTasks = getTasks(requestId);
+     combined.addAll(requestTasks);
+   }
+   return combined;
+ }
+
+ /**
+  * Get the per-stage status summaries of a request.
+  *
+  * @param requestId id of the request
+  * @return the stage summaries of the request, or an empty map if no request is registered for the id
+  */
+ public Map<Long, HostRoleCommandStatusSummaryDTO> getStageSummaries(Long requestId) {
+   LogicalRequest logicalRequest = allRequests.get(requestId);
+   if (logicalRequest == null) {
+     return Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap();
+   }
+   return logicalRequest.getStageSummaries();
+ }
+
+ /**
+  * Get the status of a single request.
+  *
+  * @param requestId id of the request
+  * @return the request status, or null if no request is registered for the id
+  */
+ public RequestStatusResponse getRequestStatus(long requestId) {
+   LogicalRequest logicalRequest = allRequests.get(requestId);
+   if (logicalRequest != null) {
+     return logicalRequest.getRequestStatus();
+   }
+   return null;
+ }
+
+ /**
+  * Get the status of several requests.
+  *
+  * @param ids ids of the requests
+  * @return a new list with one status per id that resolved to a status; ids for which
+  *         the single-id lookup returns null are left out
+  */
+ public Collection<RequestStatusResponse> getRequestStatus(Collection<Long> ids) {
+   List<RequestStatusResponse> statuses = new ArrayList<RequestStatusResponse>();
+   for (long requestId : ids) {
+     RequestStatusResponse status = getRequestStatus(requestId);
+     if (status == null) {
+       continue;
+     }
+     statuses.add(status);
+   }
+   return statuses;
+ }
+
+ /**
+  * Look up the topology registered for a cluster.
+  *
+  * @param clusterName name of the cluster
+  * @return whatever clusterTopologyMap holds for the name (presumably null when absent)
+  */
+ public ClusterTopology getClusterTopology(String clusterName) {
+   final ClusterTopology registered = clusterTopologyMap.get(clusterName);
+   return registered;
+ }
+
+ /**
+  * Merge the projected topologies of all registered requests into one map.
+  *
+  * @return a new map of host to the union of the values every request projects for that host;
+  *         each value is a set, so duplicates across requests collapse
+  */
+ public Map<String, Collection<String>> getProjectedTopology() {
+   Map<String, Collection<String>> componentsByHost = new HashMap<String, Collection<String>>();
+
+   for (LogicalRequest request : allRequests.values()) {
+     Map<String, Collection<String>> projected = request.getProjectedTopology();
+     for (Map.Entry<String, Collection<String>> hostEntry : projected.entrySet()) {
+       String hostName = hostEntry.getKey();
+       // first time this host is seen: start an empty component set for it
+       if (! componentsByHost.containsKey(hostName)) {
+         componentsByHost.put(hostName, new HashSet<String>());
+       }
+       componentsByHost.get(hostName).addAll(hostEntry.getValue());
+     }
+   }
+   return componentsByHost;
+ }
+
+ /**
+  * Finalize the topology, create the logical request for it and offer every currently
+  * available host to that request.
+  *
+  * Hosts which are reserved by a different request are not offered. Hosts accepted by the
+  * request are removed from the available hosts and processed. If the request never answers
+  * DECLINED_DONE it is added to the outstanding requests.
+  *
+  * @param request  the topology request
+  * @param topology the cluster topology the request applies to
+  * @return the newly created logical request
+  * @throws AmbariException if the logical request cannot be created
+  */
+ private LogicalRequest processRequest(TopologyRequest request, ClusterTopology topology) throws AmbariException {
+
+   finalizeTopology(request, topology);
+   LogicalRequest logicalRequest = createLogicalRequest(request, topology);
+
+   boolean requestHostComplete = false;
+   //todo: overall synchronization. Currently we have nested synchronization here
+   synchronized(availableHosts) {
+     Iterator<HostImpl> hostIterator = availableHosts.iterator();
+     while (! requestHostComplete && hostIterator.hasNext()) {
+       HostImpl host = hostIterator.next();
+       synchronized (reservedHosts) {
+         String hostname = host.getHostName();
+         if (reservedHosts.containsKey(hostname)) {
+           if (logicalRequest.equals(reservedHosts.get(hostname))) {
+             // host is registered to this request, remove it from reserved map
+             reservedHosts.remove(hostname);
+           } else {
+             // host is registered with another request, don't offer
+             //todo: clean up logic
+             continue;
+           }
+         }
+       }
+       HostOfferResponse response = logicalRequest.offer(host);
+       switch (response.getAnswer()) {
+         case ACCEPTED:
+           //todo: when host matches last host it returns ACCEPTED so we don't know that logical request is no
+           //todo: longer outstanding until we call offer again. This is really only an issue if we need to
+           //todo: deal specifically with outstanding hosts other than calling offer. Also, failure handling
+           //todo: may affect this behavior??
+           hostIterator.remove();
+           processAcceptedHostOffer(getClusterTopology(logicalRequest.getClusterName()), response, host);
+           break;
+         case DECLINED_DONE:
+           requestHostComplete = true;
+           break;
+         case DECLINED_PREDICATE:
+           break;
+       }
+     }
+
+     if (! requestHostComplete) {
+       // not all required hosts have been matched (see earlier comment regarding outstanding logical requests)
+       // outstandingRequests is iterated elsewhere while holding its own monitor, so the add
+       // must hold that monitor too; otherwise a concurrent iteration can fail or miss the request
+       synchronized (outstandingRequests) {
+         outstandingRequests.add(logicalRequest);
+       }
+     }
+   }
+   return logicalRequest;
+ }
+
+ /**
+  * Create a logical request for the topology request, register it in allRequests and record
+  * every host the request reserves in reservedHosts.
+  *
+  * @param request  the topology request
+  * @param topology the cluster topology the request applies to
+  * @return the newly created logical request
+  * @throws AmbariException if the logical request cannot be created
+  */
+ private LogicalRequest createLogicalRequest(TopologyRequest request, ClusterTopology topology) throws AmbariException {
+   ClusterTopologyContext context = new ClusterTopologyContext(topology);
+   LogicalRequest created = new LogicalRequest(request, context);
+   allRequests.put(created.getRequestId(), created);
+
+   synchronized (reservedHosts) {
+     for (String reservedHostName : created.getReservedHosts()) {
+       reservedHosts.put(reservedHostName, created);
+     }
+   }
+   return created;
+ }
+
+ private void processAcceptedHostOffer(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
+ try {
+ topology.addHostToTopology(response.getHostGroupName(), host.getHostName());
+ } catch (InvalidTopologyException e) {
+ //todo
+ throw new RuntimeException(e);
+ } catch (NoSuchHostGroupException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<TopologyTask> tasks = response.getTasks();
+ synchronized (configurationFlagLock) {
+ if (configureComplete) {
+ for (TopologyTask task : tasks) {
+ task.run();
+ }
+ }else {
+ for (TopologyTask task : tasks) {
+ //todo: proper state dependencies
+ TopologyTask.Type taskType = task.getType();
+ if (taskType == TopologyTask.Type.RESOURCE_CREATION || taskType == TopologyTask.Type.CONFIGURE) {
+ task.run();
+ } else {
+ // all type collections are added at init time
+ pendingTasks.get(taskType).add(task);
+ }
+ }
+ }
+ }
+ }
+
+ //todo: this should invoke a callback on each 'service' in the topology
+ /**
+  * Apply final adjustments to the topology before a logical request is created for it.
+  * At present the only adjustment is the conditional addition of the kerberos client;
+  * the request argument is not used.
+  *
+  * @param request  the topology request (currently unused)
+  * @param topology the cluster topology to adjust
+  */
+ private void finalizeTopology(TopologyRequest request, ClusterTopology topology) {
+   addKerberosClientIfNecessary(topology);
+ }
+
+ /**
+  * Add the kerberos client to groups if kerberos is enabled for the cluster.
+  *
+  * @param topology cluster topology
+  * @throws RuntimeException if the cluster named by the topology cannot be retrieved;
+  *                          the underlying AmbariException is attached as the cause
+  */
+ //for now, hard coded here
+ private void addKerberosClientIfNecessary(ClusterTopology topology) {
+
+   String clusterName = topology.getClusterName();
+   //todo: logic would ideally be contained in the stack
+   Cluster cluster;
+   try {
+     cluster = getController().getClusters().getCluster(clusterName);
+   } catch (AmbariException e) {
+     //todo: this shouldn't happen at this point but still need to handle in a generic manner for topo finalization
+     // keep the original exception as the cause so the root failure is not lost
+     throw new RuntimeException("Parent Cluster resource doesn't exist. clusterName= " + clusterName, e);
+   }
+   if (cluster.getSecurityType() == SecurityType.KERBEROS) {
+     for (HostGroup group : topology.getBlueprint().getHostGroups().values()) {
+       group.addComponent("KERBEROS_CLIENT");
+     }
+   }
+ }
+
+ // create a thread pool which is used for task execution
+ private synchronized ExecutorService getExecutorService() {
+ if (executor == null) {
+ LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+ int THREAD_POOL_CORE_SIZE = 2;
+ int THREAD_POOL_MAX_SIZE = 100;
+ int THREAD_POOL_TIMEOUT = Integer.MAX_VALUE;
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
+ THREAD_POOL_CORE_SIZE,
+ THREAD_POOL_MAX_SIZE,
+ THREAD_POOL_TIMEOUT,
+ TimeUnit.SECONDS,
+ queue);
+
+ //threadPoolExecutor.allowCoreThreadTimeOut(true);
+ executor = threadPoolExecutor;
+ }
+ return executor;
+ }
+
+ /**
+  * Mark cluster configuration as not complete and schedule a task which applies the given
+  * configuration request once it can be resolved.
+  *
+  * @param configurationRequest the cluster configuration request to process asynchronously
+  */
+ private void addClusterConfigRequest(ClusterConfigurationRequest configurationRequest) {
+   //pendingTasks.get(Action.CONFIGURE).add(new ConfigureClusterTask(configurationRequest));
+   synchronized (configurationFlagLock) {
+     configureComplete = false;
+   }
+   // go through the lazy accessor rather than the raw field so the pool is guaranteed to exist
+   getExecutorService().submit(new ConfigureClusterTask(configurationRequest));
+ }
+
+ /**
+  * Create the cluster resource for the named cluster, using the stack of the blueprint that
+  * is registered in the cluster's topology.
+  *
+  * @param clusterName name of the cluster to create
+  * @throws AmbariException if the controller fails to create the cluster
+  */
+ private void createClusterResource(String clusterName) throws AmbariException {
+   ClusterTopology clusterTopology = clusterTopologyMap.get(clusterName);
+   Stack blueprintStack = clusterTopology.getBlueprint().getStack();
+   // stack id has the form <name>-<version>
+   String stackId = String.format("%s-%s", blueprintStack.getName(), blueprintStack.getVersion());
+
+   getController().createCluster(new ClusterRequest(null, clusterName, stackId, null));
+ }
+
+ /**
+  * Create the service and component resources of the topology's blueprint that do not yet
+  * exist in the cluster. Services the cluster already has are removed from the set to create.
+  * The whole operation runs while holding serviceResourceLock.
+  *
+  * @param topology the cluster topology whose blueprint lists the services and components
+  * @throws RuntimeException wrapping any AmbariException raised while reading the cluster or
+  *                          creating the resources
+  */
+ private void createServiceAndComponentResources(ClusterTopology topology) {
+   String clusterName = topology.getClusterName();
+   Collection<String> servicesToCreate = topology.getBlueprint().getServices();
+
+   synchronized(serviceResourceLock) {
+     try {
+       Cluster cluster = getController().getClusters().getCluster(clusterName);
+       // NOTE(review): this mutates the collection handed out by the blueprint - confirm it is a copy
+       servicesToCreate.removeAll(cluster.getServices().keySet());
+     } catch (AmbariException e) {
+       //todo
+       throw new RuntimeException(e);
+     }
+
+     Set<ServiceRequest> newServices = new HashSet<ServiceRequest>();
+     Set<ServiceComponentRequest> newComponents = new HashSet<ServiceComponentRequest>();
+     for (String serviceName : servicesToCreate) {
+       newServices.add(new ServiceRequest(clusterName, serviceName, null));
+       for (String componentName : topology.getBlueprint().getComponents(serviceName)) {
+         newComponents.add(new ServiceComponentRequest(clusterName, serviceName, componentName, null));
+       }
+     }
+
+     try {
+       // services first, then the components which belong to them
+       ServiceResourceProvider serviceProvider = (ServiceResourceProvider) ClusterControllerHelper.
+           getClusterController().ensureResourceProvider(Resource.Type.Service);
+       serviceProvider.createServices(newServices);
+
+       ComponentResourceProvider componentProvider = (ComponentResourceProvider) ClusterControllerHelper.
+           getClusterController().ensureResourceProvider(Resource.Type.Component);
+       componentProvider.createComponents(newComponents);
+     } catch (AmbariException e) {
+       //todo
+       throw new RuntimeException(e);
+     }
+   }
+ }
+
+ /**
+  * Persist cluster state for the ambari UI. Setting this state informs the UI that a cluster has been
+  * installed and started and that the monitoring screen for the cluster should be displayed to the user.
+  *
+  * @param clusterName name of cluster
+  * @throws AmbariException if the controller fails to update the cluster
+  */
+ //todo: invoke as part of a generic callback possible associated with cluster state
+ private void persistInstallStateForUI(String clusterName) throws AmbariException {
+   ClusterTopology clusterTopology = clusterTopologyMap.get(clusterName);
+   Stack blueprintStack = clusterTopology.getBlueprint().getStack();
+   String stackId = String.format("%s-%s", blueprintStack.getName(), blueprintStack.getVersion());
+
+   ClusterRequest installedStateRequest =
+       new ClusterRequest(null, clusterName, "INSTALLED", null, stackId, null);
+   getController().updateClusters(Collections.singleton(installedStateRequest), null);
+ }
+
+ /**
+  * Get the management controller, fetching it from AmbariServer on first use and caching it.
+  *
+  * @return the cached controller
+  */
+ private synchronized AmbariManagementController getController() {
+   if (controller != null) {
+     return controller;
+   }
+   controller = AmbariServer.getController();
+   return controller;
+ }
+
+ /**
+  * Task which waits until the host groups required by the configuration request are resolved
+  * for all outstanding requests, then processes the configuration request, flags configuration
+  * as complete and schedules execution of the queued host tasks.
+  *
+  * If the waiting thread is interrupted the task exits without processing the configuration
+  * and leaves the thread's interrupt status set.
+  */
+ private class ConfigureClusterTask implements Runnable {
+   private final ClusterConfigurationRequest configRequest;
+
+   /**
+    * @param configRequest the cluster configuration request to process once resolvable
+    */
+   public ConfigureClusterTask(ClusterConfigurationRequest configRequest) {
+     this.configRequest = configRequest;
+   }
+
+   @Override
+   public void run() {
+     System.out.println("TopologyManager.ConfigureClusterTask: Entering");
+
+     boolean completed = false;
+     boolean interrupted = false;
+
+     while (! completed && ! interrupted) {
+       completed = areConfigsResolved();
+
+       // only wait between polls; there is nothing to wait for once the configs are resolved
+       if (! completed) {
+         try {
+           Thread.sleep(200);
+         } catch (InterruptedException e) {
+           interrupted = true;
+           // catching InterruptedException clears the flag; restore it so the owner of
+           // this thread can still observe the interruption
+           Thread.currentThread().interrupt();
+         }
+       }
+     }
+
+     if (! interrupted) {
+       try {
+         System.out.println("TopologyManager.ConfigureClusterTask: Setting Configuration on cluster");
+         // sets updated configuration on topology and cluster
+         configRequest.process();
+       } catch (Exception e) {
+         //todo: how to handle this? If this fails, we shouldn't start any hosts.
+         System.out.println("TopologyManager.ConfigureClusterTask: " +
+             "An exception occurred while attempting to process cluster configs and set on cluster");
+         e.printStackTrace();
+       }
+
+       synchronized (configurationFlagLock) {
+         System.out.println("TopologyManager.ConfigureClusterTask: Setting configure complete flag to true");
+         configureComplete = true;
+       }
+
+       // execute all queued install/start tasks
+       executor.submit(new ExecuteQueuedHostTasks());
+     }
+     System.out.println("TopologyManager.ConfigureClusterTask: Exiting");
+   }
+
+   /**
+    * Get the set of required host groups from the configuration request and confirm that all
+    * outstanding requests have fully resolved the host names for those host groups.
+    *
+    * @return true if every outstanding request reports the required groups as resolved
+    * @throws RuntimeException rethrown as-is if the required host groups cannot be determined
+    */
+   private boolean areConfigsResolved() {
+     boolean configTopologyResolved = true;
+     Collection<String> requiredHostGroups;
+     try {
+       requiredHostGroups = configRequest.getRequiredHostGroups();
+     } catch (RuntimeException e) {
+       //todo
+       System.out.println("Caught an error from Config Processor: " + e);
+       throw e;
+     }
+
+     synchronized (outstandingRequests) {
+       for (LogicalRequest outstandingRequest : outstandingRequests) {
+         if (! outstandingRequest.areGroupsResolved(requiredHostGroups)) {
+           configTopologyResolved = false;
+           break;
+         }
+       }
+     }
+     return configTopologyResolved;
+   }
+ }
+
+ private class ExecuteQueuedHostTasks implements Runnable {
+ @Override
+ public void run() {
+ //todo: lock is too coarse grained, should only be on start tasks
+ synchronized(pendingTasks) {
+ // execute queued install tasks
+ //todo: once agent configuration is removed from agent install, we will be able to
+ //todo: install without regard to configuration resolution
+ Iterator<TopologyTask> iter = pendingTasks.get(TopologyTask.Type.INSTALL).iterator();
+ while (iter.hasNext()) {
+ iter.next().run();
+ iter.remove();
+ }
+
+ iter = pendingTasks.get(TopologyTask.Type.START).iterator();
+ while (iter.hasNext()) {
+ iter.next().run();
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ //todo: this is a temporary step, remove after refactoring makes it no longer needed
+ public class ClusterTopologyContext {
+ private ClusterTopology clusterTopology;
+
+ public ClusterTopologyContext(ClusterTopology clusterTopology) {
+ this.clusterTopology = clusterTopology;
+ }
+
+ public ClusterTopology getClusterTopology() {
+ return clusterTopology;
+ }
+
+ public long getNextTaskId() {
+ synchronized (nextTaskId) {
+ return nextTaskId.getAndIncrement();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java
new file mode 100644
index 0000000..4c1abf9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A request which is used to create or modify a cluster topology.
+ */
+//todo: naming
+public interface TopologyRequest {
+
+  /**
+   * @return the cluster name associated with the request
+   */
+  String getClusterName();
+
+  //todo: only a single BP may be specified so all host groups have the same bp.
+  //todo: There is no reason really that we couldn't allow hostgroups from different blueprints assuming that
+  //todo: the stack matches across the groups. For scaling operations, we allow different blueprints (rather arbitrary)
+  //todo: so BP really needs to get associated with the HostGroupInfo, even for create which will have a single BP
+  //todo: for all HG's.
+  /**
+   * @return the blueprint associated with the request
+   */
+  Blueprint getBlueprint();
+
+  /**
+   * @return the configuration associated with the request
+   */
+  Configuration getConfiguration();
+
+  /**
+   * @return the host group information of the request as a map with string keys
+   *         (presumably host group names - confirm against implementations)
+   */
+  Map<String, HostGroupInfo> getHostGroupInfo();
+
+  /**
+   * @return the topology validators associated with the request
+   */
+  List<TopologyValidator> getTopologyValidators();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java
new file mode 100644
index 0000000..284fbba
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import java.util.Map;
+
+/**
+ * Factory for creating topology requests.
+ */
+public interface TopologyRequestFactory {
+
+  /**
+   * Create a request for provisioning a cluster.
+   *
+   * @param properties the properties the request is built from
+   * @return the provision cluster request
+   * @throws InvalidTopologyTemplateException declared for failures to build the request from the
+   *         given properties (exact conditions depend on the implementation)
+   */
+  TopologyRequest createProvisionClusterRequest(Map<String, Object> properties) throws InvalidTopologyTemplateException;
+
+  // todo: use to create other request types
+}