You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by js...@apache.org on 2015/05/07 07:19:47 UTC

[3/4] ambari git commit: AMBARI-10990. Implement topology manager persistence

http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
index 9f9db5c..9e25dfb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
@@ -18,144 +18,122 @@
 
 package org.apache.ambari.server.topology;
 
-import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.Role;
-import org.apache.ambari.server.RoleCommand;
-import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
-import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
-import org.apache.ambari.server.actionmanager.HostRoleStatus;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
-import org.apache.ambari.server.controller.AmbariManagementController;
-import org.apache.ambari.server.controller.ConfigGroupRequest;
+import org.apache.ambari.server.api.predicate.InvalidQueryException;
+import org.apache.ambari.server.api.predicate.PredicateCompiler;
 import org.apache.ambari.server.controller.RequestStatusResponse;
-import org.apache.ambari.server.controller.ServiceComponentHostRequest;
 import org.apache.ambari.server.controller.ShortTaskStatus;
-import org.apache.ambari.server.controller.internal.ConfigGroupResourceProvider;
-import org.apache.ambari.server.controller.internal.HostComponentResourceProvider;
 import org.apache.ambari.server.controller.internal.HostResourceProvider;
-import org.apache.ambari.server.controller.internal.RequestImpl;
 import org.apache.ambari.server.controller.internal.ResourceImpl;
 import org.apache.ambari.server.controller.internal.Stack;
-import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
 import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.Resource;
-import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
-import org.apache.ambari.server.controller.spi.SystemException;
-import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
-import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.Config;
-import org.apache.ambari.server.state.ConfigHelper;
-import org.apache.ambari.server.state.ConfigImpl;
-import org.apache.ambari.server.state.StackId;
-import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
 import org.apache.ambari.server.state.host.HostImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
-import static org.apache.ambari.server.controller.AmbariServer.getController;
 
 /**
  * Represents a set of requests to a single host such as install, start, etc.
  */
 public class HostRequest implements Comparable<HostRequest> {
 
+  private final static Logger LOG = LoggerFactory.getLogger(HostRequest.class);
+
   private long requestId;
   private String blueprint;
   private HostGroup hostGroup;
   private String hostgroupName;
   private Predicate predicate;
-  private int cardinality = -1;
   private String hostname = null;
   private String cluster;
   private boolean containsMaster;
-  private long stageId = -1;
-  //todo: should be able to use the presence of hostName for this
-  private boolean outstanding = true;
+  private final long id;
+  private boolean isOutstanding = true;
 
-  //todo: remove
-  private Map<String, Long> logicalInstallTaskIds = new HashMap<String, Long>();
-  //todo: remove
-  private Map<String, Long> logicalStartTaskIds = new HashMap<String, Long>();
+  private Map<TopologyTask, Map<String, Long>> logicalTaskMap = new HashMap<TopologyTask, Map<String, Long>>();
 
-  Collection<HostRoleCommand> logicalTasks = new ArrayList<HostRoleCommand>();
+  Map<Long, HostRoleCommand> logicalTasks = new HashMap<Long, HostRoleCommand>();
 
   // logical task id -> physical tasks
-  private Map<Long, Collection<Long>> physicalTasks = new HashMap<Long, Collection<Long>>();
-
-  private static HostResourceProvider hostResourceProvider;
-
-  private HostComponentResourceProvider hostComponentResourceProvider;
+  private Map<Long, Long> physicalTasks = new HashMap<Long, Long>();
 
-  private AmbariManagementController controller = getController();
-  private ActionManager actionManager = controller.getActionManager();
-  private ConfigHelper configHelper = controller.getConfigHelper();
-  private AmbariMetaInfo metaInfoManager = controller.getAmbariMetaInfo();
+  private List<TopologyTask> topologyTasks = new ArrayList<TopologyTask>();
 
-  //todo: temporary refactoring step
-  private TopologyManager.ClusterTopologyContext topologyContext;
+  private ClusterTopology topology;
 
-  private static HostRoleCommandFactory hostRoleCommandFactory;
+  private static PredicateCompiler predicateCompiler = new PredicateCompiler();
 
-  public static void init(HostRoleCommandFactory factory) {
-    hostRoleCommandFactory = factory;
-  }
-
-  public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
-                     int cardinality, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
+  public HostRequest(long requestId, long id, String cluster, String hostname, String blueprintName,
+                     HostGroup hostGroup, Predicate predicate, ClusterTopology topology) {
     this.requestId = requestId;
-    this.stageId = stageId;
+    this.id = id;
     this.cluster = cluster;
     this.blueprint = blueprintName;
     this.hostGroup = hostGroup;
     this.hostgroupName = hostGroup.getName();
-    this.cardinality = cardinality;
     this.predicate = predicate;
     this.containsMaster = hostGroup.containsMasterComponent();
-    this.topologyContext = topologyContext;
+    this.topology = topology;
 
     createTasks();
-    System.out.println("HostRequest: Created request: Host Association Pending");
+    System.out.println("HostRequest: Created request for host: " +
+        (hostname == null ? "Host Assignment Pending" : hostname));
   }
 
-  public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
-                     String hostname, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
+  /**
+   * Only to be used when replaying persisted requests upon server startup.
+   *
+   * @param requestId    logical request id
+   * @param id           host request id
+   * @param predicate    host predicate
+   * @param topology     cluster topology
+   * @param entity       host request entity
+   */
+  public HostRequest(long requestId, long id, String predicate,
+                     ClusterTopology topology, TopologyHostRequestEntity entity) {
+
     this.requestId = requestId;
-    this.stageId = stageId;
-    this.cluster = cluster;
-    this.blueprint = blueprintName;
-    this.hostGroup = hostGroup;
-    this.hostgroupName = hostGroup.getName();
-    this.hostname = hostname;
-    this.predicate = predicate;
+    this.id = id;
+    this.cluster = topology.getClusterName();
+    this.blueprint = topology.getBlueprint().getName();
+    this.hostgroupName = entity.getTopologyHostGroupEntity().getName();
+    this.hostGroup = topology.getBlueprint().getHostGroup(hostgroupName);
+    this.hostname = entity.getHostName();
+    this.predicate = toPredicate(predicate);
     this.containsMaster = hostGroup.containsMasterComponent();
-    this.topologyContext = topologyContext;
+    this.topology = topology;
 
-    createTasks();
-    System.out.println("HostRequest: Created request for host: " + hostname);
+    createTasksForReplay(entity);
+
+    //todo: we may be able to simplify by just checking hostname
+    isOutstanding = hostname == null || !topology.getAmbariContext().
+        isHostRegisteredWithCluster(cluster, hostname);
+
+    System.out.println("HostRequest: Successfully recovered host request for host: " +
+        (hostname == null ? "Host Assignment Pending" : hostname));
   }
 
   //todo: synchronization
   public synchronized HostOfferResponse offer(HostImpl host) {
-    if (! outstanding) {
+    if (!isOutstanding) {
       return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
     }
     if (matchesHost(host)) {
-      outstanding = false;
+      isOutstanding = false;
       hostname = host.getHostName();
-      List<TopologyTask> tasks = provision(host);
-
-      return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, hostGroup.getName(), tasks);
+      setHostOnTasks(host);
+      return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, id, hostGroup.getName(), topologyTasks);
     } else {
       return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE);
     }
@@ -184,32 +162,28 @@ public class HostRequest implements Comparable<HostRequest> {
     return hostgroupName;
   }
 
-  public int getCardinality() {
-    return cardinality;
-  }
-
   public Predicate getPredicate() {
     return predicate;
   }
 
+  public boolean isCompleted() {
+    return ! isOutstanding;
+  }
 
-  private List<TopologyTask> provision(HostImpl host) {
-    List<TopologyTask> tasks = new ArrayList<TopologyTask>();
-
-    tasks.add(new CreateHostResourcesTask(topologyContext.getClusterTopology(), host, getHostgroupName()));
-    setHostOnTasks(host);
-
-    HostGroup hostGroup = getHostGroup();
-    tasks.add(new ConfigureConfigGroup(getConfigurationGroupName(hostGroup.getBlueprintName(),
-        hostGroup.getName()), getClusterName(), hostname));
+  private void createTasks() {
+    // high level topology tasks such as INSTALL, START, ...
+    topologyTasks.add(new PersistHostResourcesTask());
+    topologyTasks.add(new RegisterWithConfigGroupTask());
 
-    tasks.add(getInstallTask());
-    tasks.add(getStartTask());
+    InstallHostTask installTask = new InstallHostTask();
+    topologyTasks.add(installTask);
+    StartHostTask startTask = new StartHostTask();
+    topologyTasks.add(startTask);
 
-    return tasks;
-  }
+    logicalTaskMap.put(installTask, new HashMap<String, Long>());
+    logicalTaskMap.put(startTask, new HashMap<String, Long>());
 
-  private void createTasks() {
+    // lower level logical component level tasks which get mapped to physical tasks
     HostGroup hostGroup = getHostGroup();
     for (String component : hostGroup.getComponents()) {
       if (component == null || component.equals("AMBARI_SERVER")) {
@@ -221,79 +195,90 @@ public class HostRequest implements Comparable<HostRequest> {
           getHostName() :
           "PENDING HOST ASSIGNMENT : HOSTGROUP=" + getHostgroupName();
 
-      HostRoleCommand installTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.INSTALL);
-      installTask.setStatus(HostRoleStatus.PENDING);
-      installTask.setTaskId(topologyContext.getNextTaskId());
-      installTask.setRequestId(getRequestId());
-      installTask.setStageId(stageId);
+      AmbariContext context = topology.getAmbariContext();
+      HostRoleCommand logicalInstallTask = context.createAmbariTask(
+          getRequestId(), id, component, hostName, AmbariContext.TaskType.INSTALL);
+      logicalTasks.put(logicalInstallTask.getTaskId(), logicalInstallTask);
+      logicalTaskMap.get(installTask).put(component, logicalInstallTask.getTaskId());
 
-      //todo: had to add requestId to ShortTaskStatus
-      //todo: revert addition of requestId when we are using LogicalTask
-      installTask.setRequestId(getRequestId());
+      Stack stack = hostGroup.getStack();
+      // if component isn't a client, add a start task
+      if (! stack.getComponentInfo(component).isClient()) {
+        HostRoleCommand logicalStartTask = context.createAmbariTask(
+            getRequestId(), id, component, hostName, AmbariContext.TaskType.START);
+        logicalTasks.put(logicalStartTask.getTaskId(), logicalStartTask);
+        logicalTaskMap.get(startTask).put(component, logicalStartTask.getTaskId());
+      }
+    }
+  }
 
-      logicalTasks.add(installTask);
-      registerLogicalInstallTaskId(component, installTask.getTaskId());
+  private void createTasksForReplay(TopologyHostRequestEntity entity) {
+    topologyTasks.add(new PersistHostResourcesTask());
+    topologyTasks.add(new RegisterWithConfigGroupTask());
+    InstallHostTask installTask = new InstallHostTask();
+    topologyTasks.add(installTask);
+    StartHostTask startTask = new StartHostTask();
+    topologyTasks.add(startTask);
+
+    logicalTaskMap.put(installTask, new HashMap<String, Long>());
+    logicalTaskMap.put(startTask, new HashMap<String, Long>());
+
+    AmbariContext ambariContext = topology.getAmbariContext();
+    // lower level logical component level tasks which get mapped to physical tasks
+    for (TopologyHostTaskEntity topologyTaskEntity : entity.getTopologyHostTaskEntities()) {
+      TopologyTask.Type taskType = TopologyTask.Type.valueOf(topologyTaskEntity.getType());
+      for (TopologyLogicalTaskEntity logicalTaskEntity : topologyTaskEntity.getTopologyLogicalTaskEntities()) {
+        Long logicalTaskId = logicalTaskEntity.getId();
+        String component = logicalTaskEntity.getComponentName();
+
+        AmbariContext.TaskType logicalTaskType = getLogicalTaskType(taskType);
+        HostRoleCommand task = ambariContext.createAmbariTask(logicalTaskId, getRequestId(), id,
+            component, entity.getHostName(), logicalTaskType);
+
+        logicalTasks.put(logicalTaskId, task);
+        Long physicalTaskId = logicalTaskEntity.getPhysicalTaskId();
+        if (physicalTaskId != null) {
+          registerPhysicalTaskId(logicalTaskId, physicalTaskId);
+        }
 
-      Stack stack = hostGroup.getStack();
-      try {
-        // if component isn't a client, add a start task
-        if (! metaInfoManager.getComponent(stack.getName(), stack.getVersion(), stack.getServiceForComponent(component), component).isClient()) {
-          HostRoleCommand startTask = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, RoleCommand.START);
-          startTask.setStatus(HostRoleStatus.PENDING);
-          startTask.setRequestId(getRequestId());
-          startTask.setTaskId(topologyContext.getNextTaskId());
-          startTask.setRequestId(getRequestId());
-          startTask.setStageId(stageId);
-          logicalTasks.add(startTask);
-          registerLogicalStartTaskId(component, startTask.getTaskId());
+        //assumes only one task per type
+        for (TopologyTask topologyTask : topologyTasks) {
+          if (taskType == topologyTask.getType()) {
+            logicalTaskMap.get(topologyTask).put(component, logicalTaskId);
+          }
         }
-      } catch (AmbariException e) {
-        e.printStackTrace();
-        //todo: how to handle
-        throw new RuntimeException(e);
       }
     }
   }
 
-  /**
-   * Get a config group name based on a bp and host group.
-   *
-   * @param bpName         blueprint name
-   * @param hostGroupName  host group name
-   * @return  config group name
-   */
-  protected String getConfigurationGroupName(String bpName, String hostGroupName) {
-    return String.format("%s:%s", bpName, hostGroupName);
+  private static AmbariContext.TaskType getLogicalTaskType(TopologyTask.Type topologyTaskType) {
+    return topologyTaskType ==
+        TopologyTask.Type.INSTALL ?
+        AmbariContext.TaskType.INSTALL :
+        AmbariContext.TaskType.START;
   }
 
   private void setHostOnTasks(HostImpl host) {
-    for (HostRoleCommand task : getTasks()) {
+    for (HostRoleCommand task : getLogicalTasks()) {
       task.setHostEntity(host.getHostEntity());
     }
   }
 
-  //todo: analyze all all configuration needs for dealing with deprecated properties
-  /**
-   * Since global configs are deprecated since 1.7.0, but still supported.
-   * We should automatically map any globals used, to *-env dictionaries.
-   *
-   * @param blueprintConfigurations  map of blueprint configurations keyed by type
-   */
-  private void handleGlobalsBackwardsCompability(Stack stack,
-                                                 Map<String, Map<String, String>> blueprintConfigurations) {
-
-    StackId stackId = new StackId(stack.getName(), stack.getVersion());
-    configHelper.moveDeprecatedGlobals(stackId, blueprintConfigurations, getClusterName());
+  public List<TopologyTask> getTopologyTasks() {
+    return topologyTasks;
   }
 
-  public Collection<HostRoleCommand> getTasks() {
+  public Collection<HostRoleCommand> getLogicalTasks() {
     // sync logical task state with physical tasks
-    for (HostRoleCommand logicalTask : logicalTasks) {
-      Collection<Long> physicalTaskIds = physicalTasks.get(logicalTask.getTaskId());
-      if (physicalTaskIds != null) {
-        //todo: for now only one physical task per logical task
-        long physicalTaskId = physicalTaskIds.iterator().next();
-        HostRoleCommand physicalTask = actionManager.getTaskById(physicalTaskId);
+    for (HostRoleCommand logicalTask : logicalTasks.values()) {
+      // set host on command detail if it is set to null
+      String commandDetail = logicalTask.getCommandDetail();
+      if (commandDetail != null && commandDetail.contains("null")) {
+        logicalTask.setCommandDetail(commandDetail.replace("null", hostname));
+      }
+      Long physicalTaskId = physicalTasks.get(logicalTask.getTaskId());
+      if (physicalTaskId != null) {
+        HostRoleCommand physicalTask = topology.getAmbariContext().getPhysicalTask(physicalTaskId);
         if (physicalTask != null) {
           logicalTask.setStatus(physicalTask.getStatus());
           logicalTask.setCommandDetail(physicalTask.getCommandDetail());
@@ -313,12 +298,20 @@ public class HostRequest implements Comparable<HostRequest> {
         }
       }
     }
-    return logicalTasks;
+    return logicalTasks.values();
+  }
+
+  public Map<String, Long> getLogicalTasksForTopologyTask(TopologyTask topologyTask) {
+    return new HashMap<String, Long>(logicalTaskMap.get(topologyTask));
+  }
+
+  public HostRoleCommand getLogicalTask(long logicalTaskId) {
+    return logicalTasks.get(logicalTaskId);
   }
 
   public Collection<HostRoleCommandEntity> getTaskEntities() {
     Collection<HostRoleCommandEntity> taskEntities = new ArrayList<HostRoleCommandEntity>();
-    for (HostRoleCommand task : logicalTasks) {
+    for (HostRoleCommand task : logicalTasks.values()) {
       HostRoleCommandEntity entity = task.constructNewPersistenceEntity();
       // the above method doesn't set all of the fields for some unknown reason
       entity.setRequestId(task.getRequestId());
@@ -328,11 +321,9 @@ public class HostRequest implements Comparable<HostRequest> {
       entity.setErrorLog(task.errorLog);
 
       // set state from physical task
-      Collection<Long> physicalTaskIds = physicalTasks.get(task.getTaskId());
-      if (physicalTaskIds != null) {
-        //todo: for now only one physical task per logical task
-        long physicalTaskId = physicalTaskIds.iterator().next();
-        HostRoleCommand physicalTask = actionManager.getTaskById(physicalTaskId);
+      Long physicalTaskId = physicalTasks.get(task.getTaskId());
+      if (physicalTaskId != null) {
+        HostRoleCommand physicalTask = topology.getAmbariContext().getPhysicalTask(physicalTaskId);
         if (physicalTask != null) {
           entity.setStatus(physicalTask.getStatus());
           entity.setCommandDetail(physicalTask.getCommandDetail());
@@ -361,41 +352,26 @@ public class HostRequest implements Comparable<HostRequest> {
   }
 
   public boolean matchesHost(HostImpl host) {
-    if (hostname != null) {
-      return host.getHostName().equals(hostname);
-    } else if (predicate != null) {
-      return predicate.evaluate(new HostResourceAdapter(host));
-    } else {
-      return true;
-    }
+    return (hostname != null) ?
+        host.getHostName().equals(hostname) :
+        predicate == null || predicate.evaluate(new HostResourceAdapter(host));
   }
 
   public String getHostName() {
     return hostname;
   }
 
-  public long getStageId() {
-    return stageId;
-  }
-
-  //todo: remove
-  private void registerLogicalInstallTaskId(String component, long taskId) {
-    logicalInstallTaskIds.put(component, taskId);
+  public long getId() {
+    return id;
   }
 
-  //todo: remove
-  private void registerLogicalStartTaskId(String component, long taskId) {
-    logicalStartTaskIds.put(component, taskId);
-  }
-
-  //todo: remove
-  private long getLogicalInstallTaskId(String component) {
-    return logicalInstallTaskIds.get(component);
+  public long getStageId() {
+    // stage id is same as host request id
+    return getId();
   }
 
-  //todo: remove
-  private long getLogicalStartTaskId(String component) {
-    return logicalStartTaskIds.get(component);
+  public Long getPhysicalTaskId(long logicalTaskId) {
+    return physicalTasks.get(logicalTaskId);
   }
 
   //todo: since this is used to determine equality, using hashCode() isn't safe as it can return the same
@@ -411,333 +387,144 @@ public class HostRequest implements Comparable<HostRequest> {
 
   //todo: once we have logical tasks, move tracking of physical tasks there
   public void registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) {
-    Collection<Long> physicalTasksForId = physicalTasks.get(logicalTaskId);
-    if (physicalTasksForId == null) {
-      physicalTasksForId = new HashSet<Long>();
-      physicalTasks.put(logicalTaskId, physicalTasksForId);
-    }
-    physicalTasksForId.add(physicalTaskId);
-  }
-
-  //todo: temporary step
-  public TopologyTask getInstallTask() {
-    return new InstallHostTask();
-  }
+    physicalTasks.put(logicalTaskId, physicalTaskId);
 
-  //todo: temporary step
-  public TopologyTask getStartTask() {
-    return new StartHostTask();
+    topology.getAmbariContext().getPersistedTopologyState().
+        registerPhysicalTask(logicalTaskId, physicalTaskId);
   }
 
-  //todo: temporary refactoring step
-  public HostGroupInfo createHostGroupInfo(HostGroup group) {
-    HostGroupInfo info = new HostGroupInfo(group.getName());
-    info.setConfiguration(group.getConfiguration());
-
-    return info;
-  }
-
-  private synchronized HostResourceProvider getHostResourceProvider() {
-    if (hostResourceProvider == null) {
-      hostResourceProvider = (HostResourceProvider)
-          ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.Host);
-
-    }
-    return hostResourceProvider;
-  }
-
-  private synchronized HostComponentResourceProvider getHostComponentResourceProvider() {
-    if (hostComponentResourceProvider == null) {
-      hostComponentResourceProvider = (HostComponentResourceProvider)
-          ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
+  private Predicate toPredicate(String predicate) {
+    Predicate compiledPredicate = null;
+    try {
+      if (predicate != null && ! predicate.isEmpty()) {
+        compiledPredicate = predicateCompiler.compile(predicate);
+      }
+    } catch (InvalidQueryException e) {
+      // log error and proceed without predicate
+      LOG.error("Unable to compile predicate for host request: " + e, e);
     }
-    return hostComponentResourceProvider;
+    return compiledPredicate;
   }
 
-  //todo: extract
-  private class InstallHostTask implements TopologyTask {
-    //todo: use future to obtain returned Response which contains the request id
-    //todo: error handling
-    //todo: monitor status of requests
+  private class PersistHostResourcesTask implements TopologyTask {
+    private AmbariContext ambariContext;
 
     @Override
     public Type getType() {
-      return Type.INSTALL;
+      return Type.RESOURCE_CREATION;
+    }
+
+    @Override
+    public void init(ClusterTopology topology, AmbariContext ambariContext) {
+      this.ambariContext = ambariContext;
     }
 
     @Override
     public void run() {
-      try {
-        System.out.println("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname);
-        RequestStatusResponse response = getHostResourceProvider().install(getHostName(), cluster);
-        // map logical install tasks to physical install tasks
-        List<ShortTaskStatus> underlyingTasks = response.getTasks();
-        for (ShortTaskStatus task : underlyingTasks) {
-          Long logicalInstallTaskId = getLogicalInstallTaskId(task.getRole());
-          //todo: for now only one physical task per component
-          long taskId = task.getTaskId();
-          //physicalTasks.put(logicalInstallTaskId, Collections.singleton(taskId));
-          registerPhysicalTaskId(logicalInstallTaskId, taskId);
-
-          //todo: move this to provision
-          //todo: shouldn't have to iterate over all tasks to find install task
-          //todo: we are doing the same thing in the above registerPhysicalTaskId() call
-          // set attempt count on task
-          for (HostRoleCommand logicalTask : logicalTasks) {
-            if (logicalTask.getTaskId() == logicalInstallTaskId) {
-              logicalTask.incrementAttemptCount();
-            }
-          }
-        }
-      } catch (ResourceAlreadyExistsException e) {
-        e.printStackTrace();
-      } catch (SystemException e) {
-        e.printStackTrace();
-      } catch (NoSuchParentResourceException e) {
-        e.printStackTrace();
-      } catch (UnsupportedPropertyException e) {
-        e.printStackTrace();
-      } catch (Exception e) {
-        e.printStackTrace();
+      HostGroup group = topology.getBlueprint().getHostGroup(getHostgroupName());
+      Map<String, Collection<String>> serviceComponents = new HashMap<String, Collection<String>>();
+      for (String service : group.getServices()) {
+        serviceComponents.put(service, new HashSet<String> (group.getComponents(service)));
       }
+      ambariContext.createAmbariHostResources(getClusterName(), getHostName(), serviceComponents);
     }
   }
 
-  //todo: extract
-  private class StartHostTask implements TopologyTask {
-    //todo: use future to obtain returned Response which contains the request id
-    //todo: error handling
-    //todo: monitor status of requests
+  private class RegisterWithConfigGroupTask implements TopologyTask {
+    private ClusterTopology clusterTopology;
+    private AmbariContext ambariContext;
 
     @Override
     public Type getType() {
-      return Type.START;
+      return Type.CONFIGURE;
+    }
+
+    @Override
+    public void init(ClusterTopology topology, AmbariContext ambariContext) {
+      this.clusterTopology = topology;
+      this.ambariContext = ambariContext;
     }
 
     @Override
     public void run() {
-      try {
-        System.out.println("HostRequest.StartHostTask: Executing START task for host: " + hostname);
-        RequestStatusResponse response = getHostComponentResourceProvider().start(cluster, hostname);
-        // map logical install tasks to physical install tasks
-        List<ShortTaskStatus> underlyingTasks = response.getTasks();
-        for (ShortTaskStatus task : underlyingTasks) {
-          String component = task.getRole();
-          Long logicalStartTaskId = getLogicalStartTaskId(component);
-          // for now just set on outer map
-          registerPhysicalTaskId(logicalStartTaskId, task.getTaskId());
-
-          //todo: move this to provision
-          // set attempt count on task
-          for (HostRoleCommand logicalTask : logicalTasks) {
-            if (logicalTask.getTaskId() == logicalStartTaskId) {
-              logicalTask.incrementAttemptCount();
-            }
-          }
-        }
-      } catch (SystemException e) {
-        e.printStackTrace();
-      } catch (UnsupportedPropertyException e) {
-        e.printStackTrace();
-      } catch (NoSuchParentResourceException e) {
-        e.printStackTrace();
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
+      ambariContext.registerHostWithConfigGroup(getHostName(), clusterTopology, getHostgroupName());
     }
   }
 
-  private class CreateHostResourcesTask implements TopologyTask {
-    private ClusterTopology topology;
-    private HostImpl host;
-    private String groupName;
-
-    public CreateHostResourcesTask(ClusterTopology topology, HostImpl host, String groupName) {
-      this.topology = topology;
-      this.host = host;
-      this.groupName = groupName;
-    }
+  //todo: extract
+  private class InstallHostTask implements TopologyTask {
+    private ClusterTopology clusterTopology;
 
     @Override
     public Type getType() {
-      return Type.RESOURCE_CREATION;
+      return Type.INSTALL;
     }
 
     @Override
-    public void run() {
-      try {
-        createHostResources();
-      } catch (AmbariException e) {
-        //todo: report error to caller
-        e.printStackTrace();
-        System.out.println("An error occurred when creating host resources: " + e.toString());
-      }
+    public void init(ClusterTopology topology, AmbariContext ambariContext) {
+      this.clusterTopology = topology;
     }
 
-    private void createHostResources() throws AmbariException {
-      Map<String, Object> properties = new HashMap<String, Object>();
-      properties.put(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID, getClusterName());
-      properties.put(HostResourceProvider.HOST_NAME_PROPERTY_ID, host.getHostName());
-      properties.put(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID, host.getRackInfo());
-
-      getHostResourceProvider().createHosts(new RequestImpl(null, Collections.singleton(properties), null, null));
-      createHostComponentResources();
-    }
-
-    private void createHostComponentResources() throws AmbariException {
-      Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
-      Stack stack = topology.getBlueprint().getStack();
-      for (String component : topology.getBlueprint().getHostGroup(groupName).getComponents()) {
-        //todo: handle this in a generic manner.  These checks are all over the code
-        if (! component.equals("AMBARI_SERVER")) {
-          requests.add(new ServiceComponentHostRequest(topology.getClusterName(),
-              stack.getServiceForComponent(component), component, host.getHostName(), null));
+    @Override
+    public void run() {
+      System.out.println("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname);
+      RequestStatusResponse response = clusterTopology.installHost(hostname);
+      // map logical install tasks to physical install tasks
+      List<ShortTaskStatus> underlyingTasks = response.getTasks();
+      for (ShortTaskStatus task : underlyingTasks) {
+        Long logicalInstallTaskId = logicalTaskMap.get(this).get(task.getRole());
+        //todo: for now only one physical task per component
+        long taskId = task.getTaskId();
+        registerPhysicalTaskId(logicalInstallTaskId, taskId);
+
+        //todo: move this to provision
+        //todo: shouldn't have to iterate over all tasks to find install task
+        //todo: we are doing the same thing in the above registerPhysicalTaskId() call
+        // set attempt count on task
+        for (HostRoleCommand logicalTask : logicalTasks.values()) {
+          if (logicalTask.getTaskId() == logicalInstallTaskId) {
+            logicalTask.incrementAttemptCount();
+          }
         }
       }
-
-      controller.createHostComponents(requests);
     }
   }
 
   //todo: extract
-  private class ConfigureConfigGroup implements TopologyTask {
-    private String groupName;
-    private String clusterName;
-    private String hostName;
-
-    public ConfigureConfigGroup(String groupName, String clusterName, String hostName) {
-      this.groupName = groupName;
-      this.clusterName = clusterName;
-      this.hostName = hostName;
-    }
+  private class StartHostTask implements TopologyTask {
+    private ClusterTopology clusterTopology;
 
     @Override
     public Type getType() {
-      return Type.CONFIGURE;
+      return Type.START;
     }
 
     @Override
-    public void run() {
-      try {
-        //todo: add task to offer response
-        if (! addHostToExistingConfigGroups()) {
-          createConfigGroupsAndRegisterHost();
-        }
-      } catch (Exception e) {
-        //todo: handle exceptions
-        e.printStackTrace();
-        throw new RuntimeException("Unable to register config group for host: " + hostname);
-      }
+    public void init(ClusterTopology topology, AmbariContext ambariContext) {
+      this.clusterTopology = topology;
     }
 
-    /**
-     * Add the new host to an existing config group.
-     *
-     * @throws SystemException                an unknown exception occurred
-     * @throws UnsupportedPropertyException   an unsupported property was specified in the request
-     * @throws NoSuchParentResourceException  a parent resource doesn't exist
-     */
-    private boolean addHostToExistingConfigGroups()
-        throws SystemException,
-        UnsupportedPropertyException,
-        NoSuchParentResourceException {
-
-      boolean addedHost = false;
-
-      Clusters clusters;
-      Cluster cluster;
-      try {
-        clusters = controller.getClusters();
-        cluster = clusters.getCluster(clusterName);
-      } catch (AmbariException e) {
-        throw new IllegalArgumentException(
-            String.format("Attempt to add hosts to a non-existent cluster: '%s'", clusterName));
-      }
-      // I don't know of a method to get config group by name
-      //todo: add a method to get config group by name
-      Map<Long, ConfigGroup> configGroups = cluster.getConfigGroups();
-      for (ConfigGroup group : configGroups.values()) {
-        if (group.getName().equals(groupName)) {
-          try {
-            group.addHost(clusters.getHost(hostName));
-            group.persist();
-            addedHost = true;
-          } catch (AmbariException e) {
-            // shouldn't occur, this host was just added to the cluster
-            throw new SystemException(String.format(
-                "Unable to obtain newly created host '%s' from cluster '%s'", hostName, clusterName));
+    @Override
+    public void run() {
+      System.out.println("HostRequest.StartHostTask: Executing START task for host: " + hostname);
+      RequestStatusResponse response = clusterTopology.startHost(hostname);
+      // map logical install tasks to physical install tasks
+      List<ShortTaskStatus> underlyingTasks = response.getTasks();
+      for (ShortTaskStatus task : underlyingTasks) {
+        String component = task.getRole();
+        Long logicalStartTaskId = logicalTaskMap.get(this).get(component);
+        // for now just set on outer map
+        registerPhysicalTaskId(logicalStartTaskId, task.getTaskId());
+
+        //todo: move this to provision
+        // set attempt count on task
+        for (HostRoleCommand logicalTask : logicalTasks.values()) {
+          if (logicalTask.getTaskId() == logicalStartTaskId) {
+            logicalTask.incrementAttemptCount();
           }
         }
       }
-      return addedHost;
     }
-
-    /**
-     * Register config groups for host group scoped configuration.
-     * For each host group with configuration specified in the blueprint, a config group is created
-     * and the hosts associated with the host group are assigned to the config group.
-     *
-     * @throws ResourceAlreadyExistsException attempt to create a config group that already exists
-     * @throws SystemException                an unexpected exception occurs
-     * @throws UnsupportedPropertyException   an invalid property is provided when creating a config group
-     * @throws NoSuchParentResourceException  attempt to create a config group for a non-existing cluster
-     */
-    private void createConfigGroupsAndRegisterHost() throws
-        ResourceAlreadyExistsException, SystemException,
-        UnsupportedPropertyException, NoSuchParentResourceException {
-
-      //HostGroupEntity entity = hostGroup.getEntity();
-      HostGroup hostGroup = getHostGroup();
-      Map<String, Map<String, Config>> groupConfigs = new HashMap<String, Map<String, Config>>();
-
-      Stack stack = hostGroup.getStack();
-
-      // get the host-group config with cluster creation template overrides
-      Configuration topologyHostGroupConfig = topologyContext.getClusterTopology().
-          getHostGroupInfo().get(hostGroup.getName()).getConfiguration();
-
-      //handling backwards compatibility for group configs
-      //todo: doesn't belong here
-      handleGlobalsBackwardsCompability(stack, topologyHostGroupConfig.getProperties());
-
-      // iterate over topo host group configs which were defined in CCT/HG and BP/HG only, no parent configs
-      for (Map.Entry<String, Map<String, String>> entry: topologyHostGroupConfig.getProperties().entrySet()) {
-        String type = entry.getKey();
-        String service = stack.getServiceForConfigType(type);
-        Config config = new ConfigImpl(type);
-        config.setTag(hostGroup.getName());
-        config.setProperties(entry.getValue());
-        //todo: attributes
-        Map<String, Config> serviceConfigs = groupConfigs.get(service);
-        if (serviceConfigs == null) {
-          serviceConfigs = new HashMap<String, Config>();
-          groupConfigs.put(service, serviceConfigs);
-        }
-        serviceConfigs.put(type, config);
-      }
-
-      String bpName = topologyContext.getClusterTopology().getBlueprint().getName();
-      for (Map.Entry<String, Map<String, Config>> entry : groupConfigs.entrySet()) {
-        String service = entry.getKey();
-        Map<String, Config> serviceConfigs = entry.getValue();
-        String absoluteGroupName = getConfigurationGroupName(bpName, hostGroup.getName());
-        Collection<String> groupHosts;
-
-        groupHosts = topologyContext.getClusterTopology().getHostGroupInfo().
-            get(hostgroupName).getHostNames();
-
-        ConfigGroupRequest request = new ConfigGroupRequest(
-            null, getClusterName(), absoluteGroupName, service, "Host Group Configuration",
-            new HashSet<String>(groupHosts), serviceConfigs);
-
-        // get the config group provider and create config group resource
-        ConfigGroupResourceProvider configGroupProvider = (ConfigGroupResourceProvider)
-            ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.ConfigGroup);
-        configGroupProvider.createResources(Collections.singleton(request));
-      }
-    }
-
-
   }
 
   private class HostResourceAdapter implements Resource {

http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index 5273ff8..087ad4c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -22,12 +22,16 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Request;
-import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.RequestStatusResponse;
 import org.apache.ambari.server.controller.ShortTaskStatus;
 import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 import org.apache.ambari.server.state.host.HostImpl;
 
 import java.util.ArrayList;
@@ -38,30 +42,49 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
-
-import static org.apache.ambari.server.controller.AmbariServer.getController;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Logical Request implementation.
  */
 public class LogicalRequest extends Request {
 
-  private Collection<HostRequest> allHostRequests = new ArrayList<HostRequest>();
+  private final Collection<HostRequest> allHostRequests = new ArrayList<HostRequest>();
   // sorted set with master host requests given priority
-  private Collection<HostRequest> outstandingHostRequests = new TreeSet<HostRequest>();
-  private Map<String, HostRequest> requestsWithReservedHosts = new HashMap<String, HostRequest>();
+  private final Collection<HostRequest> outstandingHostRequests = new TreeSet<HostRequest>();
+  private final Map<String, HostRequest> requestsWithReservedHosts = new HashMap<String, HostRequest>();
 
   private final ClusterTopology topology;
 
+  private static AmbariManagementController controller;
+
+  private static final AtomicLong hostIdCounter = new AtomicLong(1);
+
+
+  public LogicalRequest(Long id, TopologyRequest request, ClusterTopology topology)
+      throws AmbariException {
+
+    //todo: abstract usage of controller, etc ...
+    super(id, getController().getClusters().getCluster(
+        request.getClusterName()).getClusterId(), getController().getClusters());
+
+    setRequestContext(String.format("Logical Request: %s", request.getCommandDescription()));
+
+    this.topology = topology;
+    createHostRequests(request, topology);
+  }
+
+  public LogicalRequest(Long id, TopologyRequest request, ClusterTopology topology,
+                        TopologyLogicalRequestEntity requestEntity) throws AmbariException {
 
-  //todo: topologyContext is a temporary refactoring step
-  public LogicalRequest(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) throws AmbariException {
     //todo: abstract usage of controller, etc ...
-    super(getController().getActionManager().getNextRequestId(), getController().getClusters().getCluster(
-        requestRequest.getClusterName()).getClusterId(), getController().getClusters());
+    super(id, getController().getClusters().getCluster(
+        request.getClusterName()).getClusterId(), getController().getClusters());
 
-    this.topology = topologyContext.getClusterTopology();
-    createHostRequests(requestRequest, topologyContext);
+    setRequestContext(String.format("Logical Request: %s", request.getCommandDescription()));
+
+    this.topology = topology;
+    createHostRequests(topology, requestEntity);
   }
 
   public HostOfferResponse offer(HostImpl host) {
@@ -71,7 +94,7 @@ public class LogicalRequest extends Request {
       if (hostRequest != null) {
         HostOfferResponse response = hostRequest.offer(host);
         if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) {
-          //todo: error handling.  This is really a system exception and shouldn't happen
+          // host request rejected host that it explicitly requested
           throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " +
               host.getHostName());
         }
@@ -100,23 +123,16 @@ public class LogicalRequest extends Request {
     }
     // if at least one outstanding host request rejected for predicate or we have an outstanding request
     // with a reserved host decline due to predicate, otherwise decline due to all hosts being resolved
-    //todo: could also check if outstandingHostRequests is empty
     return predicateRejected || ! requestsWithReservedHosts.isEmpty() ?
         new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE) :
         new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
   }
 
-  //todo
-  @Override
-  public Collection<Stage> getStages() {
-    return super.getStages();
-  }
-
   @Override
   public List<HostRoleCommand> getCommands() {
     List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
     for (HostRequest hostRequest : allHostRequests) {
-      commands.addAll(new ArrayList<HostRoleCommand>(hostRequest.getTasks()));
+      commands.addAll(new ArrayList<HostRoleCommand>(hostRequest.getLogicalTasks()));
     }
     return commands;
   }
@@ -125,6 +141,23 @@ public class LogicalRequest extends Request {
     return requestsWithReservedHosts.keySet();
   }
 
+  public boolean hasCompleted() {
+    return requestsWithReservedHosts.isEmpty() && outstandingHostRequests.isEmpty();
+  }
+
+  public Collection<HostRequest> getCompletedHostRequests() {
+    Collection<HostRequest> completedHostRequests = new ArrayList<HostRequest>(allHostRequests);
+    completedHostRequests.removeAll(outstandingHostRequests);
+    completedHostRequests.removeAll(requestsWithReservedHosts.values());
+
+    return completedHostRequests;
+  }
+
+  //todo: this is only here for toEntity() functionality
+  public Collection<HostRequest> getHostRequests() {
+    return new ArrayList<HostRequest>(allHostRequests);
+  }
+
   //todo: account for blueprint name?
   //todo: this should probably be done implicitly at a lower level
   public boolean areGroupsResolved(Collection<String> hostGroupNames) {
@@ -157,9 +190,7 @@ public class LogicalRequest extends Request {
     return hostComponentMap;
   }
 
-  //todo: currently we are just returning all stages for all requests
-  //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each
-  //todo: needed to change the name to avoid a name collision.
+  // currently we are just returning all stages for all requests
   public Collection<StageEntity> getStageEntities() {
     Collection<StageEntity> stages = new ArrayList<StageEntity>();
     for (HostRequest hostRequest : allHostRequests) {
@@ -182,8 +213,6 @@ public class LogicalRequest extends Request {
   public RequestStatusResponse getRequestStatus() {
     RequestStatusResponse requestStatus = new RequestStatusResponse(getRequestId());
     requestStatus.setRequestContext(getRequestContext());
-    //todo: other request status fields
-    //todo: ordering of tasks?
 
     // convert HostRoleCommands to ShortTaskStatus
     List<ShortTaskStatus> shortTasks = new ArrayList<ShortTaskStatus>();
@@ -191,7 +220,6 @@ public class LogicalRequest extends Request {
       shortTasks.add(new ShortTaskStatus(task));
     }
     requestStatus.setTasks(shortTasks);
-    //todo: null tasks?
 
     return requestStatus;
   }
@@ -249,13 +277,10 @@ public class LogicalRequest extends Request {
             timedout += 1;
             break;
           default:
-            //todo: proper log msg
             System.out.println("Unexpected status when creating stage summaries: " + taskStatus);
         }
       }
 
-      //todo: skippable.  I only see a skippable field on the stage, not the tasks
-      //todo: time related fields
       HostRoleCommandStatusSummaryDTO stageSummary = new HostRoleCommandStatusSummaryDTO(stage.isSkippable() ? 1 : 0, 0, 0,
           stage.getStageId(), aborted, completed, failed, holding, holdingFailed, holdingTimedout, inProgress, pending, queued, timedout);
       summaryMap.put(stage.getStageId(), stageSummary);
@@ -263,45 +288,69 @@ public class LogicalRequest extends Request {
     return summaryMap;
   }
 
-  //todo: context is a temporary refactoring step
-  private void createHostRequests(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) {
-    //todo: consistent stage ordering
-    //todo: confirm that stages don't need to be unique across requests
-    long stageIdCounter = 0;
-    Map<String, HostGroupInfo> hostGroupInfoMap = requestRequest.getHostGroupInfo();
+  private void createHostRequests(TopologyRequest request, ClusterTopology topology) {
+    Map<String, HostGroupInfo> hostGroupInfoMap = request.getHostGroupInfo();
+    Blueprint blueprint = topology.getBlueprint();
     for (HostGroupInfo hostGroupInfo : hostGroupInfoMap.values()) {
       String groupName = hostGroupInfo.getHostGroupName();
-      Blueprint blueprint = topology.getBlueprint();
-      int hostCardinality;
-      List<String> hostnames;
-
-      hostCardinality = hostGroupInfo.getRequestedHostCount();
-      hostnames = new ArrayList<String>(hostGroupInfo.getHostNames());
-
+      int hostCardinality = hostGroupInfo.getRequestedHostCount();
+      List<String> hostnames = new ArrayList<String>(hostGroupInfo.getHostNames());
 
       for (int i = 0; i < hostCardinality; ++i) {
         if (! hostnames.isEmpty()) {
           // host names are specified
           String hostname = hostnames.get(i);
-          //todo: pass in HostGroupInfo
-          HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(),
-              blueprint.getName(), blueprint.getHostGroup(groupName), hostname, hostGroupInfo.getPredicate(),
-              topologyContext);
+          HostRequest hostRequest = new HostRequest(getRequestId(), hostIdCounter.getAndIncrement(), getClusterName(),
+              hostname, blueprint.getName(), blueprint.getHostGroup(groupName), null, topology);
           synchronized (requestsWithReservedHosts) {
             requestsWithReservedHosts.put(hostname, hostRequest);
           }
         } else {
           // host count is specified
-          //todo: pass in HostGroupInfo
-          HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(),
-              blueprint.getName(), blueprint.getHostGroup(groupName), hostCardinality, hostGroupInfo.getPredicate(),
-              topologyContext);
+          HostRequest hostRequest = new HostRequest(getRequestId(), hostIdCounter.getAndIncrement(), getClusterName(),
+              null, blueprint.getName(), blueprint.getHostGroup(groupName), hostGroupInfo.getPredicate(), topology);
           outstandingHostRequests.add(hostRequest);
         }
       }
     }
-
     allHostRequests.addAll(outstandingHostRequests);
     allHostRequests.addAll(requestsWithReservedHosts.values());
   }
+
+  private void createHostRequests(ClusterTopology topology,
+                                  TopologyLogicalRequestEntity requestEntity) {
+
+    for (TopologyHostRequestEntity hostRequestEntity : requestEntity.getTopologyHostRequestEntities()) {
+      Long hostRequestId = hostRequestEntity.getId();
+      synchronized (hostIdCounter) {
+        if (hostIdCounter.get() <= hostRequestId) {
+          hostIdCounter.set(hostRequestId + 1);
+        }
+      }
+      TopologyHostGroupEntity hostGroupEntity = hostRequestEntity.getTopologyHostGroupEntity();
+
+      String reservedHostName = hostGroupEntity.
+          getTopologyHostInfoEntities().iterator().next().getFqdn();
+
+      //todo: move predicate processing to host request
+      HostRequest hostRequest = new HostRequest(getRequestId(), hostRequestId,
+          reservedHostName, topology, hostRequestEntity);
+
+      allHostRequests.add(hostRequest);
+      if (! hostRequest.isCompleted()) {
+        if (reservedHostName != null) {
+          requestsWithReservedHosts.put(reservedHostName, hostRequest);
+        } else {
+          outstandingHostRequests.add(hostRequest);
+        }
+      }
+    }
+  }
+
+  private synchronized static AmbariManagementController getController() {
+    if (controller == null) {
+      controller = AmbariServer.getController();
+    }
+    return controller;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java
new file mode 100644
index 0000000..a8a76b9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequestFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
+
+/**
+ * Factory for creating logical requests
+ */
+//todo: throw more meaningful exception
+public class LogicalRequestFactory {
+  public LogicalRequest createRequest(Long id, TopologyRequest topologyRequest, ClusterTopology topology)
+      throws AmbariException {
+
+    return new LogicalRequest(id, topologyRequest, topology);
+  }
+
+  public LogicalRequest createRequest(Long id, TopologyRequest topologyRequest, ClusterTopology topology,
+                                      TopologyLogicalRequestEntity requestEntity) throws AmbariException {
+
+    return new LogicalRequest(id, topologyRequest, topology, requestEntity);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
new file mode 100644
index 0000000..dbf6735
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Persistence abstraction.
+ */
+public interface PersistedState {
+  /**
+   * Persist a topology request.
+   *
+   * @param topologyRequest  topologyh request to persist
+   *
+   * @return a persisted topology request which is a wrapper around a TopologyRequest which
+   * adds an id that can be used to refer to the persisted entity
+   */
+  PersistedTopologyRequest persistTopologyRequest(TopologyRequest topologyRequest);
+
+  /**
+   * Persist a logical request.
+   *
+   * @param logicalRequest     logical request to persist
+   * @param topologyRequestId  the id of the associated topology request
+   */
+  void persistLogicalRequest(LogicalRequest logicalRequest, long topologyRequestId);
+
+  /**
+   * Register a physical task with a logical task.
+   *
+   * @param logicalTaskId   logical task id
+   * @param physicalTaskId  physical task id
+   */
+  void registerPhysicalTask(long logicalTaskId, long physicalTaskId);
+
+  /**
+   * Registeer a host with a host request.
+   *
+   * @param hostRequestId  host request id
+   * @param hostName       name of host being registered
+   */
+  void registerHostName(long hostRequestId, String hostName);
+
+  /**
+   * Get all persisted requests.  This is used to replay all
+   * requests upon ambari startup.
+   *
+   * @return map of cluster topology to list of logical requests
+   */
+  Map<ClusterTopology, List<LogicalRequest>> getAllRequests();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
new file mode 100644
index 0000000..4101d67
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.StaticallyInject;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.api.predicate.InvalidQueryException;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.TopologyHostGroupDAO;
+import org.apache.ambari.server.orm.dao.TopologyHostRequestDAO;
+import org.apache.ambari.server.orm.dao.TopologyLogicalTaskDAO;
+import org.apache.ambari.server.orm.dao.TopologyRequestDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostInfoEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
+import org.apache.ambari.server.orm.entities.TopologyRequestEntity;
+import org.apache.ambari.server.stack.NoSuchStackException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation which uses Ambari Database DAO and Entity objects for persistence
+ * of topology related information.
+ */
+@StaticallyInject
+public class PersistedStateImpl implements PersistedState {
+
+  protected final static Logger LOG = LoggerFactory.getLogger(PersistedState.class);
+
+  @Inject
+  private static TopologyRequestDAO topologyRequestDAO;
+
+  @Inject
+  private static TopologyHostGroupDAO hostGroupDAO;
+
+  @Inject
+  private static TopologyHostRequestDAO hostRequestDAO;
+
+  @Inject
+  private static TopologyLogicalTaskDAO topologyLogicalTaskDAO;
+
+  @Inject
+  private static HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private static HostRoleCommandDAO physicalTaskDAO;
+
+  @Inject
+  private static BlueprintFactory blueprintFactory;
+
+  @Inject
+  private static LogicalRequestFactory logicalRequestFactory;
+
+  @Inject
+  private static AmbariContext ambariContext;
+
+  private static Gson jsonSerializer = new Gson();
+
+
+  @Override
+  public  PersistedTopologyRequest persistTopologyRequest(TopologyRequest request) {
+    TopologyRequestEntity requestEntity = toEntity(request);
+    topologyRequestDAO.create(requestEntity);
+    return new PersistedTopologyRequest(requestEntity.getId(), request);
+  }
+
+  @Override
+  public void persistLogicalRequest(LogicalRequest logicalRequest, long topologyRequestId) {
+    TopologyRequestEntity topologyRequestEntity = topologyRequestDAO.findById(topologyRequestId);
+    TopologyLogicalRequestEntity entity = toEntity(logicalRequest, topologyRequestEntity);
+    topologyRequestEntity.setTopologyLogicalRequestEntity(entity);
+    //todo: how to handle missing topology request entity?
+
+    //logicalRequestDAO.create(entity);
+
+    topologyRequestDAO.merge(topologyRequestEntity);
+  }
+
+  @Override
+  public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) {
+    TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId);
+    HostRoleCommandEntity physicalEntity = hostRoleCommandDAO.findByPK(physicalTaskId);
+    entity.setHostRoleCommandEntity(physicalEntity);
+
+    topologyLogicalTaskDAO.merge(entity);
+  }
+
+  @Override
+  public void registerHostName(long hostRequestId, String hostName) {
+    TopologyHostRequestEntity entity = hostRequestDAO.findById(hostRequestId);
+    if (entity.getHostName() == null) {
+      entity.setHostName(hostName);
+      hostRequestDAO.merge(entity);
+    }
+  }
+
+  @Override
+  public Map<ClusterTopology, List<LogicalRequest>> getAllRequests() {
+    //todo: we only currently support a single request per ambari instance so there should only
+    //todo: be a single cluster topology
+    Map<ClusterTopology, List<LogicalRequest>> allRequests = new HashMap<ClusterTopology, List<LogicalRequest>>();
+    Collection<TopologyRequestEntity> entities = topologyRequestDAO.findAll();
+
+    Map<String, ClusterTopology> topologyRequests = new HashMap<String, ClusterTopology>();
+    for (TopologyRequestEntity entity : entities) {
+      TopologyRequest replayedRequest = new ReplayedTopologyRequest(entity);
+      ClusterTopology clusterTopology = topologyRequests.get(replayedRequest.getClusterName());
+      if (clusterTopology == null) {
+        try {
+          clusterTopology = new ClusterTopologyImpl(ambariContext, replayedRequest);
+          topologyRequests.put(replayedRequest.getClusterName(), clusterTopology);
+          allRequests.put(clusterTopology, new ArrayList<LogicalRequest>());
+        } catch (InvalidTopologyException e) {
+          throw new RuntimeException("Failed to construct cluster topology while replaying request: " + e, e);
+        }
+      } else {
+        // ensure all host groups are provided in the combined cluster topology
+        for (Map.Entry<String, HostGroupInfo> groupInfoEntry : replayedRequest.getHostGroupInfo().entrySet()) {
+          String name = groupInfoEntry.getKey();
+          if (! clusterTopology.getHostGroupInfo().containsKey(name)) {
+            clusterTopology.getHostGroupInfo().put(name, groupInfoEntry.getValue());
+          }
+        }
+      }
+
+      TopologyLogicalRequestEntity logicalRequestEntity = entity.getTopologyLogicalRequestEntity();
+      Long logicalId = logicalRequestEntity.getId();
+
+      try {
+        //todo: fix initialization of ActionManager.requestCounter to account for logical requests
+        //todo: until this is fixed, increment the counter for every recovered logical request
+        //todo: this will cause gaps in the request id's after recovery
+        ambariContext.getNextRequestId();
+        allRequests.get(clusterTopology).add(logicalRequestFactory.createRequest(
+            logicalId, replayedRequest, clusterTopology, logicalRequestEntity));
+      } catch (AmbariException e) {
+        throw new RuntimeException("Failed to construct logical request during replay: " + e, e);
+      }
+    }
+
+    return allRequests;
+  }
+
+  private TopologyRequestEntity toEntity(TopologyRequest request) {
+    TopologyRequestEntity entity = new TopologyRequestEntity();
+
+    //todo: this isn't set for a scaling operation because we had intended to allow multiple
+    //todo: bp's to be used to scale a cluster although this isn't currently supported by
+    //todo: new topology infrastructure
+    entity.setAction(request.getType().name());
+    if (request.getBlueprint() != null) {
+      entity.setBlueprintName(request.getBlueprint().getName());
+    }
+
+    entity.setClusterAttributes(attributesAsString(request.getConfiguration().getAttributes()));
+    entity.setClusterName(request.getClusterName());
+    entity.setClusterProperties(propertiesAsString(request.getConfiguration().getProperties()));
+    entity.setDescription(request.getCommandDescription());
+
+    // host groups
+    Collection<TopologyHostGroupEntity> hostGroupEntities = new ArrayList<TopologyHostGroupEntity>();
+    for (HostGroupInfo groupInfo : request.getHostGroupInfo().values())  {
+      hostGroupEntities.add(toEntity(groupInfo, entity));
+    }
+    entity.setTopologyHostGroupEntities(hostGroupEntities);
+
+    return entity;
+  }
+
+  private TopologyLogicalRequestEntity toEntity(LogicalRequest request, TopologyRequestEntity topologyRequestEntity) {
+    TopologyLogicalRequestEntity entity = new TopologyLogicalRequestEntity();
+
+    entity.setDescription(request.getRequestContext());
+    entity.setId(request.getRequestId());
+    entity.setTopologyRequestEntity(topologyRequestEntity);
+    entity.setTopologyRequestId(topologyRequestEntity.getId());
+
+    // host requests
+    Collection<TopologyHostRequestEntity> hostRequests = new ArrayList<TopologyHostRequestEntity>();
+    entity.setTopologyHostRequestEntities(hostRequests);
+    for (HostRequest hostRequest : request.getHostRequests()) {
+      hostRequests.add(toEntity(hostRequest, entity));
+    }
+    return entity;
+  }
+
+  private TopologyHostRequestEntity toEntity(HostRequest request, TopologyLogicalRequestEntity logicalRequestEntity) {
+    TopologyHostRequestEntity entity = new TopologyHostRequestEntity();
+    entity.setHostName(request.getHostName());
+    entity.setId(request.getId());
+    entity.setStageId(request.getStageId());
+
+    entity.setTopologyLogicalRequestEntity(logicalRequestEntity);
+    entity.setTopologyHostGroupEntity(hostGroupDAO.findByRequestIdAndName(
+        logicalRequestEntity.getTopologyRequestId(), request.getHostgroupName()));
+
+    // logical tasks
+    Collection<TopologyHostTaskEntity> hostRequestTaskEntities = new ArrayList<TopologyHostTaskEntity>();
+    entity.setTopologyHostTaskEntities(hostRequestTaskEntities);
+    // for now only worry about install and start tasks
+    for (TopologyTask task : request.getTopologyTasks()) {
+      if (task.getType() == TopologyTask.Type.INSTALL || task.getType() == TopologyTask.Type.START) {
+        TopologyHostTaskEntity topologyTaskEntity = new TopologyHostTaskEntity();
+        hostRequestTaskEntities.add(topologyTaskEntity);
+        topologyTaskEntity.setType(task.getType().name());
+        topologyTaskEntity.setTopologyHostRequestEntity(entity);
+        Collection<TopologyLogicalTaskEntity> logicalTaskEntities = new ArrayList<TopologyLogicalTaskEntity>();
+        topologyTaskEntity.setTopologyLogicalTaskEntities(logicalTaskEntities);
+        for (Long logicalTaskId : request.getLogicalTasksForTopologyTask(task).values()) {
+          TopologyLogicalTaskEntity logicalTaskEntity = new TopologyLogicalTaskEntity();
+          logicalTaskEntities.add(logicalTaskEntity);
+          HostRoleCommand logicalTask = request.getLogicalTask(logicalTaskId);
+          logicalTaskEntity.setId(logicalTaskId);
+          logicalTaskEntity.setComponentName(logicalTask.getRole().name());
+          logicalTaskEntity.setTopologyHostTaskEntity(topologyTaskEntity);
+          Long physicalId = request.getPhysicalTaskId(logicalTaskId);
+          if (physicalId != null) {
+            logicalTaskEntity.setHostRoleCommandEntity(physicalTaskDAO.findByPK(physicalId));
+          }
+          logicalTaskEntity.setTopologyHostTaskEntity(topologyTaskEntity);
+        }
+      }
+    }
+    return entity;
+  }
+
+  private TopologyHostGroupEntity toEntity(HostGroupInfo groupInfo, TopologyRequestEntity topologyRequestEntity) {
+    TopologyHostGroupEntity entity = new TopologyHostGroupEntity();
+    entity.setGroupAttributes(attributesAsString(groupInfo.getConfiguration().getAttributes()));
+    entity.setGroupProperties(propertiesAsString(groupInfo.getConfiguration().getProperties()));
+    entity.setName(groupInfo.getHostGroupName());
+    entity.setTopologyRequestEntity(topologyRequestEntity);
+
+    // host info
+    Collection<TopologyHostInfoEntity> hostInfoEntities = new ArrayList<TopologyHostInfoEntity>();
+    entity.setTopologyHostInfoEntities(hostInfoEntities);
+
+    Collection<String> hosts = groupInfo.getHostNames();
+    if (hosts.isEmpty()) {
+      TopologyHostInfoEntity hostInfoEntity = new TopologyHostInfoEntity();
+      hostInfoEntity.setTopologyHostGroupEntity(entity);
+      hostInfoEntity.setHostCount(groupInfo.getRequestedHostCount());
+      if (groupInfo.getPredicate() != null) {
+        hostInfoEntity.setPredicate(groupInfo.getPredicateString());
+      }
+      hostInfoEntities.add(hostInfoEntity);
+    } else {
+      for (String hostName : hosts) {
+        TopologyHostInfoEntity hostInfoEntity = new TopologyHostInfoEntity();
+        hostInfoEntity.setTopologyHostGroupEntity(entity);
+        if (groupInfo.getPredicate() != null) {
+          hostInfoEntity.setPredicate(groupInfo.getPredicateString());
+        }
+        hostInfoEntity.setFqdn(hostName);
+        hostInfoEntity.setHostCount(0);
+        hostInfoEntities.add(hostInfoEntity);
+      }
+    }
+    return entity;
+  }
+
+
+  private static String propertiesAsString(Map<String, Map<String, String>> configurationProperties) {
+    return jsonSerializer.toJson(configurationProperties);
+  }
+
+  private static String attributesAsString(Map<String, Map<String, Map<String, String>>> configurationAttributes) {
+    return jsonSerializer.toJson(configurationAttributes);
+  }
+
+  private static class ReplayedTopologyRequest implements TopologyRequest {
+    private final String clusterName;
+    private final Type type;
+    private final String description;
+    private final Blueprint blueprint;
+    private final Configuration configuration;
+    private final Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<String, HostGroupInfo>();
+
+    public ReplayedTopologyRequest(TopologyRequestEntity entity) {
+      clusterName = entity.getClusterName();
+      type = Type.valueOf(entity.getAction());
+      description = entity.getDescription();
+
+      try {
+        blueprint = blueprintFactory.getBlueprint(entity.getBlueprintName());
+      } catch (NoSuchStackException e) {
+        throw new RuntimeException("Unable to load blueprint while replaying topology request: " + e, e);
+      }
+      configuration = createConfiguration(entity.getClusterProperties(), entity.getClusterAttributes());
+      configuration.setParentConfiguration(blueprint.getConfiguration());
+
+      parseHostGroupInfo(entity);
+    }
+
+    @Override
+    public String getClusterName() {
+      return clusterName;
+    }
+
+    @Override
+    public Type getType() {
+      return type;
+    }
+
+    @Override
+    public Blueprint getBlueprint() {
+      return blueprint;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    @Override
+    public Map<String, HostGroupInfo> getHostGroupInfo() {
+      return hostGroupInfoMap;
+    }
+
+    @Override
+    public List<TopologyValidator> getTopologyValidators() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public String getCommandDescription() {
+      return description;
+    }
+
+    private Configuration createConfiguration(String propString, String attributeString) {
+      Map<String, Map<String, String>> properties = jsonSerializer.
+          <Map<String, Map<String, String>>>fromJson(propString, Map.class);
+
+      Map<String, Map<String, Map<String, String>>> attributes = jsonSerializer.
+          <Map<String, Map<String, Map<String, String>>>>fromJson(attributeString, Map.class);
+
+      //todo: config parent
+      return new Configuration(properties, attributes);
+    }
+
+    private void parseHostGroupInfo(TopologyRequestEntity entity) {
+      for (TopologyHostGroupEntity hostGroupEntity : entity.getTopologyHostGroupEntities()) {
+        for (TopologyHostInfoEntity hostInfoEntity : hostGroupEntity.getTopologyHostInfoEntities()) {
+          String groupName = hostGroupEntity.getName();
+          HostGroupInfo groupInfo = hostGroupInfoMap.get(groupName);
+          if (groupInfo == null) {
+            groupInfo = new HostGroupInfo(groupName);
+            hostGroupInfoMap.put(groupName, groupInfo);
+          }
+
+          // if host names are specified, there will be one group info entity per name
+          // otherwise there is a single entity with requested count and predicate
+          String hostname = hostInfoEntity.getFqdn();
+          if (hostname != null && ! hostname.isEmpty()) {
+            groupInfo.addHost(hostname);
+          } else {
+            // should not be more than one group info if host count is specified
+            groupInfo.setRequestedCount(hostInfoEntity.getHostCount());
+            String hostPredicate = hostInfoEntity.getPredicate();
+            if (hostPredicate != null) {
+              try {
+                groupInfo.setPredicate(hostPredicate);
+              } catch (InvalidQueryException e) {
+                // log error but proceed with now predicate set
+                LOG.error(String.format(
+                    "Failed to compile predicate '%s' during request replay: %s", hostPredicate, e), e);
+              }
+            }
+          }
+
+          String groupConfigProperties = hostGroupEntity.getGroupProperties();
+          String groupConfigAttributes = hostGroupEntity.getGroupAttributes();
+          groupInfo.setConfiguration(createConfiguration(groupConfigProperties, groupConfigAttributes));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/807b3c2d/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java
new file mode 100644
index 0000000..184d9d2
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedTopologyRequest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+/**
+ * Wrapper around a TopologyRequest which adds an id that can be used
+ * to refer to the persisted entity.
+ */
+public class PersistedTopologyRequest {
+  private final long id;
+  private final TopologyRequest request;
+
+  public PersistedTopologyRequest(long id, TopologyRequest request) {
+    this.id = id;
+    this.request = request;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public TopologyRequest getRequest() {
+    return request;
+  }
+}