You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2017/06/19 07:43:59 UTC

[2/3] ambari git commit: AMBARI-21240. Some topology request data retained after host removed

AMBARI-21240. Some topology request data retained after host removed


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f8b32404
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f8b32404
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f8b32404

Branch: refs/heads/trunk
Commit: f8b32404003e7af2ab384e8c49b2821e0ce73b81
Parents: e890850
Author: Attila Doroszlai <ad...@hortonworks.com>
Authored: Tue Jun 13 16:41:28 2017 +0200
Committer: Attila Doroszlai <ad...@hortonworks.com>
Committed: Mon Jun 19 09:37:11 2017 +0200

----------------------------------------------------------------------
 .../server/state/cluster/ClustersImpl.java      | 25 +-----
 .../ambari/server/topology/HostRequest.java     | 12 +--
 .../ambari/server/topology/LogicalRequest.java  | 33 ++++----
 .../ambari/server/topology/PersistedState.java  |  7 +-
 .../server/topology/PersistedStateImpl.java     | 18 ++++-
 .../ambari/server/topology/TopologyManager.java | 30 ++++++-
 .../server/state/cluster/ClustersTest.java      | 13 +++-
 .../server/topology/LogicalRequestTest.java     | 82 +++++++++++++++++++-
 8 files changed, 170 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index 595ce4a..26efc84 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -52,9 +52,7 @@ import org.apache.ambari.server.orm.dao.ResourceTypeDAO;
 import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
 import org.apache.ambari.server.orm.dao.StackDAO;
 import org.apache.ambari.server.orm.dao.TopologyHostInfoDAO;
-import org.apache.ambari.server.orm.dao.TopologyHostRequestDAO;
 import org.apache.ambari.server.orm.dao.TopologyLogicalTaskDAO;
-import org.apache.ambari.server.orm.dao.TopologyRequestDAO;
 import org.apache.ambari.server.orm.entities.ClusterEntity;
 import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
@@ -63,10 +61,7 @@ import org.apache.ambari.server.orm.entities.PrivilegeEntity;
 import org.apache.ambari.server.orm.entities.ResourceEntity;
 import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
-import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
-import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
-import org.apache.ambari.server.orm.entities.TopologyRequestEntity;
 import org.apache.ambari.server.security.SecurityHelper;
 import org.apache.ambari.server.security.authorization.AmbariGrantedAuthority;
 import org.apache.ambari.server.security.authorization.ResourceType;
@@ -81,6 +76,7 @@ import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.host.HostFactory;
+import org.apache.ambari.server.topology.TopologyManager;
 import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,11 +126,9 @@ public class ClustersImpl implements Clusters {
   @Inject
   private TopologyLogicalTaskDAO topologyLogicalTaskDAO;
   @Inject
-  private TopologyHostRequestDAO topologyHostRequestDAO;
-  @Inject
-  private TopologyRequestDAO topologyRequestDAO;
-  @Inject
   private TopologyHostInfoDAO topologyHostInfoDAO;
+  @Inject
+  private TopologyManager topologyManager;
 
   /**
    * Data access object for stacks.
@@ -742,18 +736,7 @@ public class ClustersImpl implements Clusters {
       }
     }
 
-    for (Long clusterId : clusterIds) {
-      for (TopologyRequestEntity topologyRequestEntity : topologyRequestDAO.findByClusterId(
-          clusterId)) {
-        TopologyLogicalRequestEntity topologyLogicalRequestEntity = topologyRequestEntity.getTopologyLogicalRequestEntity();
-
-        for (TopologyHostRequestEntity topologyHostRequestEntity : topologyLogicalRequestEntity.getTopologyHostRequestEntities()) {
-          if (hostname.equals(topologyHostRequestEntity.getHostName())) {
-            topologyHostRequestDAO.remove(topologyHostRequestEntity);
-          }
-        }
-      }
-    }
+    topologyManager.removeHostRequests(hostname);
 
     entity.setHostStateEntity(null);
     hostStateDAO.removeByHostId(entity.getHostId());

http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
index 54420a4..2c4e25e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
@@ -40,7 +40,7 @@ import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
 import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
 import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity;
-import org.apache.ambari.server.state.host.HostImpl;
+import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.topology.tasks.InstallHostTask;
 import org.apache.ambari.server.topology.tasks.PersistHostResourcesTask;
 import org.apache.ambari.server.topology.tasks.RegisterWithConfigGroupTask;
@@ -135,7 +135,7 @@ public class HostRequest implements Comparable<HostRequest> {
   }
 
   //todo: synchronization
-  public synchronized HostOfferResponse offer(HostImpl host) {
+  public synchronized HostOfferResponse offer(Host host) {
     if (!isOutstanding) {
       return HostOfferResponse.DECLINED_DUE_TO_DONE;
     }
@@ -301,7 +301,7 @@ public class HostRequest implements Comparable<HostRequest> {
         AmbariContext.TaskType.START;
   }
 
-  private void setHostOnTasks(HostImpl host) {
+  private void setHostOnTasks(Host host) {
     for (HostRoleCommand task : getLogicalTasks()) {
       task.setHost(host.getHostId(), host.getHostName());
     }
@@ -393,7 +393,7 @@ public class HostRequest implements Comparable<HostRequest> {
     return containsMaster;
   }
 
-  public boolean matchesHost(HostImpl host) {
+  public boolean matchesHost(Host host) {
     return (hostname != null) ?
         host.getHostName().equals(hostname) :
         predicate == null || predicate.evaluate(new HostResourceAdapter(host));
@@ -456,7 +456,7 @@ public class HostRequest implements Comparable<HostRequest> {
   private class HostResourceAdapter implements Resource {
     Resource hostResource;
 
-    public HostResourceAdapter(HostImpl host) {
+    public HostResourceAdapter(Host host) {
       buildPropertyMap(host);
     }
 
@@ -485,7 +485,7 @@ public class HostRequest implements Comparable<HostRequest> {
       // read only, nothing to do
     }
 
-    private void buildPropertyMap(HostImpl host) {
+    private void buildPropertyMap(Host host) {
       hostResource = new ResourceImpl(Resource.Type.Host);
 
       hostResource.setProperty(HostResourceProvider.HOST_HOST_NAME_PROPERTY_ID,

http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index 4638dbf..2f08115 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -44,7 +44,7 @@ import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity;
 import org.apache.ambari.server.orm.entities.TopologyHostInfoEntity;
 import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
-import org.apache.ambari.server.state.host.HostImpl;
+import org.apache.ambari.server.state.Host;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,7 +95,7 @@ public class LogicalRequest extends Request {
     createHostRequests(topology, requestEntity);
   }
 
-  public HostOfferResponse offer(HostImpl host) {
+  public HostOfferResponse offer(Host host) {
     // attempt to match to a host request with an explicit host reservation first
     synchronized (requestsWithReservedHosts) {
       LOG.info("LogicalRequest.offer: attempting to match a request to a request for a reserved host to hostname = {}", host.getHostName());
@@ -197,7 +197,11 @@ public class LogicalRequest extends Request {
         pendingHostRequests.add(hostRequest);
       }
     }
-    outstandingHostRequests.clear();
+    if (hostGroupName == null) {
+      outstandingHostRequests.clear();
+    } else {
+      outstandingHostRequests.removeAll(pendingHostRequests);
+    }
 
     Collection<String> pendingReservedHostNames = new ArrayList<>();
     for(String reservedHostName : requestsWithReservedHosts.keySet()) {
@@ -207,9 +211,7 @@ public class LogicalRequest extends Request {
         pendingReservedHostNames.add(reservedHostName);
       }
     }
-    for (String hostName : pendingReservedHostNames) {
-      requestsWithReservedHosts.remove(hostName);
-    }
+    requestsWithReservedHosts.keySet().removeAll(pendingReservedHostNames);
 
     allHostRequests.removeAll(pendingHostRequests);
     return pendingHostRequests;
@@ -371,31 +373,36 @@ public class LogicalRequest extends Request {
    * Removes all HostRequest associated with the passed host name from internal collections
    * @param hostName name of the host
    */
-  public void removeHostRequestByHostName(String hostName) {
+  public Set<HostRequest> removeHostRequestByHostName(String hostName) {
+    Set<HostRequest> removed = new HashSet<>();
     synchronized (requestsWithReservedHosts) {
       synchronized (outstandingHostRequests) {
         requestsWithReservedHosts.remove(hostName);
 
         Iterator<HostRequest> hostRequestIterator = outstandingHostRequests.iterator();
         while (hostRequestIterator.hasNext()) {
-          if (Objects.equals(hostRequestIterator.next().getHostName(), hostName)) {
+          HostRequest hostRequest = hostRequestIterator.next();
+          if (Objects.equals(hostRequest.getHostName(), hostName)) {
             hostRequestIterator.remove();
+            removed.add(hostRequest);
             break;
           }
         }
 
         //todo: synchronization
-        Iterator<HostRequest> allHostRequesIterator = allHostRequests.iterator();
-        while (allHostRequesIterator.hasNext()) {
-          if (Objects.equals(allHostRequesIterator.next().getHostName(), hostName)) {
-            allHostRequesIterator.remove();
+        Iterator<HostRequest> allHostRequestIterator = allHostRequests.iterator();
+        while (allHostRequestIterator.hasNext()) {
+          HostRequest hostRequest = allHostRequestIterator.next();
+          if (Objects.equals(hostRequest.getHostName(), hostName)) {
+            allHostRequestIterator.remove();
+            removed.add(hostRequest);
             break;
           }
         }
       }
     }
 
-
+    return removed;
   }
 
   private void createHostRequests(TopologyRequest request, ClusterTopology topology) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
index 9e94134..369bf52 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
@@ -81,8 +81,9 @@ public interface PersistedState {
   LogicalRequest getProvisionRequest(long clusterId);
 
   /**
-   *
-   * @param hostRequests
+   * Remove the given host requests (must belong to the same topology request),
+   * and also the topology request if it does not have any host requests left.
    */
-  void removeHostRequests(Collection<HostRequest> hostRequests);
+  void removeHostRequests(long logicalRequestId, Collection<HostRequest> hostRequests);
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
index 4380778..e64a16e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
@@ -35,6 +35,7 @@ import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.TopologyHostGroupDAO;
 import org.apache.ambari.server.orm.dao.TopologyHostInfoDAO;
 import org.apache.ambari.server.orm.dao.TopologyHostRequestDAO;
+import org.apache.ambari.server.orm.dao.TopologyLogicalRequestDAO;
 import org.apache.ambari.server.orm.dao.TopologyLogicalTaskDAO;
 import org.apache.ambari.server.orm.dao.TopologyRequestDAO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
@@ -80,6 +81,9 @@ public class PersistedStateImpl implements PersistedState {
   private TopologyHostRequestDAO hostRequestDAO;
 
   @Inject
+  private TopologyLogicalRequestDAO topologyLogicalRequestDAO;
+
+  @Inject
   private TopologyLogicalTaskDAO topologyLogicalTaskDAO;
 
   @Inject
@@ -121,11 +125,20 @@ public class PersistedStateImpl implements PersistedState {
 
   @Override
   @Transactional
-  public void removeHostRequests(Collection<HostRequest> hostRequests) {
-    for(HostRequest hostRequest :  hostRequests) {
+  public void removeHostRequests(long logicalRequestId, Collection<HostRequest> hostRequests) {
+    TopologyLogicalRequestEntity logicalRequest = topologyLogicalRequestDAO.findById(logicalRequestId);
+    for (HostRequest hostRequest : hostRequests) {
       TopologyHostRequestEntity hostRequestEntity = hostRequestDAO.findById(hostRequest.getId());
+      if (logicalRequest != null)  {
+        logicalRequest.getTopologyHostRequestEntities().remove(hostRequestEntity);
+      }
       hostRequestDAO.remove(hostRequestEntity);
     }
+    if (logicalRequest != null && logicalRequest.getTopologyHostRequestEntities().isEmpty()) {
+      Long topologyRequestId = logicalRequest.getTopologyRequestId();
+      topologyLogicalRequestDAO.remove(logicalRequest);
+      topologyRequestDAO.removeByPK(topologyRequestId);
+    }
   }
 
   @Override
@@ -350,7 +363,6 @@ public class PersistedStateImpl implements PersistedState {
     return entity;
   }
 
-
   private static String propertiesAsString(Map<String, Map<String, String>> configurationProperties) {
     return jsonSerializer.toJson(configurationProperties);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 73fe33b..302e321 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -526,8 +526,11 @@ public class TopologyManager {
     if (!logicalRequest.hasPendingHostRequests()) {
       outstandingRequests.remove(logicalRequest);
     }
+    if (logicalRequest.getHostRequests().isEmpty()) {
+      allRequests.remove(requestId);
+    }
 
-    persistedState.removeHostRequests(pendingHostRequests);
+    persistedState.removeHostRequests(requestId, pendingHostRequests);
 
     // set current host count to number of currently connected hosts
     for (HostGroupInfo currentHostGroupInfo : topology.getHostGroupInfo().values()) {
@@ -538,6 +541,31 @@ public class TopologyManager {
   }
 
   /**
+   * Removes topology host requests matched to the given host.  If the parent
+   * request has no more child host requests, then it is also removed.
+   * This is used when hosts are deleted from the cluster.
+   *
+   * @param hostName the host name for which requests should be removed
+   */
+  public void removeHostRequests(String hostName) {
+    ensureInitialized();
+
+    for (Iterator<LogicalRequest> iter = allRequests.values().iterator(); iter.hasNext(); ) {
+      LogicalRequest logicalRequest = iter.next();
+      Collection<HostRequest> removed = logicalRequest.removeHostRequestByHostName(hostName);
+      if (!logicalRequest.hasPendingHostRequests()) {
+        outstandingRequests.remove(logicalRequest);
+      }
+      if (logicalRequest.getHostRequests().isEmpty()) {
+        iter.remove();
+      }
+      if (!removed.isEmpty()) {
+        persistedState.removeHostRequests(logicalRequest.getRequestId(), removed);
+      }
+    }
+  }
+
+  /**
    * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided
    * provision cluster request and topology.
    * @param request Provision cluster request to create a logical request for.

http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
index 95e1d70..39f674c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
@@ -75,6 +75,7 @@ import org.apache.ambari.server.topology.HostGroupInfo;
 import org.apache.ambari.server.topology.HostRequest;
 import org.apache.ambari.server.topology.LogicalRequest;
 import org.apache.ambari.server.topology.PersistedState;
+import org.apache.ambari.server.topology.TopologyManager;
 import org.apache.ambari.server.topology.TopologyRequest;
 import org.apache.ambari.server.topology.tasks.TopologyTask;
 import org.apache.ambari.server.utils.EventBusSynchronizer;
@@ -83,9 +84,12 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
+import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
 
 import junit.framework.Assert;
 
@@ -107,7 +111,7 @@ public class ClustersTest {
 
   @Before
   public void setup() throws Exception {
-    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector = Guice.createInjector(Modules.override(new InMemoryDefaultTestModule()).with(new MockModule()));
     injector.getInstance(GuiceJpaInitializer.class);
     clusters = injector.getInstance(Clusters.class);
     injector.injectMembers(this);
@@ -637,4 +641,11 @@ public class ClustersTest {
 
     return clusters.getCluster(clusterName);
   }
+
+  private static class MockModule implements Module {
+    @Override
+    public void configure(Binder binder) {
+      binder.bind(TopologyManager.class).toInstance(createNiceMock(TopologyManager.class));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f8b32404/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
index 3e8ea0c..8aac927 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
@@ -25,6 +25,8 @@ import static org.powermock.api.easymock.PowerMock.mockStatic;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.annotation.Nullable;
 
@@ -521,8 +523,84 @@ public class LogicalRequestTest extends EasyMockSupport {
     // Then
     verifyAll();
 
-    Collection<HostRequest>  hostRequests = req.getHostRequests();
-    assertEquals(1, hostRequests.size());
+    assertEquals(1, req.getHostRequests().size());
+    assertEquals(0, req.getPendingHostRequestCount());
+  }
+
+  @Test
+  public void testRemovePendingHostRequestsByHostCount() throws Exception {
+    // Given
+    int hostCount = 3;
+    LogicalRequest req = createTopologyRequestByHostCount(hostCount, "host_group");
+    assertEquals(hostCount, req.getPendingHostRequestCount());
+
+    // When
+    req.removePendingHostRequests(null);
+
+    // Then
+    assertEquals(0, req.getPendingHostRequestCount());
+    verifyAll();
+  }
+
+  @Test
+  public void testRemovePendingHostRequestsOfSpecificHostGroupByHostCount() throws Exception {
+    // Given
+    int hostCount = 3;
+    String hostGroupName = "host_group";
+    LogicalRequest req = createTopologyRequestByHostCount(hostCount, hostGroupName);
+    assertEquals(hostCount, req.getPendingHostRequestCount());
+
+    // When
+    req.removePendingHostRequests(hostGroupName);
+
+    // Then
+    assertEquals(0, req.getPendingHostRequestCount());
+    verifyAll();
+  }
+
+  @Test
+  public void testRemovePendingHostRequestsOfNonexistentHostGroupByHostCount() throws Exception {
+    // Given
+    int hostCount = 3;
+    LogicalRequest req = createTopologyRequestByHostCount(hostCount, "host_group");
+    assertEquals(hostCount, req.getPendingHostRequestCount());
+
+    // When
+    req.removePendingHostRequests("no_such_host_group");
+
+    // Then
+    assertEquals(hostCount, req.getPendingHostRequestCount());
+    verifyAll();
+  }
+
+  private LogicalRequest createTopologyRequestByHostCount(int hostCount, String hostGroupName) throws Exception {
+    final TopologyHostInfoEntity hostInfo = new TopologyHostInfoEntity();
+    hostInfo.setId(100L);
+    hostInfo.setHostCount(hostCount);
+
+    TopologyHostGroupEntity hostGroupEntity = new TopologyHostGroupEntity();
+    hostGroupEntity.setTopologyHostInfoEntities(ImmutableSet.of(hostInfo));
+    hostGroupEntity.setName(hostGroupName);
+
+    TopologyRequestEntity topologyRequestEntity = new TopologyRequestEntity();
+    topologyRequestEntity.setTopologyHostGroupEntities(Collections.singleton(hostGroupEntity));
+
+    Set<TopologyHostRequestEntity> hostRequests = new HashSet<>();
+    for (long i = 0; i < hostCount; ++i) {
+      TopologyHostRequestEntity hostRequestEntity = new TopologyHostRequestEntity();
+      hostRequestEntity.setId(i);
+      hostRequestEntity.setTopologyHostGroupEntity(hostGroupEntity);
+      hostRequestEntity.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+      hostRequests.add(hostRequestEntity);
+    }
+
+    expect(logicalRequestEntity.getTopologyRequestEntity()).andReturn(topologyRequestEntity).anyTimes();
+    expect(logicalRequestEntity.getTopologyHostRequestEntities()).andReturn(hostRequests).anyTimes();
+    expect(blueprint.getHostGroup(eq(hostGroupEntity.getName()))).andReturn(hostGroup1).anyTimes();
+    expect(hostGroup1.containsMasterComponent()).andReturn(false).anyTimes();
+
+    replayAll();
 
+    return new LogicalRequest(1L, replayedTopologyRequest, clusterTopology, logicalRequestEntity);
   }
 }