You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2017/04/28 21:21:39 UTC

ambari git commit: AMBARI-20809. Support hostgroup downscale by host_count (magyari_sandor)

Repository: ambari
Updated Branches:
  refs/heads/trunk 19f9c983e -> 79cca1c71


AMBARI-20809. Support hostgroup downscale by host_count (magyari_sandor)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/79cca1c7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/79cca1c7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/79cca1c7

Branch: refs/heads/trunk
Commit: 79cca1c7184f1661236971dac70d85a83fab6c11
Parents: 19f9c98
Author: Sandor Magyari <sm...@hortonworks.com>
Authored: Fri Apr 28 14:42:55 2017 +0200
Committer: Sandor Magyari <sm...@hortonworks.com>
Committed: Fri Apr 28 23:20:35 2017 +0200

----------------------------------------------------------------------
 .../checks/DatabaseConsistencyCheckHelper.java  | 70 ++++++++++--------
 .../server/controller/RequestRequest.java       | 11 +++
 .../internal/RequestResourceProvider.java       | 77 ++++++++++++++------
 .../ambari/server/topology/LogicalRequest.java  | 38 +++++++++-
 .../ambari/server/topology/PersistedState.java  |  7 ++
 .../server/topology/PersistedStateImpl.java     | 10 +++
 .../ambari/server/topology/TopologyManager.java | 56 +++++++++++---
 .../src/main/resources/properties.json          |  2 +
 .../DatabaseConsistencyCheckHelperTest.java     |  7 +-
 .../internal/RequestResourceProviderTest.java   | 33 ++++++---
 .../server/topology/LogicalRequestTest.java     | 64 ++++++++++++++++
 .../server/topology/TopologyManagerTest.java    |  9 ++-
 12 files changed, 308 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index b2a03e4..0b7f122 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -455,14 +455,14 @@ public class DatabaseConsistencyCheckHelper {
     String SELECT_REQUEST_COUNT_QUERY = "select count(tpr.id) from topology_request tpr";
 
     String SELECT_JOINED_COUNT_QUERY = "select count(DISTINCT tpr.id) from topology_request tpr join " +
-      "topology_logical_request tlr on tpr.id = tlr.request_id join topology_host_request thr on tlr.id = " +
-      "thr.logical_request_id join topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " +
-      "tlt on tht.id = tlt.host_task_id";
+      "topology_logical_request tlr on tpr.id = tlr.request_id";
 
-    int topologyRequestCount = 0;
-    int topologyRequestTablesJoinedCount = 0;
+    String SELECT_HOST_REQUEST_COUNT_QUERY = "select count(thr.id) from topology_host_request thr";
+
+    String SELECT_HOST_JOINED_COUNT_QUERY = "select count(DISTINCT thr.id) from topology_host_request thr join " +
+            "topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " +
+            "tlt on tht.id = tlt.host_task_id";
 
-    ResultSet rs = null;
     Statement statement = null;
 
     if (connection == null) {
@@ -475,38 +475,25 @@ public class DatabaseConsistencyCheckHelper {
     try {
       statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
 
-      rs = statement.executeQuery(SELECT_REQUEST_COUNT_QUERY);
-      if (rs != null) {
-        while (rs.next()) {
-          topologyRequestCount = rs.getInt(1);
-        }
-      }
-
-      rs = statement.executeQuery(SELECT_JOINED_COUNT_QUERY);
-      if (rs != null) {
-        while (rs.next()) {
-          topologyRequestTablesJoinedCount = rs.getInt(1);
-        }
-      }
+      int topologyRequestCount = runQuery(statement, SELECT_REQUEST_COUNT_QUERY);
+      int topologyRequestTablesJoinedCount = runQuery(statement, SELECT_JOINED_COUNT_QUERY);
 
       if (topologyRequestCount != topologyRequestTablesJoinedCount) {
         error("Your topology request hierarchy is not complete for each row in topology_request should exist " +
-          "at least one raw in topology_logical_request, topology_host_request, topology_host_task, " +
-          "topology_logical_task.");
+          "at least one row in topology_logical_request");
       }
 
+      int topologyHostRequestCount = runQuery(statement, SELECT_HOST_REQUEST_COUNT_QUERY);
+      int topologyHostRequestTablesJoinedCount = runQuery(statement, SELECT_HOST_JOINED_COUNT_QUERY);
+
+      if (topologyHostRequestCount != topologyHostRequestTablesJoinedCount) {
+        error("Your topology request hierarchy is not complete for each row in topology_host_request should exist " +
+                "at least one row in topology_host_task, topology_logical_task.");
+      }
 
     } catch (SQLException e) {
       LOG.error("Exception occurred during topology request tables check: ", e);
     } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          LOG.error("Exception occurred during result set closing procedure: ", e);
-        }
-      }
-
       if (statement != null) {
         try {
           statement.close();
@@ -518,6 +505,31 @@ public class DatabaseConsistencyCheckHelper {
 
   }
 
+  private static int runQuery(Statement statement, String query) {
+    ResultSet rs = null;
+    int result = 0;
+    try {
+      rs = statement.executeQuery(query);
+      // keep the first column of the last row returned; result stays 0 when there are no rows
+      if (rs != null) {
+        while (rs.next()) {
+          result = rs.getInt(1);
+        }
+      }
+      // NOTE(review): a failing query is only logged below and yields 0, so two failed counts compare as equal
+    } catch (SQLException e) {
+      LOG.error("Exception occurred during topology request tables check: ", e);
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception occurred during result set closing procedure: ", e);
+        }
+      }
+    }
+    return result;
+  }
 
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
index db9268b..05c4bad 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
@@ -37,6 +37,8 @@ public class RequestRequest {
 
   private String abortReason;
 
+  private boolean removePendingHostRequests = false;
+
 
   public HostRoleStatus getStatus() {
     return status;
@@ -70,6 +72,14 @@ public class RequestRequest {
     this.abortReason = abortReason;
   }
 
+  public boolean isRemovePendingHostRequests() {
+    return removePendingHostRequests;
+  }
+
+  public void setRemovePendingHostRequests(boolean removePendingHostRequests) {
+    this.removePendingHostRequests = removePendingHostRequests;
+  }
+
   @Override
   public String toString() {
     return "RequestRequest{" +
@@ -77,6 +87,7 @@ public class RequestRequest {
             ", requestId=" + requestId +
             ", status=" + status +
             ", abortReason='" + abortReason + '\'' +
+            ", removePendingHostRequests='" + removePendingHostRequests + '\'' +
             '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
index 9704b33..57e7024 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
@@ -115,6 +115,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
   protected static final String REQUEST_COMPLETED_TASK_CNT_ID = "Requests/completed_task_count";
   protected static final String REQUEST_QUEUED_TASK_CNT_ID = "Requests/queued_task_count";
   protected static final String REQUEST_PROGRESS_PERCENT_ID = "Requests/progress_percent";
+  protected static final String REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID = "Requests/remove_pending_host_requests";
+  protected static final String REQUEST_PENDING_HOST_REQUEST_COUNT_ID = "Requests/pending_host_request_count";
+
   protected static final String COMMAND_ID = "command";
   protected static final String SERVICE_ID = "service_name";
   protected static final String COMPONENT_ID = "component_name";
@@ -152,7 +155,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
     REQUEST_TIMED_OUT_TASK_CNT_ID,
     REQUEST_COMPLETED_TASK_CNT_ID,
     REQUEST_QUEUED_TASK_CNT_ID,
-    REQUEST_PROGRESS_PERCENT_ID);
+    REQUEST_PROGRESS_PERCENT_ID,
+    REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID,
+    REQUEST_PENDING_HOST_REQUEST_COUNT_ID);
 
   // ----- Constructors ----------------------------------------------------
 
@@ -297,6 +302,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
   public RequestStatus updateResources(Request requestInfo, Predicate predicate)
           throws SystemException, UnsupportedPropertyException,
           NoSuchResourceException, NoSuchParentResourceException {
+
     AmbariManagementController amc = getManagementController();
     final Set<RequestRequest> requests = new HashSet<>();
 
@@ -319,33 +325,48 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
       }
       // There should be only one request with this id (or no request at all)
       org.apache.ambari.server.actionmanager.Request internalRequest = internalRequests.get(0);
-      // Validate update request (check constraints on state value and presence of abort reason)
-      if (updateRequest.getAbortReason() == null || updateRequest.getAbortReason().isEmpty()) {
-        throw new IllegalArgumentException("Abort reason can not be empty.");
-      }
 
-      if (updateRequest.getStatus() != HostRoleStatus.ABORTED) {
-        throw new IllegalArgumentException(
-                String.format("%s is wrong value. The only allowed value " +
-                                "for updating request status is ABORTED",
-                        updateRequest.getStatus()));
-      }
+      if (updateRequest.isRemovePendingHostRequests()) {
+        if (internalRequest instanceof LogicalRequest) {
+          targets.add(internalRequest);
+        } else {
+          throw new IllegalArgumentException("Request with id: " + internalRequest.getRequestId() + "is not a Logical Request.");
+        }
+      } else {
+        // Validate update request (check constraints on state value and presence of abort reason)
+        if (updateRequest.getAbortReason() == null || updateRequest.getAbortReason().isEmpty()) {
+          throw new IllegalArgumentException("Abort reason can not be empty.");
+        }
+
+        if (updateRequest.getStatus() != HostRoleStatus.ABORTED) {
+          throw new IllegalArgumentException(
+                  String.format("%s is wrong value. The only allowed value " +
+                                  "for updating request status is ABORTED",
+                          updateRequest.getStatus()));
+        }
 
-      HostRoleStatus internalRequestStatus =
-          CalculatedStatus.statusFromStages(internalRequest.getStages()).getStatus();
+        HostRoleStatus internalRequestStatus =
+                CalculatedStatus.statusFromStages(internalRequest.getStages()).getStatus();
 
-      if (internalRequestStatus.isCompletedState()) {
-        // Ignore updates to completed requests to avoid throwing exception on race condition
-      } else {
-        // Validation passed
-        targets.add(internalRequest);
+        if (internalRequestStatus.isCompletedState()) {
+          // Ignore updates to completed requests to avoid throwing exception on race condition
+        } else {
+          // Validation passed
+          targets.add(internalRequest);
+        }
       }
+
     }
+
     // Perform update
     Iterator<RequestRequest> reqIterator = requests.iterator();
     for (org.apache.ambari.server.actionmanager.Request target : targets) {
-      String reason = reqIterator.next().getAbortReason();
-      amc.getActionManager().cancelRequest(target.getRequestId(), reason);
+      if (target instanceof LogicalRequest) {
+        topologyManager.removePendingHostRequests(target.getClusterName(), target.getRequestId());
+      } else {
+        String reason = reqIterator.next().getAbortReason();
+        amc.getActionManager().cancelRequest(target.getRequestId(), reason);
+      }
     }
     return getRequestStatus(null);
   }
@@ -363,9 +384,15 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
       requestStatus = HostRoleStatus.valueOf(requestStatusStr);
     }
     String abortReason = (String) propertyMap.get(REQUEST_ABORT_REASON_PROPERTY_ID);
+    String removePendingHostRequests = (String) propertyMap.get(REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID);
+
     RequestRequest requestRequest = new RequestRequest(clusterNameStr, requestId);
     requestRequest.setStatus(requestStatus);
     requestRequest.setAbortReason(abortReason);
+    if (removePendingHostRequests != null) {
+      requestRequest.setRemovePendingHostRequests(Boolean.valueOf(removePendingHostRequests));
+    }
+
     return requestRequest;
 
   }
@@ -753,13 +780,21 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
       // in this case, it appears that there are no tasks but this is a logical
       // topology request, so it's a matter of hosts simply not registering yet
       // for tasks to be created
-      status = CalculatedStatus.PENDING;
+      if (logicalRequest.hasPendingHostRequests()) {
+        status = CalculatedStatus.PENDING;
+      } else {
+        status = CalculatedStatus.COMPLETED;
+      }
     } else {
       // there are either tasks or this is not a logical request, so do normal
       // status calculations
       status = CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
     }
 
+    if (null != logicalRequest) {
+      setResourceProperty(resource, REQUEST_PENDING_HOST_REQUEST_COUNT_ID, logicalRequest.getPendingHostRequestCount(), requestedPropertyIds);
+    }
+
     setResourceProperty(resource, REQUEST_STATUS_PROPERTY_ID, status.getStatus().toString(), requestedPropertyIds);
     setResourceProperty(resource, REQUEST_PROGRESS_PERCENT_ID, status.getPercent(), requestedPropertyIds);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index a271c0b..b5ee94b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -164,8 +164,8 @@ public class LogicalRequest extends Request {
     return requestsWithReservedHosts.keySet();
   }
 
-  public boolean hasCompleted() {
-    return requestsWithReservedHosts.isEmpty() && outstandingHostRequests.isEmpty();
+  public boolean hasPendingHostRequests() {
+    return !requestsWithReservedHosts.isEmpty() || !outstandingHostRequests.isEmpty();
   }
 
   public Collection<HostRequest> getCompletedHostRequests() {
@@ -176,11 +176,45 @@ public class LogicalRequest extends Request {
     return completedHostRequests;
   }
 
+  public int getPendingHostRequestCount() {
+    return outstandingHostRequests.size() + requestsWithReservedHosts.size();
+  }
+
   //todo: this is only here for toEntity() functionality
   public Collection<HostRequest> getHostRequests() {
     return new ArrayList<>(allHostRequests);
   }
 
+  /**
+   * Removes the pending host requests (outstanding ones plus those holding a reserved host) of a host group.
+   * @param hostGroupName host group to match; {@code null} matches the pending requests of every host group
+   * @return the host requests that were removed from this logical request
+   */
+  public Collection<HostRequest> removePendingHostRequests(String hostGroupName) {
+    Collection<HostRequest> pendingHostRequests = new ArrayList<>();
+    for(HostRequest hostRequest : outstandingHostRequests) {
+      if(hostGroupName == null || hostRequest.getHostgroupName().equals(hostGroupName)) {
+        pendingHostRequests.add(hostRequest);
+      }
+    }
+    // drop only the matched requests so that other host groups keep their outstanding requests
+    outstandingHostRequests.removeAll(pendingHostRequests);
+    Collection<String> pendingReservedHostNames = new ArrayList<>();
+    for(String reservedHostName : requestsWithReservedHosts.keySet()) {
+      HostRequest hostRequest = requestsWithReservedHosts.get(reservedHostName);
+      if(hostGroupName == null || hostRequest.getHostgroupName().equals(hostGroupName)) {
+        pendingHostRequests.add(hostRequest);
+        pendingReservedHostNames.add(reservedHostName);
+      }
+    }
+    for (String hostName : pendingReservedHostNames) {
+      requestsWithReservedHosts.remove(hostName);
+    }
+
+    allHostRequests.removeAll(pendingHostRequests);
+    return pendingHostRequests;
+  }
+
   public Map<String, Collection<String>> getProjectedTopology() {
     Map<String, Collection<String>> hostComponentMap = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
index 354764b..f353b8c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.topology;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -78,4 +79,10 @@ public interface PersistedState {
    * @return
    */
   LogicalRequest getProvisionRequest(long clusterId);
+
+  /**
+   * Removes the given host requests from the persisted state.
+   * @param hostRequests host requests to remove
+   */
+  void removeHostRequests(Collection<HostRequest> hostRequests);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
index 36eb1bc..a8b202e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.gson.Gson;
 import com.google.inject.Inject;
+import com.google.inject.persist.Transactional;
 
 /**
  * Implementation which uses Ambari Database DAO and Entity objects for persistence
@@ -119,6 +120,15 @@ public class PersistedStateImpl implements PersistedState {
   }
 
   @Override
+  @Transactional
+  public void removeHostRequests(Collection<HostRequest> hostRequests) {
+    for(HostRequest hostRequest :  hostRequests) {
+      TopologyHostRequestEntity hostRequestEntity = hostRequestDAO.findById(hostRequest.getId());
+      hostRequestDAO.remove(hostRequestEntity);
+    }
+  }
+
+  @Override
   public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) {
     TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId);
     HostRoleCommandEntity physicalEntity = hostRoleCommandDAO.findByPK(physicalTaskId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 643945c..f5cf498 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -434,7 +434,7 @@ public class TopologyManager {
 
     Map<String, String> requestInfoProps = new HashMap<>();
     requestInfoProps.put(org.apache.ambari.server.controller.spi.Request.REQUEST_INFO_BODY_PROPERTY,
-      "{\"" + ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY + "\": " + descriptor + "}");
+            "{\"" + ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY + "\": " + descriptor + "}");
 
     org.apache.ambari.server.controller.spi.Request request = new RequestImpl(Collections.<String>emptySet(),
         Collections.singleton(properties), requestInfoProps, null);
@@ -485,20 +485,55 @@ public class TopologyManager {
 
     final Long requestId = ambariContext.getNextRequestId();
     LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
-        @Override
-        public LogicalRequest call() throws Exception {
-          LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId);
+         @Override
+         public LogicalRequest call() throws Exception {
+           LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId);
 
-          return logicalRequest;
-        }
-      }
+           return logicalRequest;
+         }
+       }
     );
-
     processRequest(request, topology, logicalRequest);
-
     return getRequestStatus(logicalRequest.getRequestId());
   }
 
+  public void removePendingHostRequests(String clusterName, long requestId) {
+    ensureInitialized();
+    LOG.info("TopologyManager.removePendingHostRequests: Entering");
+    // drops the pending host requests of the given logical request from memory and from the persisted state
+    long clusterId = 0;
+    try {
+      clusterId = ambariContext.getClusterId(clusterName);
+    } catch (AmbariException e) {
+      LOG.error("Unable to retrieve clusterId", e);
+      throw new IllegalArgumentException("Unable to retrieve clusterId", e);
+    }
+    ClusterTopology topology = clusterTopologyMap.get(clusterId);
+    if (topology == null) {
+      throw new IllegalArgumentException("Unable to retrieve cluster topology for cluster");
+    }
+
+    LogicalRequest logicalRequest = allRequests.get(requestId);
+    if (logicalRequest == null) {
+      throw new IllegalArgumentException("No Logical Request found for requestId: " + requestId);
+    }
+    // a null host group name removes the pending host requests of every host group
+    Collection<HostRequest> pendingHostRequests = logicalRequest.removePendingHostRequests(null);
+
+    if (!logicalRequest.hasPendingHostRequests()) {
+      outstandingRequests.remove(logicalRequest);
+    }
+
+    persistedState.removeHostRequests(pendingHostRequests);
+
+    // set current host count to number of currently connected hosts
+    for (HostGroupInfo currentHostGroupInfo : topology.getHostGroupInfo().values()) {
+      currentHostGroupInfo.setRequestedCount(currentHostGroupInfo.getHostNames().size());
+    }
+
+    LOG.info("TopologyManager.removePendingHostRequests: Exit");
+  }
+
   /**
    * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided
    * provision cluster request and topology.
@@ -937,7 +972,7 @@ public class TopologyManager {
 
       for (LogicalRequest logicalRequest : requestEntry.getValue()) {
         allRequests.put(logicalRequest.getRequestId(), logicalRequest);
-        if (!logicalRequest.hasCompleted()) {
+        if (logicalRequest.hasPendingHostRequests()) {
           outstandingRequests.add(logicalRequest);
           for (String reservedHost : logicalRequest.getReservedHosts()) {
             reservedHosts.put(reservedHost, logicalRequest);
@@ -968,6 +1003,7 @@ public class TopologyManager {
         }
       }
     }
+    LOG.info("TopologyManager.replayRequests: Exit");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/resources/properties.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/properties.json b/ambari-server/src/main/resources/properties.json
index 9b2bbf8..e536d05 100644
--- a/ambari-server/src/main/resources/properties.json
+++ b/ambari-server/src/main/resources/properties.json
@@ -137,6 +137,8 @@
         "Requests/queued_task_count",
         "Requests/progress_percent",
         "Requests/abort_reason",
+        "Requests/remove_pending_host_requests",
+        "Requests/pending_host_request_count",
         "_"
     ],
     "RequestSchedule" : [

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
index d6e12dc..868dea1 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
@@ -187,7 +187,12 @@ public class DatabaseConsistencyCheckHelperTest {
     expect(mockJoinResultSet.getInt(1)).andReturn(resultCount);
     expect(mockStatement.executeQuery("select count(tpr.id) from topology_request tpr")).andReturn(mockCountResultSet);
     expect(mockStatement.executeQuery("select count(DISTINCT tpr.id) from topology_request tpr join " +
-      "topology_logical_request tlr on tpr.id = tlr.request_id join topology_host_request thr on tlr.id = thr.logical_request_id join topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task tlt on tht.id = tlt.host_task_id")).andReturn(mockJoinResultSet);
+            "topology_logical_request tlr on tpr.id = tlr.request_id")).andReturn(mockJoinResultSet);
+
+    expect(mockStatement.executeQuery("select count(thr.id) from topology_host_request thr")).andReturn(mockCountResultSet);
+    expect(mockStatement.executeQuery("select count(DISTINCT thr.id) from topology_host_request thr join " +
+            "topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " +
+            "tlt on tht.id = tlt.host_task_id")).andReturn(mockJoinResultSet);
 
     DatabaseConsistencyCheckHelper.setInjector(mockInjector);
     DatabaseConsistencyCheckHelper.setConnection(mockConnection);

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
index dc9e5ed..feedc74 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
@@ -78,6 +78,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.topology.Blueprint;
 import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostGroup;
 import org.apache.ambari.server.topology.HostGroupInfo;
 import org.apache.ambari.server.topology.LogicalRequest;
 import org.apache.ambari.server.topology.TopologyManager;
@@ -1649,7 +1650,12 @@ public class RequestResourceProviderTest {
 
 
     ClusterTopology topology = createNiceMock(ClusterTopology.class);
+
+    HostGroup hostGroup = createNiceMock(HostGroup.class);
+    expect(hostGroup.getName()).andReturn("host_group_1").anyTimes();
+
     Blueprint blueprint = createNiceMock(Blueprint.class);
+    expect(blueprint.getHostGroup("host_group_1")).andReturn(hostGroup).anyTimes();
     expect(topology.getClusterId()).andReturn(2L).anyTimes();
 
     Long clusterId = 2L;
@@ -1666,8 +1672,13 @@ public class RequestResourceProviderTest {
     expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn(
       Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes();
 
+    Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>();
+    HostGroupInfo hostGroupInfo = new HostGroupInfo("host_group_1");
+    hostGroupInfo.setRequestedCount(1);
+    hostGroupInfoMap.put("host_group_1", hostGroupInfo);
+
     TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class);
-    expect(topologyRequest.getHostGroupInfo()).andReturn(Collections.<String, HostGroupInfo>emptyMap()).anyTimes();
+    expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes();
     expect(topology.getBlueprint()).andReturn(blueprint).anyTimes();
     expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes();
 
@@ -1677,24 +1688,28 @@ public class RequestResourceProviderTest {
     expect(AmbariServer.getController()).andReturn(managementController).anyTimes();
 
     PowerMock.replayAll(
-      topologyRequest,
-      topology,
-      blueprint,
-      managementController,
-      clusters);
+            topologyRequest,
+            topology,
+            blueprint,
+            managementController,
+            clusters);
 
 
-    LogicalRequest logicalRequest = new LogicalRequest(200L, topologyRequest, topology);
+    LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class);
+    expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes();
+    expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes();
 
     reset(topologyManager);
 
     expect(topologyManager.getRequest(100L)).andReturn(logicalRequest).anyTimes();
+
+
     expect(topologyManager.getRequests(eq(Collections.singletonList(100L)))).andReturn(
       Collections.singletonList(logicalRequest)).anyTimes();
     expect(topologyManager.getStageSummaries(EasyMock.<Long>anyObject())).andReturn(
       Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes();
 
-    replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager);
+    replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest);
 
     ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
       type,
@@ -1722,7 +1737,7 @@ public class RequestResourceProviderTest {
 
     // verify
     PowerMock.verifyAll();
-    verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager);
+    verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest);
 
     Assert.assertEquals(1, resources.size());
     for (Resource resource : resources) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
index 42ef020..2019082 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
@@ -461,4 +461,68 @@ public class LogicalRequestTest extends EasyMockSupport {
 
     assertTrue(hostReqHost1.isPresent() && hostReqHost2.isPresent() && hostReqHost3.isPresent() && !hostReqHost4.isPresent());
   }
+
+  @Test
+  public void testRemovePendingHostRequests() throws Exception {
+    // Given: one host group with two host requests, only one of which has a host name assigned
+    Long requestId = 1L;
+
+    final TopologyHostInfoEntity host1 = new TopologyHostInfoEntity();
+    host1.setId(100L);
+    host1.setFqdn("host1");
+
+    final TopologyHostInfoEntity host2 = new TopologyHostInfoEntity();
+    host2.setId(102L);
+    host2.setFqdn("host2");
+
+    TopologyHostGroupEntity hostGroupEntity1 = new TopologyHostGroupEntity();
+    hostGroupEntity1.setTopologyHostInfoEntities(ImmutableSet.of(host1, host2));
+    hostGroupEntity1.setName("host_group_1");
+
+    // host request already matched to host1; the mocked context reports host1 as registered with the cluster
+    TopologyHostRequestEntity hostRequestEntityHost1Matched = new TopologyHostRequestEntity();
+    hostRequestEntityHost1Matched.setId(1L);
+    hostRequestEntityHost1Matched.setHostName(host1.getFqdn()); // host request matched host1
+    hostRequestEntityHost1Matched.setTopologyHostGroupEntity(hostGroupEntity1);
+    hostRequestEntityHost1Matched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+    expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host1.getFqdn()))).andReturn(true).anyTimes();
+
+
+    // host request with no host name set (not matched to any host yet); host2 is mocked as not registered
+    TopologyHostRequestEntity hostRequestEntityHost2NotMatched = new TopologyHostRequestEntity();
+    hostRequestEntityHost2NotMatched.setId(2L);
+    hostRequestEntityHost2NotMatched.setTopologyHostGroupEntity(hostGroupEntity1);
+    hostRequestEntityHost2NotMatched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+    expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host2.getFqdn()))).andReturn(false).anyTimes();
+
+
+    Collection<TopologyHostRequestEntity> reservedHostRequestEntities = ImmutableSet.of(
+            hostRequestEntityHost1Matched,
+            hostRequestEntityHost2NotMatched);
+
+    hostGroupEntity1.setTopologyHostRequestEntities(reservedHostRequestEntities);
+
+    TopologyRequestEntity topologyRequestEntity = new TopologyRequestEntity();
+    topologyRequestEntity.setTopologyHostGroupEntities(Collections.singleton(hostGroupEntity1));
+
+    // the mocked persisted entity exposes both host requests; host_group_1 is mocked as having no master component
+    expect(logicalRequestEntity.getTopologyRequestEntity()).andReturn(topologyRequestEntity).atLeastOnce();
+    expect(logicalRequestEntity.getTopologyHostRequestEntities()).andReturn(reservedHostRequestEntities).atLeastOnce();
+    expect(blueprint.getHostGroup(eq("host_group_1"))).andReturn(hostGroup1).atLeastOnce();
+    expect(hostGroup1.containsMasterComponent()).andReturn(false).atLeastOnce();
+
+    replayAll();
+
+    // When: the request is rebuilt from the persisted entity and its pending host requests are removed
+    // NOTE(review): the meaning of the null argument is not visible in this test -- confirm against LogicalRequest
+    LogicalRequest req = new LogicalRequest(requestId, replayedTopologyRequest, clusterTopology, logicalRequestEntity);
+    req.removePendingHostRequests(null);
+
+    // Then: the recorded mock expectations were met and exactly one of the two host requests is left
+    verifyAll();
+
+    Collection<HostRequest>  hostRequests = req.getHostRequests();
+    assertEquals(1, hostRequests.size());
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 95db56f..0378753 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -395,7 +395,7 @@ public class TopologyManagerTest {
     Map<ClusterTopology, List<LogicalRequest>> allRequests = new HashMap<>();
     List<LogicalRequest> requestList = new ArrayList<>();
     requestList.add(logicalRequest);
-    expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes();
+    expect(logicalRequest.hasPendingHostRequests()).andReturn(false).anyTimes();
     allRequests.put(clusterTopologyMock, requestList);
     expect(requestStatusResponse.getTasks()).andReturn(Collections.<ShortTaskStatus>emptyList()).anyTimes();
     expect(clusterTopologyMock.isClusterKerberosEnabled()).andReturn(true);
@@ -404,8 +404,8 @@ public class TopologyManagerTest {
     expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes();
     expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
     expect(ambariContext.isTopologyResolved(CLUSTER_ID)).andReturn(true).anyTimes();
-    expect(group1.addComponent("KERBEROS_CLIENT")).andReturn(true);
-    expect(group2.addComponent("KERBEROS_CLIENT")).andReturn(true);
+    expect(group1.addComponent("KERBEROS_CLIENT")).andReturn(true).anyTimes();
+    expect(group2.addComponent("KERBEROS_CLIENT")).andReturn(true).anyTimes();
 
     replayAll();
 
@@ -514,7 +514,8 @@ public class TopologyManagerTest {
     allRequests.put(clusterTopologyMock, logicalRequests);
     expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes();
     expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
-    expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes();
+    expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes();
+    expect(logicalRequest.getCompletedHostRequests()).andReturn(Collections.EMPTY_LIST).anyTimes();
     expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes();
     replayAll();
     EasyMock.replay(clusterTopologyMock);