Posted to commits@ambari.apache.org by js...@apache.org on 2015/04/27 07:53:01 UTC

[07/13] ambari git commit: AMBARI-10750. Initial merge of advanced api provisioning work.

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
new file mode 100644
index 0000000..9f9db5c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
@@ -0,0 +1,814 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.ConfigGroupRequest;
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ServiceComponentHostRequest;
+import org.apache.ambari.server.controller.ShortTaskStatus;
+import org.apache.ambari.server.controller.internal.ConfigGroupResourceProvider;
+import org.apache.ambari.server.controller.internal.HostComponentResourceProvider;
+import org.apache.ambari.server.controller.internal.HostResourceProvider;
+import org.apache.ambari.server.controller.internal.RequestImpl;
+import org.apache.ambari.server.controller.internal.ResourceImpl;
+import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ConfigImpl;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.state.host.HostImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.ambari.server.controller.AmbariServer.getController;
+
+/**
+ * Represents a set of requests to a single host such as install, start, etc.
+ */
+public class HostRequest implements Comparable<HostRequest> {
+
+  private long requestId;
+  private String blueprint;
+  private HostGroup hostGroup;
+  private String hostgroupName;
+  private Predicate predicate;
+  private int cardinality = -1;
+  private String hostname = null;
+  private String cluster;
+  private boolean containsMaster;
+  private long stageId = -1;
+  //todo: should be able to use the presence of hostName for this
+  private boolean outstanding = true;
+
+  //todo: remove
+  private Map<String, Long> logicalInstallTaskIds = new HashMap<String, Long>();
+  //todo: remove
+  private Map<String, Long> logicalStartTaskIds = new HashMap<String, Long>();
+
+  Collection<HostRoleCommand> logicalTasks = new ArrayList<HostRoleCommand>();
+
+  // logical task id -> physical tasks
+  private Map<Long, Collection<Long>> physicalTasks = new HashMap<Long, Collection<Long>>();
+
+  private static HostResourceProvider hostResourceProvider;
+
+  private HostComponentResourceProvider hostComponentResourceProvider;
+
+  private AmbariManagementController controller = getController();
+  private ActionManager actionManager = controller.getActionManager();
+  private ConfigHelper configHelper = controller.getConfigHelper();
+  private AmbariMetaInfo metaInfoManager = controller.getAmbariMetaInfo();
+
+  //todo: temporary refactoring step
+  private TopologyManager.ClusterTopologyContext topologyContext;
+
+  private static HostRoleCommandFactory hostRoleCommandFactory;
+
+  public static void init(HostRoleCommandFactory factory) {
+    hostRoleCommandFactory = factory;
+  }
+
+  public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
+                     int cardinality, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
+    this.requestId = requestId;
+    this.stageId = stageId;
+    this.cluster = cluster;
+    this.blueprint = blueprintName;
+    this.hostGroup = hostGroup;
+    this.hostgroupName = hostGroup.getName();
+    this.cardinality = cardinality;
+    this.predicate = predicate;
+    this.containsMaster = hostGroup.containsMasterComponent();
+    this.topologyContext = topologyContext;
+
+    createTasks();
+    System.out.println("HostRequest: Created request: Host Association Pending");
+  }
+
+  public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
+                     String hostname, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
+    this.requestId = requestId;
+    this.stageId = stageId;
+    this.cluster = cluster;
+    this.blueprint = blueprintName;
+    this.hostGroup = hostGroup;
+    this.hostgroupName = hostGroup.getName();
+    this.hostname = hostname;
+    this.predicate = predicate;
+    this.containsMaster = hostGroup.containsMasterComponent();
+    this.topologyContext = topologyContext;
+
+    createTasks();
+    System.out.println("HostRequest: Created request for host: " + hostname);
+  }
+
+  //todo: synchronization
+  public synchronized HostOfferResponse offer(HostImpl host) {
+    if (! outstanding) {
+      return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
+    }
+    if (matchesHost(host)) {
+      outstanding = false;
+      hostname = host.getHostName();
+      List<TopologyTask> tasks = provision(host);
+
+      return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, hostGroup.getName(), tasks);
+    } else {
+      return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE);
+    }
+  }
+
+  public void setHostName(String hostName) {
+    this.hostname = hostName;
+  }
+
+  public long getRequestId() {
+    return requestId;
+  }
+
+  public String getClusterName() {
+    return cluster;
+  }
+  public String getBlueprint() {
+    return blueprint;
+  }
+
+  public HostGroup getHostGroup() {
+    return hostGroup;
+  }
+
+  public String getHostgroupName() {
+    return hostgroupName;
+  }
+
+  public int getCardinality() {
+    return cardinality;
+  }
+
+  public Predicate getPredicate() {
+    return predicate;
+  }
+
+
+  private List<TopologyTask> provision(HostImpl host) {
+    List<TopologyTask> tasks = new ArrayList<TopologyTask>();
+
+    tasks.add(new CreateHostResourcesTask(topologyContext.getClusterTopology(), host, getHostgroupName()));
+    setHostOnTasks(host);
+
+    HostGroup hostGroup = getHostGroup();
+    tasks.add(new ConfigureConfigGroup(getConfigurationGroupName(hostGroup.getBlueprintName(),
+        hostGroup.getName()), getClusterName(), hostname));
+
+    tasks.add(getInstallTask());
+    tasks.add(getStartTask());
+
+    return tasks;
+  }
+
+  private void createTasks() {
+    HostGroup hostGroup = getHostGroup();
+    for (String component : hostGroup.getComponents()) {
+      if (component == null || component.equals("AMBARI_SERVER")) {
+        System.out.printf("Skipping component %s when creating request\n", component);
+        continue;
+      }
+
+      String hostName = getHostName() != null ?
+          getHostName() :
+          "PENDING HOST ASSIGNMENT : HOSTGROUP=" + getHostgroupName();
+
+      HostRoleCommand installTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.INSTALL);
+      installTask.setStatus(HostRoleStatus.PENDING);
+      installTask.setTaskId(topologyContext.getNextTaskId());
+      installTask.setRequestId(getRequestId());
+      installTask.setStageId(stageId);
+
+      //todo: had to add requestId to ShortTaskStatus
+      //todo: revert addition of requestId when we are using LogicalTask
+      installTask.setRequestId(getRequestId());
+
+      logicalTasks.add(installTask);
+      registerLogicalInstallTaskId(component, installTask.getTaskId());
+
+      Stack stack = hostGroup.getStack();
+      try {
+        // if component isn't a client, add a start task
+        if (! metaInfoManager.getComponent(stack.getName(), stack.getVersion(), stack.getServiceForComponent(component), component).isClient()) {
+          HostRoleCommand startTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.START);
+          startTask.setStatus(HostRoleStatus.PENDING);
+          startTask.setRequestId(getRequestId());
+          startTask.setTaskId(topologyContext.getNextTaskId());
+          startTask.setRequestId(getRequestId());
+          startTask.setStageId(stageId);
+          logicalTasks.add(startTask);
+          registerLogicalStartTaskId(component, startTask.getTaskId());
+        }
+      } catch (AmbariException e) {
+        e.printStackTrace();
+        //todo: how to handle
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Get a config group name based on a blueprint and host group.
+   *
+   * @param bpName         blueprint name
+   * @param hostGroupName  host group name
+   * @return  config group name
+   */
+  protected String getConfigurationGroupName(String bpName, String hostGroupName) {
+    return String.format("%s:%s", bpName, hostGroupName);
+  }
+
+  private void setHostOnTasks(HostImpl host) {
+    for (HostRoleCommand task : getTasks()) {
+      task.setHostEntity(host.getHostEntity());
+    }
+  }
+
+  //todo: analyze all configuration needs for dealing with deprecated properties
+  /**
+   * Global configs have been deprecated since 1.7.0 but are still supported;
+   * any globals that are used are automatically mapped to the corresponding *-env dictionaries.
+   *
+   * @param blueprintConfigurations  map of blueprint configurations keyed by type
+   */
+  private void handleGlobalsBackwardsCompability(Stack stack,
+                                                 Map<String, Map<String, String>> blueprintConfigurations) {
+
+    StackId stackId = new StackId(stack.getName(), stack.getVersion());
+    configHelper.moveDeprecatedGlobals(stackId, blueprintConfigurations, getClusterName());
+  }
+
+  public Collection<HostRoleCommand> getTasks() {
+    // sync logical task state with physical tasks
+    for (HostRoleCommand logicalTask : logicalTasks) {
+      Collection<Long> physicalTaskIds = physicalTasks.get(logicalTask.getTaskId());
+      if (physicalTaskIds != null) {
+        //todo: for now only one physical task per logical task
+        long physicalTaskId = physicalTaskIds.iterator().next();
+        HostRoleCommand physicalTask = actionManager.getTaskById(physicalTaskId);
+        if (physicalTask != null) {
+          logicalTask.setStatus(physicalTask.getStatus());
+          logicalTask.setCommandDetail(physicalTask.getCommandDetail());
+          logicalTask.setCustomCommandName(physicalTask.getCustomCommandName());
+          //todo: once we retry on failures, start/end times could span multiple physical tasks
+          logicalTask.setStartTime(physicalTask.getStartTime());
+          logicalTask.setEndTime(physicalTask.getEndTime());
+          logicalTask.setErrorLog(physicalTask.getErrorLog());
+          logicalTask.setExitCode(physicalTask.getExitCode());
+          logicalTask.setExecutionCommandWrapper(physicalTask.getExecutionCommandWrapper());
+          //todo: may be handled at a higher level than physical task
+          logicalTask.setLastAttemptTime(physicalTask.getLastAttemptTime());
+          logicalTask.setOutputLog(physicalTask.getOutputLog());
+          logicalTask.setStderr(physicalTask.getStderr());
+          logicalTask.setStdout(physicalTask.getStdout());
+          logicalTask.setStructuredOut(physicalTask.getStructuredOut());
+        }
+      }
+    }
+    return logicalTasks;
+  }
+
+  public Collection<HostRoleCommandEntity> getTaskEntities() {
+    Collection<HostRoleCommandEntity> taskEntities = new ArrayList<HostRoleCommandEntity>();
+    for (HostRoleCommand task : logicalTasks) {
+      HostRoleCommandEntity entity = task.constructNewPersistenceEntity();
+      // constructNewPersistenceEntity() doesn't populate all of the fields, so set the remaining ones explicitly
+      entity.setRequestId(task.getRequestId());
+      entity.setStageId(task.getStageId());
+      entity.setTaskId(task.getTaskId());
+      entity.setOutputLog(task.getOutputLog());
+      entity.setErrorLog(task.errorLog);
+
+      // set state from physical task
+      Collection<Long> physicalTaskIds = physicalTasks.get(task.getTaskId());
+      if (physicalTaskIds != null) {
+        //todo: for now only one physical task per logical task
+        long physicalTaskId = physicalTaskIds.iterator().next();
+        HostRoleCommand physicalTask = actionManager.getTaskById(physicalTaskId);
+        if (physicalTask != null) {
+          entity.setStatus(physicalTask.getStatus());
+          entity.setCommandDetail(physicalTask.getCommandDetail());
+          entity.setCustomCommandName(physicalTask.getCustomCommandName());
+          //todo: once we retry on failures, start/end times could span multiple physical tasks
+          entity.setStartTime(physicalTask.getStartTime());
+          entity.setEndTime(physicalTask.getEndTime());
+          entity.setErrorLog(physicalTask.getErrorLog());
+          entity.setExitcode(physicalTask.getExitCode());
+          //todo: may be handled at a higher level than physical task
+          entity.setLastAttemptTime(physicalTask.getLastAttemptTime());
+          entity.setOutputLog(physicalTask.getOutputLog());
+          entity.setStdError(physicalTask.getStderr().getBytes());
+          entity.setStdOut(physicalTask.getStdout().getBytes());
+          entity.setStructuredOut(physicalTask.getStructuredOut().getBytes());
+        }
+      }
+
+      taskEntities.add(entity);
+    }
+    return taskEntities;
+  }
+
+  public boolean containsMaster() {
+    return containsMaster;
+  }
+
+  public boolean matchesHost(HostImpl host) {
+    if (hostname != null) {
+      return host.getHostName().equals(hostname);
+    } else if (predicate != null) {
+      return predicate.evaluate(new HostResourceAdapter(host));
+    } else {
+      return true;
+    }
+  }
+
+  public String getHostName() {
+    return hostname;
+  }
+
+  public long getStageId() {
+    return stageId;
+  }
+
+  //todo: remove
+  private void registerLogicalInstallTaskId(String component, long taskId) {
+    logicalInstallTaskIds.put(component, taskId);
+  }
+
+  //todo: remove
+  private void registerLogicalStartTaskId(String component, long taskId) {
+    logicalStartTaskIds.put(component, taskId);
+  }
+
+  //todo: remove
+  private long getLogicalInstallTaskId(String component) {
+    return logicalInstallTaskIds.get(component);
+  }
+
+  //todo: remove
+  private long getLogicalStartTaskId(String component) {
+    return logicalStartTaskIds.get(component);
+  }
+
+  //todo: since this is used to determine equality, using hashCode() isn't safe as it can return the same
+  //todo: value for 2 unequal requests
+  @Override
+  public int compareTo(HostRequest other) {
+    if (containsMaster()) {
+      return other.containsMaster() ? hashCode() - other.hashCode() : -1;
+    } else if (other.containsMaster()) {
+      return 1;
+    } else return hashCode() - other.hashCode();
+  }
+
+  //todo: once we have logical tasks, move tracking of physical tasks there
+  public void registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) {
+    Collection<Long> physicalTasksForId = physicalTasks.get(logicalTaskId);
+    if (physicalTasksForId == null) {
+      physicalTasksForId = new HashSet<Long>();
+      physicalTasks.put(logicalTaskId, physicalTasksForId);
+    }
+    physicalTasksForId.add(physicalTaskId);
+  }
+
+  //todo: temporary step
+  public TopologyTask getInstallTask() {
+    return new InstallHostTask();
+  }
+
+  //todo: temporary step
+  public TopologyTask getStartTask() {
+    return new StartHostTask();
+  }
+
+  //todo: temporary refactoring step
+  public HostGroupInfo createHostGroupInfo(HostGroup group) {
+    HostGroupInfo info = new HostGroupInfo(group.getName());
+    info.setConfiguration(group.getConfiguration());
+
+    return info;
+  }
+
+  private synchronized HostResourceProvider getHostResourceProvider() {
+    if (hostResourceProvider == null) {
+      hostResourceProvider = (HostResourceProvider)
+          ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.Host);
+
+    }
+    return hostResourceProvider;
+  }
+
+  private synchronized HostComponentResourceProvider getHostComponentResourceProvider() {
+    if (hostComponentResourceProvider == null) {
+      hostComponentResourceProvider = (HostComponentResourceProvider)
+          ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
+    }
+    return hostComponentResourceProvider;
+  }
+
+  //todo: extract
+  private class InstallHostTask implements TopologyTask {
+    //todo: use future to obtain returned Response which contains the request id
+    //todo: error handling
+    //todo: monitor status of requests
+
+    @Override
+    public Type getType() {
+      return Type.INSTALL;
+    }
+
+    @Override
+    public void run() {
+      try {
+        System.out.println("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname);
+        RequestStatusResponse response = getHostResourceProvider().install(getHostName(), cluster);
+        // map logical install tasks to physical install tasks
+        List<ShortTaskStatus> underlyingTasks = response.getTasks();
+        for (ShortTaskStatus task : underlyingTasks) {
+          Long logicalInstallTaskId = getLogicalInstallTaskId(task.getRole());
+          //todo: for now only one physical task per component
+          long taskId = task.getTaskId();
+          //physicalTasks.put(logicalInstallTaskId, Collections.singleton(taskId));
+          registerPhysicalTaskId(logicalInstallTaskId, taskId);
+
+          //todo: move this to provision
+          //todo: shouldn't have to iterate over all tasks to find install task
+          //todo: we are doing the same thing in the above registerPhysicalTaskId() call
+          // set attempt count on task
+          for (HostRoleCommand logicalTask : logicalTasks) {
+            if (logicalTask.getTaskId() == logicalInstallTaskId) {
+              logicalTask.incrementAttemptCount();
+            }
+          }
+        }
+      } catch (ResourceAlreadyExistsException e) {
+        e.printStackTrace();
+      } catch (SystemException e) {
+        e.printStackTrace();
+      } catch (NoSuchParentResourceException e) {
+        e.printStackTrace();
+      } catch (UnsupportedPropertyException e) {
+        e.printStackTrace();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  //todo: extract
+  private class StartHostTask implements TopologyTask {
+    //todo: use future to obtain returned Response which contains the request id
+    //todo: error handling
+    //todo: monitor status of requests
+
+    @Override
+    public Type getType() {
+      return Type.START;
+    }
+
+    @Override
+    public void run() {
+      try {
+        System.out.println("HostRequest.StartHostTask: Executing START task for host: " + hostname);
+        RequestStatusResponse response = getHostComponentResourceProvider().start(cluster, hostname);
+        // map logical install tasks to physical install tasks
+        List<ShortTaskStatus> underlyingTasks = response.getTasks();
+        for (ShortTaskStatus task : underlyingTasks) {
+          String component = task.getRole();
+          Long logicalStartTaskId = getLogicalStartTaskId(component);
+          // for now just set on outer map
+          registerPhysicalTaskId(logicalStartTaskId, task.getTaskId());
+
+          //todo: move this to provision
+          // set attempt count on task
+          for (HostRoleCommand logicalTask : logicalTasks) {
+            if (logicalTask.getTaskId() == logicalStartTaskId) {
+              logicalTask.incrementAttemptCount();
+            }
+          }
+        }
+      } catch (SystemException e) {
+        e.printStackTrace();
+      } catch (UnsupportedPropertyException e) {
+        e.printStackTrace();
+      } catch (NoSuchParentResourceException e) {
+        e.printStackTrace();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private class CreateHostResourcesTask implements TopologyTask {
+    private ClusterTopology topology;
+    private HostImpl host;
+    private String groupName;
+
+    public CreateHostResourcesTask(ClusterTopology topology, HostImpl host, String groupName) {
+      this.topology = topology;
+      this.host = host;
+      this.groupName = groupName;
+    }
+
+    @Override
+    public Type getType() {
+      return Type.RESOURCE_CREATION;
+    }
+
+    @Override
+    public void run() {
+      try {
+        createHostResources();
+      } catch (AmbariException e) {
+        //todo: report error to caller
+        e.printStackTrace();
+        System.out.println("An error occurred when creating host resources: " + e.toString());
+      }
+    }
+
+    private void createHostResources() throws AmbariException {
+      Map<String, Object> properties = new HashMap<String, Object>();
+      properties.put(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID, getClusterName());
+      properties.put(HostResourceProvider.HOST_NAME_PROPERTY_ID, host.getHostName());
+      properties.put(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID, host.getRackInfo());
+
+      getHostResourceProvider().createHosts(new RequestImpl(null, Collections.singleton(properties), null, null));
+      createHostComponentResources();
+    }
+
+    private void createHostComponentResources() throws AmbariException {
+      Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
+      Stack stack = topology.getBlueprint().getStack();
+      for (String component : topology.getBlueprint().getHostGroup(groupName).getComponents()) {
+        //todo: handle this in a generic manner.  These checks are all over the code
+        if (! component.equals("AMBARI_SERVER")) {
+          requests.add(new ServiceComponentHostRequest(topology.getClusterName(),
+              stack.getServiceForComponent(component), component, host.getHostName(), null));
+        }
+      }
+
+      controller.createHostComponents(requests);
+    }
+  }
+
+  //todo: extract
+  private class ConfigureConfigGroup implements TopologyTask {
+    private String groupName;
+    private String clusterName;
+    private String hostName;
+
+    public ConfigureConfigGroup(String groupName, String clusterName, String hostName) {
+      this.groupName = groupName;
+      this.clusterName = clusterName;
+      this.hostName = hostName;
+    }
+
+    @Override
+    public Type getType() {
+      return Type.CONFIGURE;
+    }
+
+    @Override
+    public void run() {
+      try {
+        //todo: add task to offer response
+        if (! addHostToExistingConfigGroups()) {
+          createConfigGroupsAndRegisterHost();
+        }
+      } catch (Exception e) {
+        //todo: handle exceptions
+        e.printStackTrace();
+        throw new RuntimeException("Unable to register config group for host: " + hostname);
+      }
+    }
+
+    /**
+     * Add the new host to an existing config group.
+     *
+     * @return true if the host was added to an existing config group, false otherwise
+     *
+     * @throws SystemException                an unknown exception occurred
+     * @throws UnsupportedPropertyException   an unsupported property was specified in the request
+     * @throws NoSuchParentResourceException  a parent resource doesn't exist
+     */
+    private boolean addHostToExistingConfigGroups()
+        throws SystemException,
+        UnsupportedPropertyException,
+        NoSuchParentResourceException {
+
+      boolean addedHost = false;
+
+      Clusters clusters;
+      Cluster cluster;
+      try {
+        clusters = controller.getClusters();
+        cluster = clusters.getCluster(clusterName);
+      } catch (AmbariException e) {
+        throw new IllegalArgumentException(
+            String.format("Attempt to add hosts to a non-existent cluster: '%s'", clusterName));
+      }
+      // I don't know of a method to get config group by name
+      //todo: add a method to get config group by name
+      Map<Long, ConfigGroup> configGroups = cluster.getConfigGroups();
+      for (ConfigGroup group : configGroups.values()) {
+        if (group.getName().equals(groupName)) {
+          try {
+            group.addHost(clusters.getHost(hostName));
+            group.persist();
+            addedHost = true;
+          } catch (AmbariException e) {
+            // shouldn't occur, this host was just added to the cluster
+            throw new SystemException(String.format(
+                "Unable to obtain newly created host '%s' from cluster '%s'", hostName, clusterName));
+          }
+        }
+      }
+      return addedHost;
+    }
+
+    /**
+     * Register config groups for host group scoped configuration.
+     * For each host group with configuration specified in the blueprint, a config group is created
+     * and the hosts associated with the host group are assigned to the config group.
+     *
+     * @throws ResourceAlreadyExistsException attempt to create a config group that already exists
+     * @throws SystemException                an unexpected exception occurs
+     * @throws UnsupportedPropertyException   an invalid property is provided when creating a config group
+     * @throws NoSuchParentResourceException  attempt to create a config group for a non-existing cluster
+     */
+    private void createConfigGroupsAndRegisterHost() throws
+        ResourceAlreadyExistsException, SystemException,
+        UnsupportedPropertyException, NoSuchParentResourceException {
+
+      //HostGroupEntity entity = hostGroup.getEntity();
+      HostGroup hostGroup = getHostGroup();
+      Map<String, Map<String, Config>> groupConfigs = new HashMap<String, Map<String, Config>>();
+
+      Stack stack = hostGroup.getStack();
+
+      // get the host-group config with cluster creation template overrides
+      Configuration topologyHostGroupConfig = topologyContext.getClusterTopology().
+          getHostGroupInfo().get(hostGroup.getName()).getConfiguration();
+
+      //handling backwards compatibility for group configs
+      //todo: doesn't belong here
+      handleGlobalsBackwardsCompability(stack, topologyHostGroupConfig.getProperties());
+
+      // iterate over topology host group configs which were defined in the cluster creation template
+      // and blueprint host groups only, no parent configs
+      for (Map.Entry<String, Map<String, String>> entry: topologyHostGroupConfig.getProperties().entrySet()) {
+        String type = entry.getKey();
+        String service = stack.getServiceForConfigType(type);
+        Config config = new ConfigImpl(type);
+        config.setTag(hostGroup.getName());
+        config.setProperties(entry.getValue());
+        //todo: attributes
+        Map<String, Config> serviceConfigs = groupConfigs.get(service);
+        if (serviceConfigs == null) {
+          serviceConfigs = new HashMap<String, Config>();
+          groupConfigs.put(service, serviceConfigs);
+        }
+        serviceConfigs.put(type, config);
+      }
+
+      String bpName = topologyContext.getClusterTopology().getBlueprint().getName();
+      for (Map.Entry<String, Map<String, Config>> entry : groupConfigs.entrySet()) {
+        String service = entry.getKey();
+        Map<String, Config> serviceConfigs = entry.getValue();
+        String absoluteGroupName = getConfigurationGroupName(bpName, hostGroup.getName());
+        Collection<String> groupHosts;
+
+        groupHosts = topologyContext.getClusterTopology().getHostGroupInfo().
+            get(hostgroupName).getHostNames();
+
+        ConfigGroupRequest request = new ConfigGroupRequest(
+            null, getClusterName(), absoluteGroupName, service, "Host Group Configuration",
+            new HashSet<String>(groupHosts), serviceConfigs);
+
+        // get the config group provider and create config group resource
+        ConfigGroupResourceProvider configGroupProvider = (ConfigGroupResourceProvider)
+            ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.ConfigGroup);
+        configGroupProvider.createResources(Collections.singleton(request));
+      }
+    }
+
+
+  }
+
+  private class HostResourceAdapter implements Resource {
+    Resource hostResource;
+
+    public HostResourceAdapter(HostImpl host) {
+      buildPropertyMap(host);
+    }
+
+    @Override
+    public Object getPropertyValue(String id) {
+      return hostResource.getPropertyValue(id);
+    }
+
+    @Override
+    public Map<String, Map<String, Object>> getPropertiesMap() {
+      return hostResource.getPropertiesMap();
+    }
+
+    @Override
+    public Type getType() {
+      return Type.Host;
+    }
+
+    @Override
+    public void addCategory(String id) {
+      // read only, nothing to do
+    }
+
+    @Override
+    public void setProperty(String id, Object value) {
+      // read only, nothing to do
+    }
+
+    private void buildPropertyMap(HostImpl host) {
+      hostResource = new ResourceImpl(Resource.Type.Host);
+
+      hostResource.setProperty(HostResourceProvider.HOST_NAME_PROPERTY_ID,
+          host.getHostName());
+      hostResource.setProperty(HostResourceProvider.HOST_PUBLIC_NAME_PROPERTY_ID,
+          host.getPublicHostName());
+      hostResource.setProperty(HostResourceProvider.HOST_IP_PROPERTY_ID,
+          host.getIPv4());
+      hostResource.setProperty(HostResourceProvider.HOST_TOTAL_MEM_PROPERTY_ID,
+          host.getTotalMemBytes());
+      hostResource.setProperty(HostResourceProvider.HOST_CPU_COUNT_PROPERTY_ID,
+          (long) host.getCpuCount());
+      hostResource.setProperty(HostResourceProvider.HOST_PHYSICAL_CPU_COUNT_PROPERTY_ID,
+          (long) host.getPhCpuCount());
+      hostResource.setProperty(HostResourceProvider.HOST_OS_ARCH_PROPERTY_ID,
+          host.getOsArch());
+      hostResource.setProperty(HostResourceProvider.HOST_OS_TYPE_PROPERTY_ID,
+          host.getOsType());
+      hostResource.setProperty(HostResourceProvider.HOST_OS_FAMILY_PROPERTY_ID,
+          host.getOsFamily());
+      hostResource.setProperty(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID,
+          host.getRackInfo());
+      hostResource.setProperty(HostResourceProvider.HOST_LAST_HEARTBEAT_TIME_PROPERTY_ID,
+          host.getLastHeartbeatTime());
+      hostResource.setProperty(HostResourceProvider.HOST_LAST_AGENT_ENV_PROPERTY_ID,
+          host.getLastAgentEnv());
+      hostResource.setProperty(HostResourceProvider.HOST_LAST_REGISTRATION_TIME_PROPERTY_ID,
+          host.getLastRegistrationTime());
+      hostResource.setProperty(HostResourceProvider.HOST_HOST_STATUS_PROPERTY_ID,
+          host.getStatus());
+      hostResource.setProperty(HostResourceProvider.HOST_HOST_HEALTH_REPORT_PROPERTY_ID,
+          host.getHealthStatus().getHealthReport());
+      hostResource.setProperty(HostResourceProvider.HOST_DISK_INFO_PROPERTY_ID,
+          host.getDisksInfo());
+      hostResource.setProperty(HostResourceProvider.HOST_STATE_PROPERTY_ID,
+          host.getState());
+    }      
+  }
+}
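
For orientation, the intended call pattern for HostRequest.offer() is roughly the sketch below. It relies only on the offer()/HostOfferResponse.Answer handshake and the TopologyTask list built in provision() above; the getTasks() accessor on HostOfferResponse and the driver class itself are assumptions for illustration and are not part of this patch.

package org.apache.ambari.server.topology;

import java.util.Collection;

import org.apache.ambari.server.state.host.HostImpl;

// Hypothetical driver (not part of this patch) showing how a newly registered host
// would be offered to outstanding HostRequests and how the returned TopologyTasks
// (resource creation, config group, install, start) would be run.
public class HostRequestDriverSketch {

  public void onHostRegistered(HostImpl host, Collection<HostRequest> outstandingRequests) {
    for (HostRequest request : outstandingRequests) {
      HostOfferResponse response = request.offer(host);
      switch (response.getAnswer()) {
        case ACCEPTED:
          // assumed accessor: the tasks passed to the ACCEPTED response constructor above
          for (TopologyTask task : response.getTasks()) {
            task.run();
          }
          return;
        case DECLINED_PREDICATE:
          // host didn't satisfy this request's predicate; try the next request
          break;
        case DECLINED_DONE:
          // request is already bound to a host; nothing to do for it
          break;
      }
    }
  }
}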

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java
new file mode 100644
index 0000000..042e9fc
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * Indicates an invalid topology.
+ */
+public class InvalidTopologyException extends Exception {
+  public InvalidTopologyException(String s) {
+    super(s);
+  }
+
+  public InvalidTopologyException(String s, Throwable throwable) {
+    super(s, throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java
new file mode 100644
index 0000000..85422a0
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * The information provided in the topology request template is invalid.
+ */
+public class InvalidTopologyTemplateException extends Exception {
+  public InvalidTopologyTemplateException(String s) {
+    super(s);
+  }
+
+  public InvalidTopologyTemplateException(String s, Throwable throwable) {
+    super(s, throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
new file mode 100644
index 0000000..5273ff8
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ShortTaskStatus;
+import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.state.host.HostImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.apache.ambari.server.controller.AmbariServer.getController;
+
+/**
+ * Logical Request implementation.
+ */
+public class LogicalRequest extends Request {
+
+  private Collection<HostRequest> allHostRequests = new ArrayList<HostRequest>();
+  // sorted set with master host requests given priority
+  private Collection<HostRequest> outstandingHostRequests = new TreeSet<HostRequest>();
+  private Map<String, HostRequest> requestsWithReservedHosts = new HashMap<String, HostRequest>();
+
+  private final ClusterTopology topology;
+
+
+  //todo: topologyContext is a temporary refactoring step
+  public LogicalRequest(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) throws AmbariException {
+    //todo: abstract usage of controller, etc ...
+    super(getController().getActionManager().getNextRequestId(), getController().getClusters().getCluster(
+        requestRequest.getClusterName()).getClusterId(), getController().getClusters());
+
+    this.topology = topologyContext.getClusterTopology();
+    createHostRequests(requestRequest, topologyContext);
+  }
+
+  public HostOfferResponse offer(HostImpl host) {
+    // attempt to match to a host request with an explicit host reservation first
+    synchronized (requestsWithReservedHosts) {
+      HostRequest hostRequest = requestsWithReservedHosts.remove(host.getHostName());
+      if (hostRequest != null) {
+        HostOfferResponse response = hostRequest.offer(host);
+        if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) {
+          //todo: error handling.  This is really a system exception and shouldn't happen
+          throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " +
+              host.getHostName());
+        }
+        return response;
+      }
+    }
+
+    // not explicitly reserved, at least not in this request, so attempt to match to outstanding host requests
+    boolean predicateRejected = false;
+    synchronized (outstandingHostRequests) {
+      //todo: prioritization of master host requests
+      Iterator<HostRequest> hostRequestIterator = outstandingHostRequests.iterator();
+      while (hostRequestIterator.hasNext()) {
+        HostOfferResponse response = hostRequestIterator.next().offer(host);
+        switch (response.getAnswer()) {
+          case ACCEPTED:
+            hostRequestIterator.remove();
+            return response;
+          case DECLINED_DONE:
+            //todo: should have been done on ACCEPT
+            hostRequestIterator.remove();
+            break;
+          case DECLINED_PREDICATE:
+            predicateRejected = true;
+            break;
+        }
+      }
+    }
+    // if at least one outstanding host request was rejected due to its predicate, or we still have an outstanding
+    // request with a reserved host, decline due to predicate; otherwise decline because all hosts are resolved
+    //todo: could also check if outstandingHostRequests is empty
+    return predicateRejected || ! requestsWithReservedHosts.isEmpty() ?
+        new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE) :
+        new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
+  }
+
+  //todo
+  @Override
+  public Collection<Stage> getStages() {
+    return super.getStages();
+  }
+
+  @Override
+  public List<HostRoleCommand> getCommands() {
+    List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
+    for (HostRequest hostRequest : allHostRequests) {
+      commands.addAll(new ArrayList<HostRoleCommand>(hostRequest.getTasks()));
+    }
+    return commands;
+  }
+
+  public Collection<String> getReservedHosts() {
+    return requestsWithReservedHosts.keySet();
+  }
+
+  //todo: account for blueprint name?
+  //todo: this should probably be done implicitly at a lower level
+  public boolean areGroupsResolved(Collection<String> hostGroupNames) {
+    synchronized (outstandingHostRequests) {
+      // iterate over outstanding host requests
+      for (HostRequest request : outstandingHostRequests) {
+        if (hostGroupNames.contains(request.getHostgroupName()) && request.getHostName() == null) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public Map<String, Collection<String>> getProjectedTopology() {
+    Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>();
+
+    //todo: synchronization
+    for (HostRequest hostRequest : allHostRequests) {
+      HostGroup hostGroup = hostRequest.getHostGroup();
+      for (String host : topology.getHostGroupInfo().get(hostGroup.getName()).getHostNames()) {
+        Collection<String> hostComponents = hostComponentMap.get(host);
+        if (hostComponents == null) {
+          hostComponents = new HashSet<String>();
+          hostComponentMap.put(host, hostComponents);
+        }
+        hostComponents.addAll(hostGroup.getComponents());
+      }
+    }
+    return hostComponentMap;
+  }
+
+  //todo: currently we are just returning all stages for all requests
+  //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each
+  //todo: needed to change the name to avoid a name collision.
+  public Collection<StageEntity> getStageEntities() {
+    Collection<StageEntity> stages = new ArrayList<StageEntity>();
+    for (HostRequest hostRequest : allHostRequests) {
+      StageEntity stage = new StageEntity();
+      stage.setStageId(hostRequest.getStageId());
+      stage.setRequestContext(getRequestContext());
+      stage.setRequestId(getRequestId());
+      //todo: not sure what this byte array is???
+      //stage.setClusterHostInfo();
+      stage.setClusterId(getClusterId());
+      stage.setSkippable(false);
+      // getTaskEntities() sync's state with physical tasks
+      stage.setHostRoleCommands(hostRequest.getTaskEntities());
+
+      stages.add(stage);
+    }
+    return stages;
+  }
+
+  public RequestStatusResponse getRequestStatus() {
+    RequestStatusResponse requestStatus = new RequestStatusResponse(getRequestId());
+    requestStatus.setRequestContext(getRequestContext());
+    //todo: other request status fields
+    //todo: ordering of tasks?
+
+    // convert HostRoleCommands to ShortTaskStatus
+    List<ShortTaskStatus> shortTasks = new ArrayList<ShortTaskStatus>();
+    for (HostRoleCommand task : getCommands()) {
+      shortTasks.add(new ShortTaskStatus(task));
+    }
+    requestStatus.setTasks(shortTasks);
+    //todo: null tasks?
+
+    return requestStatus;
+  }
+
+  public Map<Long, HostRoleCommandStatusSummaryDTO> getStageSummaries() {
+    Map<Long, HostRoleCommandStatusSummaryDTO> summaryMap = new HashMap<Long, HostRoleCommandStatusSummaryDTO>();
+
+    for (StageEntity stage : getStageEntities()) {
+      //Number minStartTime = 0;
+      //Number maxEndTime = 0;
+      int aborted = 0;
+      int completed = 0;
+      int failed = 0;
+      int holding = 0;
+      int holdingFailed = 0;
+      int holdingTimedout = 0;
+      int inProgress = 0;
+      int pending = 0;
+      int queued = 0;
+      int timedout = 0;
+
+      //todo: where does this logic belong?
+      for (HostRoleCommandEntity task : stage.getHostRoleCommands()) {
+        HostRoleStatus taskStatus = task.getStatus();
+
+        switch (taskStatus) {
+          case ABORTED:
+            aborted += 1;
+            break;
+          case COMPLETED:
+            completed += 1;
+            break;
+          case FAILED:
+            failed += 1;
+            break;
+          case HOLDING:
+            holding += 1;
+            break;
+          case HOLDING_FAILED:
+            holdingFailed += 1;
+            break;
+          case HOLDING_TIMEDOUT:
+            holdingTimedout += 1;
+            break;
+          case IN_PROGRESS:
+            inProgress += 1;
+            break;
+          case PENDING:
+            pending += 1;
+            break;
+          case QUEUED:
+            queued += 1;
+            break;
+          case TIMEDOUT:
+            timedout += 1;
+            break;
+          default:
+            //todo: proper log msg
+            System.out.println("Unexpected status when creating stage summaries: " + taskStatus);
+        }
+      }
+
+      //todo: skippable.  I only see a skippable field on the stage, not the tasks
+      //todo: time related fields
+      HostRoleCommandStatusSummaryDTO stageSummary = new HostRoleCommandStatusSummaryDTO(stage.isSkippable() ? 1 : 0, 0, 0,
+          stage.getStageId(), aborted, completed, failed, holding, holdingFailed, holdingTimedout, inProgress, pending, queued, timedout);
+      summaryMap.put(stage.getStageId(), stageSummary);
+    }
+    return summaryMap;
+  }
+
+  //todo: context is a temporary refactoring step
+  private void createHostRequests(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) {
+    //todo: consistent stage ordering
+    //todo: confirm that stages don't need to be unique across requests
+    long stageIdCounter = 0;
+    Map<String, HostGroupInfo> hostGroupInfoMap = requestRequest.getHostGroupInfo();
+    for (HostGroupInfo hostGroupInfo : hostGroupInfoMap.values()) {
+      String groupName = hostGroupInfo.getHostGroupName();
+      Blueprint blueprint = topology.getBlueprint();
+      int hostCardinality;
+      List<String> hostnames;
+
+      hostCardinality = hostGroupInfo.getRequestedHostCount();
+      hostnames = new ArrayList<String>(hostGroupInfo.getHostNames());
+
+
+      for (int i = 0; i < hostCardinality; ++i) {
+        if (! hostnames.isEmpty()) {
+          // host names are specified
+          String hostname = hostnames.get(i);
+          //todo: pass in HostGroupInfo
+          HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(),
+              blueprint.getName(), blueprint.getHostGroup(groupName), hostname, hostGroupInfo.getPredicate(),
+              topologyContext);
+          synchronized (requestsWithReservedHosts) {
+            requestsWithReservedHosts.put(hostname, hostRequest);
+          }
+        } else {
+          // host count is specified
+          //todo: pass in HostGroupInfo
+          HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(),
+              blueprint.getName(), blueprint.getHostGroup(groupName), hostCardinality, hostGroupInfo.getPredicate(),
+              topologyContext);
+          outstandingHostRequests.add(hostRequest);
+        }
+      }
+    }
+
+    allHostRequests.addAll(outstandingHostRequests);
+    allHostRequests.addAll(requestsWithReservedHosts.values());
+  }
+}
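
The bookkeeping in createHostRequests() and offer() boils down to two buckets per logical request: host requests pinned to an explicitly named host (matched by exact host name, and required to accept) and count-based requests (matched later by predicate). A stripped-down, self-contained model of that split, using plain strings in place of HostRequest/HostImpl and not part of this patch, looks like this:

package org.apache.ambari.server.topology;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified model of LogicalRequest's reserved-vs-outstanding request buckets;
// "requests" here are just host-group names instead of HostRequest objects.
public class HostRequestBucketsSketch {

  private final Map<String, String> reservedByHostName = new HashMap<String, String>();
  private final Queue<String> outstanding = new ArrayDeque<String>();

  // Mirrors createHostRequests(): one request per requested host, either pinned to an
  // explicit host name or count-based.  When explicit names are given, the requested
  // count is expected to equal the number of names.
  public void addHostGroup(String groupName, List<String> explicitHostNames, int requestedHostCount) {
    for (int i = 0; i < requestedHostCount; ++i) {
      if (! explicitHostNames.isEmpty()) {
        reservedByHostName.put(explicitHostNames.get(i), groupName);  // explicit host reservation
      } else {
        outstanding.add(groupName);                                   // resolved later by predicate
      }
    }
  }

  // Mirrors offer(): an explicit reservation always wins, then outstanding requests are tried.
  public String offer(String hostName) {
    String reservedGroup = reservedByHostName.remove(hostName);
    return reservedGroup != null ? reservedGroup : outstanding.poll();
  }
}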

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java
new file mode 100644
index 0000000..5ce2532
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * The requested blueprint doesn't exist
+ */
+public class NoSuchBlueprintException extends Exception {
+  public NoSuchBlueprintException(String name) {
+    super(String.format("No blueprint exists with the name '%s'", name));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java
new file mode 100644
index 0000000..413cb4e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * The requested host group doesn't exist.
+ */
+public class NoSuchHostGroupException extends Exception {
+  public NoSuchHostGroupException(String hostgroupName) {
+    super("Requested HostGroup doesn't exist: " + hostgroupName);
+  }
+
+  public NoSuchHostGroupException(String hostgroupName, String msg) {
+    super(msg + ".  Cause: Requested HostGroup doesn't exist: " + hostgroupName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java
new file mode 100644
index 0000000..870d718
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.apache.ambari.server.controller.internal.Stack;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Validates that all required passwords are provided.
+ */
+public class RequiredPasswordValidator implements TopologyValidator {
+
+  private String defaultPassword;
+
+  public RequiredPasswordValidator(String defaultPassword) {
+    this.defaultPassword = defaultPassword;
+  }
+
+  /**
+   * Validate that all required password properties have been set or that 'default_password' is specified.
+   *
+   * @throws InvalidTopologyException if required password properties are missing and no
+   *                                  default is specified via 'default_password'
+   */
+  public void validate(ClusterTopology topology) throws InvalidTopologyException {
+    Map<String, Map<String, Collection<String>>> missingPasswords = validateRequiredPasswords(topology);
+
+    if (! missingPasswords.isEmpty()) {
+      throw new InvalidTopologyException("Missing required password properties.  Specify a value for these " +
+          "properties in the cluster or host group configurations or include 'default_password' field in request. " +
+          missingPasswords);
+    }
+  }
+
+  /**
+   * Validate all configurations.  Validation is done on the operational configuration of each
+   * host group.  An operational configuration is achieved by overlaying host group configuration
+   * on top of cluster configuration which overlays the default stack configurations.
+   *
+   * @return map of missing required password properties, keyed by host group name and
+   *         configuration type.  Empty map if none are missing.
+   *
+   * @throws IllegalArgumentException if the blueprint contains invalid information
+   */
+
+  //todo: this is copied/pasted from Blueprint and is currently only used by validatePasswordProperties()
+  //todo: seems that we should have some common place for this code so it can be used by BP and here?
+  private Map<String, Map<String, Collection<String>>> validateRequiredPasswords(ClusterTopology topology) {
+
+    Map<String, Map<String, Collection<String>>> missingProperties =
+        new HashMap<String, Map<String, Collection<String>>>();
+
+    for (Map.Entry<String, HostGroupInfo> groupEntry: topology.getHostGroupInfo().entrySet()) {
+      String hostGroupName = groupEntry.getKey();
+      Map<String, Map<String, String>> groupProperties =
+          groupEntry.getValue().getConfiguration().getFullProperties(3);
+
+      Collection<String> processedServices = new HashSet<String>();
+      Blueprint blueprint = topology.getBlueprint();
+      Stack stack = blueprint.getStack();
+
+      HostGroup hostGroup = blueprint.getHostGroup(hostGroupName);
+      for (String component : hostGroup.getComponents()) {
+        //for now, AMBARI is not recognized as a service in Stacks
+        if (component.equals("AMBARI_SERVER")) {
+          continue;
+        }
+
+        String serviceName = stack.getServiceForComponent(component);
+        if (processedServices.add(serviceName)) {
+          //todo: do I need to subtract excluded configs?
+          Collection<Stack.ConfigProperty> requiredProperties =
+              stack.getRequiredConfigurationProperties(serviceName, "PASSWORD");
+
+          for (Stack.ConfigProperty property : requiredProperties) {
+            String category = property.getType();
+            String name = property.getName();
+            if (! propertyExists(topology, groupProperties, category, name)) {
+              Map<String, Collection<String>> missingHostGroupPropsMap = missingProperties.get(hostGroupName);
+              if (missingHostGroupPropsMap == null) {
+                missingHostGroupPropsMap = new HashMap<String, Collection<String>>();
+                missingProperties.put(hostGroupName, missingHostGroupPropsMap);
+              }
+              Collection<String> missingHostGroupTypeProps = missingHostGroupPropsMap.get(category);
+              if (missingHostGroupTypeProps == null) {
+                missingHostGroupTypeProps = new HashSet<String>();
+                missingHostGroupPropsMap.put(category, missingHostGroupTypeProps);
+              }
+              missingHostGroupTypeProps.add(name);
+            }
+          }
+        }
+      }
+    }
+    return missingProperties;
+  }
+
+  private boolean propertyExists(ClusterTopology topology, Map<String, Map<String, String>> props, String type, String property) {
+    Map<String, String> typeProps = props.get(type);
+    return (typeProps != null && typeProps.containsKey(property)) || setDefaultPassword(topology, type, property);
+  }
+
+  /**
+   * Attempt to set the default password in the cluster configuration for a missing password property.
+   *
+   * @param topology         cluster topology
+   * @param configType       configuration type
+   * @param property         password property name
+   *
+   * @return true if the password was set, otherwise false.  The default password is always applied
+   *         unless it is null or empty
+   */
+  private boolean setDefaultPassword(ClusterTopology topology, String configType, String property) {
+    boolean setDefaultPassword = false;
+    if (defaultPassword != null && ! defaultPassword.trim().isEmpty()) {
+      topology.getConfiguration().setProperty(configType, property, defaultPassword);
+      setDefaultPassword = true;
+    }
+    return setDefaultPassword;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RequiredPasswordValidator that = (RequiredPasswordValidator) o;
+
+    return defaultPassword == null ? that.defaultPassword == null : defaultPassword.equals(that.defaultPassword);
+  }
+
+  @Override
+  public int hashCode() {
+    return defaultPassword != null ? defaultPassword.hashCode() : 0;
+  }
+}
\ No newline at end of file
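
As a rough usage sketch, the validator is expected to be attached to a topology request and run before any
cluster resources are created; the ClusterTopology instance below is assumed to come from the existing
request processing and is not constructed here:

    TopologyValidator validator = new RequiredPasswordValidator("default-secret");
    try {
      // throws if any required PASSWORD property is unset and no default password was supplied
      validator.validate(clusterTopology);
    } catch (InvalidTopologyException e) {
      // reject the provision request before any resources are created
    }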

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
new file mode 100644
index 0000000..3e1b565
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import com.google.inject.Singleton;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.ClusterRequest;
+import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ServiceComponentRequest;
+import org.apache.ambari.server.controller.ServiceRequest;
+import org.apache.ambari.server.controller.internal.ComponentResourceProvider;
+import org.apache.ambari.server.controller.internal.ServiceResourceProvider;
+import org.apache.ambari.server.controller.internal.Stack;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
+import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.state.host.HostImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages all cluster provisioning actions on the cluster topology.
+ */
+//todo: cluster isolation
+@Singleton
+public class TopologyManager {
+
+  private final List<HostImpl> availableHosts = new LinkedList<HostImpl>();
+  private final Map<String, LogicalRequest> reservedHosts = new HashMap<String, LogicalRequest>();
+  private final Map<Long, LogicalRequest> allRequests = new HashMap<Long, LogicalRequest>();
+  // priority is given to oldest outstanding requests
+  private final Collection<LogicalRequest> outstandingRequests = new ArrayList<LogicalRequest>();
+  private Map<String, ClusterTopology> clusterTopologyMap = new HashMap<String, ClusterTopology>();
+  private final Map<TopologyTask.Type, Set<TopologyTask>> pendingTasks = new HashMap<TopologyTask.Type, Set<TopologyTask>>();
+
+  //todo: proper wait/notify mechanism
+  private final Object configurationFlagLock = new Object();
+  private boolean configureComplete = false;
+
+  private AmbariManagementController controller;
+  ExecutorService executor;
+  //todo: task ids.  Use existing mechanism for getting next task id sequence
+  private final static AtomicLong nextTaskId = new AtomicLong(10000);
+  private final Object serviceResourceLock = new Object();
+
+
+  public TopologyManager() {
+    pendingTasks.put(TopologyTask.Type.CONFIGURE, new HashSet<TopologyTask>());
+    pendingTasks.put(TopologyTask.Type.INSTALL, new HashSet<TopologyTask>());
+    pendingTasks.put(TopologyTask.Type.START, new HashSet<TopologyTask>());
+
+    executor = getExecutorService();
+  }
+
+  public RequestStatusResponse provisionCluster(TopologyRequest request) throws InvalidTopologyException, AmbariException {
+    ClusterTopology topology = new ClusterTopologyImpl(request);
+
+    String clusterName = topology.getClusterName();
+    clusterTopologyMap.put(clusterName, topology);
+
+    createClusterResource(clusterName);
+    createServiceAndComponentResources(topology);
+
+    LogicalRequest logicalRequest = processRequest(request, topology);
+    try {
+      addClusterConfigRequest(new ClusterConfigurationRequest(topology));
+    } catch (AmbariException e) {
+      //todo
+      throw e;
+    }
+
+    //todo: this should be invoked as part of a generic lifecycle event which could possibly
+    //todo: be tied to cluster state
+    persistInstallStateForUI(clusterName);
+    return getRequestStatus(logicalRequest.getRequestId());
+  }
+
+  public RequestStatusResponse scaleHosts(TopologyRequest request)
+      throws InvalidTopologyException, AmbariException {
+
+    String clusterName = request.getClusterName();
+    ClusterTopology topology = clusterTopologyMap.get(clusterName);
+    if (topology == null) {
+      throw new AmbariException("TopologyManager: Unable to retrieve cluster topology for cluster: " + clusterName);
+    }
+
+    // this registers/updates all request host groups
+    topology.update(request);
+    return getRequestStatus(processRequest(request, topology).getRequestId());
+  }
+
+  //todo: should be synchronized on same lock as onHostRegistered()
+  //todo: HostImpl is what is registered with the HeartbeatHandler and contains more host info than HostInfo so
+  //todo: we should probably change to use HostImpl
+  public void onHostRegistered(HostImpl host, boolean associatedWithCluster) {
+    if (associatedWithCluster) {
+      return;
+    }
+
+    boolean matchedToRequest = false;
+    String hostName = host.getHostName();
+    synchronized(reservedHosts) {
+      if (reservedHosts.containsKey(hostName)) {
+        LogicalRequest request = reservedHosts.remove(hostName);
+        HostOfferResponse response = request.offer(host);
+        if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) {
+          //todo: this is handled explicitly in LogicalRequest so this shouldn't happen here
+          throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + hostName);
+        }
+        processAcceptedHostOffer(getClusterTopology(request.getClusterName()), response, host);
+        matchedToRequest = true;
+      }
+    }
+
+    // matchedToRequest can already be true if the host was reserved above
+    if (! matchedToRequest) {
+      synchronized (outstandingRequests) {
+        Iterator<LogicalRequest> outstandingRequestIterator = outstandingRequests.iterator();
+        while (! matchedToRequest && outstandingRequestIterator.hasNext()) {
+          LogicalRequest request = outstandingRequestIterator.next();
+          HostOfferResponse hostOfferResponse = request.offer(host);
+          switch (hostOfferResponse.getAnswer()) {
+            case ACCEPTED:
+              matchedToRequest = true;
+              processAcceptedHostOffer(getClusterTopology(request.getClusterName()), hostOfferResponse, host);
+              break;
+            case DECLINED_DONE:
+              outstandingRequestIterator.remove();
+              break;
+            case DECLINED_PREDICATE:
+              break;
+          }
+        }
+      }
+    }
+
+    if (! matchedToRequest) {
+      synchronized (availableHosts) {
+        System.out.printf("TopologyManager: Queueing available host %s\n", hostName);
+        availableHosts.add(host);
+      }
+    }
+  }
+
+  public void onHostLeft(String hostname) {
+    //todo:
+  }
+
+  public Request getRequest(long requestId) {
+    return allRequests.get(requestId);
+  }
+
+  public Collection<LogicalRequest> getRequests(Collection<Long> requestIds) {
+    if (requestIds.isEmpty()) {
+      return allRequests.values();
+    } else {
+      Collection<LogicalRequest> matchingRequests = new ArrayList<LogicalRequest>();
+      for (long id : requestIds) {
+        LogicalRequest request = allRequests.get(id);
+        if (request != null) {
+          matchingRequests.add(request);
+        }
+      }
+      return matchingRequests;
+    }
+  }
+
+  //todo: currently we are just returning all stages for all requests
+  //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each
+  public Collection<StageEntity> getStages() {
+    Collection<StageEntity> stages = new ArrayList<StageEntity>();
+    for (LogicalRequest logicalRequest : allRequests.values()) {
+      stages.addAll(logicalRequest.getStageEntities());
+    }
+    return stages;
+  }
+
+  public Collection<HostRoleCommand> getTasks(long requestId) {
+    LogicalRequest request = allRequests.get(requestId);
+    return request == null ? Collections.<HostRoleCommand>emptyList() : request.getCommands();
+  }
+
+  public Collection<HostRoleCommand> getTasks(Collection<Long> requestIds) {
+    Collection<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
+    for (long id : requestIds) {
+      tasks.addAll(getTasks(id));
+    }
+
+    return tasks;
+  }
+
+  public Map<Long, HostRoleCommandStatusSummaryDTO> getStageSummaries(Long requestId) {
+    LogicalRequest request = allRequests.get(requestId);
+    return request == null ? Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap() :
+        request.getStageSummaries();
+  }
+
+  public RequestStatusResponse getRequestStatus(long requestId) {
+    LogicalRequest request = allRequests.get(requestId);
+    return request == null ? null : request.getRequestStatus();
+  }
+
+  public Collection<RequestStatusResponse> getRequestStatus(Collection<Long> ids) {
+    List<RequestStatusResponse> requestStatusResponses = new ArrayList<RequestStatusResponse>();
+    for (long id : ids) {
+      RequestStatusResponse response = getRequestStatus(id);
+      if (response != null) {
+        requestStatusResponses.add(response);
+      }
+    }
+
+    return requestStatusResponses;
+  }
+
+  public ClusterTopology getClusterTopology(String clusterName) {
+    return clusterTopologyMap.get(clusterName);
+  }
+
+  public Map<String, Collection<String>> getProjectedTopology() {
+    Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>();
+
+    for (LogicalRequest logicalRequest : allRequests.values()) {
+      Map<String, Collection<String>> requestTopology = logicalRequest.getProjectedTopology();
+      for (Map.Entry<String, Collection<String>> entry : requestTopology.entrySet()) {
+        String host = entry.getKey();
+        Collection<String> hostComponents = hostComponentMap.get(host);
+        if (hostComponents == null) {
+          hostComponents = new HashSet<String>();
+          hostComponentMap.put(host, hostComponents);
+        }
+        hostComponents.addAll(entry.getValue());
+      }
+    }
+    return hostComponentMap;
+  }
+
+  private LogicalRequest processRequest(TopologyRequest request, ClusterTopology topology) throws AmbariException {
+
+    finalizeTopology(request, topology);
+    LogicalRequest logicalRequest = createLogicalRequest(request, topology);
+
+    boolean requestHostComplete = false;
+    //todo: overall synchronization. Currently we have nested synchronization here
+    synchronized(availableHosts) {
+      Iterator<HostImpl> hostIterator = availableHosts.iterator();
+      while (! requestHostComplete && hostIterator.hasNext()) {
+        HostImpl host = hostIterator.next();
+        synchronized (reservedHosts) {
+          String hostname = host.getHostName();
+          if (reservedHosts.containsKey(hostname))  {
+            if (logicalRequest.equals(reservedHosts.get(hostname))) {
+              // host is registered to this request, remove it from reserved map
+              reservedHosts.remove(hostname);
+            } else {
+              // host is registered with another request, don't offer
+              //todo: clean up logic
+              continue;
+            }
+          }
+        }
+        HostOfferResponse response = logicalRequest.offer(host);
+        switch (response.getAnswer()) {
+          case ACCEPTED:
+            //todo: when host matches last host it returns ACCEPTED so we don't know that logical request is no
+            //todo: longer outstanding until we call offer again.  This is really only an issue if we need to
+            //todo: deal specifically with outstanding hosts other than calling offer.  Also, failure handling
+            //todo: may affect this behavior??
+            hostIterator.remove();
+            processAcceptedHostOffer(getClusterTopology(logicalRequest.getClusterName()), response, host);
+            break;
+          case DECLINED_DONE:
+            requestHostComplete = true;
+            break;
+          case DECLINED_PREDICATE:
+            break;
+        }
+      }
+
+      if (! requestHostComplete) {
+        // not all required hosts have been matched (see earlier comment regarding outstanding logical requests)
+        outstandingRequests.add(logicalRequest);
+      }
+    }
+    return logicalRequest;
+  }
+
+  private LogicalRequest createLogicalRequest(TopologyRequest request, ClusterTopology topology) throws AmbariException {
+    LogicalRequest logicalRequest = new LogicalRequest(request, new ClusterTopologyContext(topology));
+    allRequests.put(logicalRequest.getRequestId(), logicalRequest);
+    synchronized (reservedHosts) {
+      for (String host : logicalRequest.getReservedHosts()) {
+        reservedHosts.put(host, logicalRequest);
+      }
+    }
+
+    return logicalRequest;
+  }
+
+  private void processAcceptedHostOffer(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
+    try {
+      topology.addHostToTopology(response.getHostGroupName(), host.getHostName());
+    } catch (InvalidTopologyException e) {
+      //todo
+      throw new RuntimeException(e);
+    } catch (NoSuchHostGroupException e) {
+      throw new RuntimeException(e);
+    }
+
+    List<TopologyTask> tasks = response.getTasks();
+    synchronized (configurationFlagLock) {
+      if (configureComplete) {
+        for (TopologyTask task : tasks) {
+          task.run();
+        }
+      } else {
+        for (TopologyTask task : tasks) {
+          //todo: proper state dependencies
+          TopologyTask.Type taskType = task.getType();
+          if (taskType == TopologyTask.Type.RESOURCE_CREATION || taskType == TopologyTask.Type.CONFIGURE) {
+            task.run();
+          } else {
+            // all type collections are added at init time
+            pendingTasks.get(taskType).add(task);
+          }
+        }
+      }
+    }
+  }
+
+  //todo: this should invoke a callback on each 'service' in the topology
+  private void finalizeTopology(TopologyRequest request, ClusterTopology topology) {
+    addKerberosClientIfNecessary(topology);
+  }
+
+  /**
+   * Add the Kerberos client to all host groups if Kerberos is enabled for the cluster.
+   *
+   * @param topology  cluster topology
+   */
+  //for now, hard coded here
+  private void addKerberosClientIfNecessary(ClusterTopology topology) {
+
+    String clusterName = topology.getClusterName();
+    //todo: logic would ideally be contained in the stack
+    Cluster cluster;
+    try {
+      cluster = getController().getClusters().getCluster(clusterName);
+    } catch (AmbariException e) {
+      //todo: this shouldn't happen at this point but still need to handle in a generic manner for topo finalization
+      throw new RuntimeException("Parent Cluster resource doesn't exist.  clusterName= " + clusterName);
+    }
+    if (cluster.getSecurityType() == SecurityType.KERBEROS) {
+      for (HostGroup group : topology.getBlueprint().getHostGroups().values()) {
+        group.addComponent("KERBEROS_CLIENT");
+      }
+    }
+  }
+
+  // create a thread pool which is used for task execution
+  private synchronized ExecutorService getExecutorService() {
+    if (executor == null) {
+      LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+      int THREAD_POOL_CORE_SIZE = 2;
+      int THREAD_POOL_MAX_SIZE = 100;
+      int THREAD_POOL_TIMEOUT = Integer.MAX_VALUE;
+      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
+          THREAD_POOL_CORE_SIZE,
+          THREAD_POOL_MAX_SIZE,
+          THREAD_POOL_TIMEOUT,
+          TimeUnit.SECONDS,
+          queue);
+
+      //threadPoolExecutor.allowCoreThreadTimeOut(true);
+      executor = threadPoolExecutor;
+    }
+    return executor;
+  }
+
+  private void addClusterConfigRequest(ClusterConfigurationRequest configurationRequest) {
+    //pendingTasks.get(Action.CONFIGURE).add(new ConfigureClusterTask(configurationRequest));
+    synchronized (configurationFlagLock) {
+      configureComplete = false;
+    }
+    executor.submit(new ConfigureClusterTask(configurationRequest));
+  }
+
+  private void createClusterResource(String clusterName) throws AmbariException {
+    Stack stack = clusterTopologyMap.get(clusterName).getBlueprint().getStack();
+    String stackInfo = String.format("%s-%s", stack.getName(), stack.getVersion());
+    ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, stackInfo, null);
+    getController().createCluster(clusterRequest);
+  }
+
+  private void createServiceAndComponentResources(ClusterTopology topology) {
+    String clusterName = topology.getClusterName();
+    Collection<String> services = topology.getBlueprint().getServices();
+
+    synchronized(serviceResourceLock) {
+      try {
+        Cluster cluster = getController().getClusters().getCluster(clusterName);
+        services.removeAll(cluster.getServices().keySet());
+      } catch (AmbariException e) {
+        //todo
+        throw new RuntimeException(e);
+      }
+      Set<ServiceRequest> serviceRequests = new HashSet<ServiceRequest>();
+      Set<ServiceComponentRequest> componentRequests = new HashSet<ServiceComponentRequest>();
+      for (String service : services) {
+        serviceRequests.add(new ServiceRequest(clusterName, service, null));
+        for (String component : topology.getBlueprint().getComponents(service)) {
+          componentRequests.add(new ServiceComponentRequest(clusterName, service, component, null));
+        }
+      }
+      try {
+        ServiceResourceProvider serviceResourceProvider = (ServiceResourceProvider) ClusterControllerHelper.
+            getClusterController().ensureResourceProvider(Resource.Type.Service);
+
+        serviceResourceProvider.createServices(serviceRequests);
+
+        ComponentResourceProvider componentResourceProvider = (ComponentResourceProvider) ClusterControllerHelper.
+            getClusterController().ensureResourceProvider(Resource.Type.Component);
+
+        componentResourceProvider.createComponents(componentRequests);
+      } catch (AmbariException e) {
+        //todo
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Persist cluster state for the Ambari UI.  Setting this state informs the UI that a cluster has been
+   * installed and started and that the monitoring screen for the cluster should be displayed to the user.
+   *
+   * @param clusterName  name of cluster
+   */
+  //todo: invoke as part of a generic callback possible associated with cluster state
+  private void persistInstallStateForUI(String clusterName) throws AmbariException {
+    Stack stack = clusterTopologyMap.get(clusterName).getBlueprint().getStack();
+    String stackInfo = String.format("%s-%s", stack.getName(), stack.getVersion());
+    ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, "INSTALLED", null, stackInfo, null);
+
+    getController().updateClusters(Collections.singleton(clusterRequest), null);
+  }
+
+  private synchronized AmbariManagementController getController() {
+    if (controller == null) {
+      controller = AmbariServer.getController();
+    }
+    return controller;
+  }
+
+  private class ConfigureClusterTask implements Runnable {
+    private ClusterConfigurationRequest configRequest;
+
+
+    public ConfigureClusterTask(ClusterConfigurationRequest configRequest) {
+      this.configRequest = configRequest;
+    }
+
+
+    @Override
+    public void run() {
+      System.out.println("TopologyManager.ConfigureClusterTask: Entering");
+
+      boolean completed = false;
+      boolean interrupted = false;
+
+      while (! completed && ! interrupted) {
+        completed = areConfigsResolved();
+
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          interrupted = true;
+          // reset interrupted flag on thread
+          Thread.interrupted();
+
+        }
+      }
+
+      if (! interrupted) {
+        try {
+          System.out.println("TopologyManager.ConfigureClusterTask: Setting Configuration on cluster");
+          // sets updated configuration on topology and cluster
+          configRequest.process();
+        } catch (Exception e) {
+          //todo: how to handle this?  If this fails, we shouldn't start any hosts.
+          System.out.println("TopologyManager.ConfigureClusterTask: " +
+              "An exception occurred while attempting to process cluster configs and set on cluster");
+          e.printStackTrace();
+        }
+
+        synchronized (configurationFlagLock) {
+          System.out.println("TopologyManager.ConfigureClusterTask: Setting configure complete flag to true");
+          configureComplete = true;
+        }
+
+        // execute all queued install/start tasks
+        executor.submit(new ExecuteQueuedHostTasks());
+      }
+      System.out.println("TopologyManager.ConfigureClusterTask: Exiting");
+    }
+
+    // get set of required host groups from config processor and confirm that all requests
+    // have fully resolved the host names for the required host groups
+    private boolean areConfigsResolved() {
+      boolean configTopologyResolved = true;
+      Collection<String> requiredHostGroups;
+      try {
+        requiredHostGroups = configRequest.getRequiredHostGroups();
+      } catch (RuntimeException e) {
+        //todo
+        System.out.println("Caught an error from Config Processor: " + e);
+        throw e;
+      }
+
+      synchronized (outstandingRequests) {
+        for (LogicalRequest outstandingRequest : outstandingRequests) {
+          if (! outstandingRequest.areGroupsResolved(requiredHostGroups)) {
+            configTopologyResolved = false;
+            break;
+          }
+        }
+      }
+      return configTopologyResolved;
+    }
+  }
+
+  private class ExecuteQueuedHostTasks implements Runnable {
+    @Override
+    public void run() {
+      //todo: lock is too coarse grained, should only be on start tasks
+      synchronized(pendingTasks) {
+        // execute queued install tasks
+        //todo: once agent configuration is removed from agent install, we will be able to
+        //todo: install without regard to configuration resolution
+        Iterator<TopologyTask> iter = pendingTasks.get(TopologyTask.Type.INSTALL).iterator();
+        while (iter.hasNext()) {
+          iter.next().run();
+          iter.remove();
+        }
+
+        iter = pendingTasks.get(TopologyTask.Type.START).iterator();
+        while (iter.hasNext()) {
+          iter.next().run();
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  //todo: this is a temporary step, remove after refactoring makes it no longer needed
+  public class ClusterTopologyContext {
+    private ClusterTopology clusterTopology;
+
+    public ClusterTopologyContext(ClusterTopology clusterTopology) {
+      this.clusterTopology = clusterTopology;
+    }
+
+    public ClusterTopology getClusterTopology() {
+      return clusterTopology;
+    }
+
+    public long getNextTaskId() {
+      // AtomicLong already provides atomic increments; no additional locking is required
+      return nextTaskId.getAndIncrement();
+    }
+  }
+}
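
A hedged sketch of the intended call path from a resource provider (the TopologyRequest instance and
exception handling are assumed; in the server the manager is injected as a singleton rather than
constructed directly):

    TopologyManager topologyManager = new TopologyManager();
    RequestStatusResponse response = topologyManager.provisionCluster(provisionRequest);
    long requestId = response.getRequestId();

    // later, status and tasks for the logical request can be queried by id
    RequestStatusResponse status = topologyManager.getRequestStatus(requestId);
    Collection<HostRoleCommand> tasks = topologyManager.getTasks(requestId);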

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java
new file mode 100644
index 0000000..4c1abf9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A request which is used to create or modify a cluster topology.
+ */
+//todo: naming
+public interface TopologyRequest {
+
+  public String getClusterName();
+  //todo: only a single BP may be specified so all host groups have the same bp.
+  //todo: There is no reason really that we couldn't allow hostgroups from different blueprints assuming that
+  //todo: the stack matches across the groups.  For scaling operations, we allow different blueprints (rather arbitrary)
+  //todo: so BP really needs to get associated with the HostGroupInfo, even for create which will have a single BP
+  //todo: for all HG's.
+  public Blueprint getBlueprint();
+  public Configuration getConfiguration();
+  public Map<String, HostGroupInfo> getHostGroupInfo();
+  public List<TopologyValidator> getTopologyValidators();
+}
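
A hypothetical stub implementation is enough to exercise the interface in tests; every name below other
than the topology types themselves is illustrative, and the topology types are assumed to be visible from
the same package:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class StubTopologyRequest implements TopologyRequest {
      private final String clusterName;
      private final Blueprint blueprint;
      private final Configuration configuration;
      private final Map<String, HostGroupInfo> hostGroupInfo;

      public StubTopologyRequest(String clusterName, Blueprint blueprint, Configuration configuration,
                                 Map<String, HostGroupInfo> hostGroupInfo) {
        this.clusterName = clusterName;
        this.blueprint = blueprint;
        this.configuration = configuration;
        this.hostGroupInfo = hostGroupInfo;
      }

      @Override public String getClusterName() { return clusterName; }
      @Override public Blueprint getBlueprint() { return blueprint; }
      @Override public Configuration getConfiguration() { return configuration; }
      @Override public Map<String, HostGroupInfo> getHostGroupInfo() { return hostGroupInfo; }
      @Override public List<TopologyValidator> getTopologyValidators() {
        // no extra validation in this stub
        return Collections.<TopologyValidator>emptyList();
      }
    }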

http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java
new file mode 100644
index 0000000..284fbba
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import java.util.Map;
+
+/**
+ * Factory for creating topology requests.
+ */
+public interface TopologyRequestFactory {
+  public TopologyRequest createProvisionClusterRequest(Map<String, Object> properties) throws InvalidTopologyTemplateException;
+  // todo: use to create other request types
+}
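
Note that the factory surfaces malformed request bodies as InvalidTopologyTemplateException, which is
distinct from the InvalidTopologyException raised later during topology validation.  A caller sketch (the
property map and the factory/manager wiring are assumed, not defined in this patch):

    try {
      TopologyRequest topologyRequest = requestFactory.createProvisionClusterRequest(properties);
      topologyManager.provisionCluster(topologyRequest);
    } catch (InvalidTopologyTemplateException e) {
      // the request body itself could not be turned into a topology request
    } catch (InvalidTopologyException e) {
      // the request parsed but the resulting topology failed validation
    } catch (AmbariException e) {
      // cluster/service/component resource creation failed
    }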