You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2017/04/28 21:21:39 UTC
ambari git commit: AMBARI-20809. Support hostgroup downscale by
host_count (magyari_sandor)
Repository: ambari
Updated Branches:
refs/heads/trunk 19f9c983e -> 79cca1c71
AMBARI-20809. Support hostgroup downscale by host_count (magyari_sandor)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/79cca1c7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/79cca1c7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/79cca1c7
Branch: refs/heads/trunk
Commit: 79cca1c7184f1661236971dac70d85a83fab6c11
Parents: 19f9c98
Author: Sandor Magyari <sm...@hortonworks.com>
Authored: Fri Apr 28 14:42:55 2017 +0200
Committer: Sandor Magyari <sm...@hortonworks.com>
Committed: Fri Apr 28 23:20:35 2017 +0200
----------------------------------------------------------------------
.../checks/DatabaseConsistencyCheckHelper.java | 70 ++++++++++--------
.../server/controller/RequestRequest.java | 11 +++
.../internal/RequestResourceProvider.java | 77 ++++++++++++++------
.../ambari/server/topology/LogicalRequest.java | 38 +++++++++-
.../ambari/server/topology/PersistedState.java | 7 ++
.../server/topology/PersistedStateImpl.java | 10 +++
.../ambari/server/topology/TopologyManager.java | 56 +++++++++++---
.../src/main/resources/properties.json | 2 +
.../DatabaseConsistencyCheckHelperTest.java | 7 +-
.../internal/RequestResourceProviderTest.java | 33 ++++++---
.../server/topology/LogicalRequestTest.java | 64 ++++++++++++++++
.../server/topology/TopologyManagerTest.java | 9 ++-
12 files changed, 308 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
index b2a03e4..0b7f122 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java
@@ -455,14 +455,14 @@ public class DatabaseConsistencyCheckHelper {
String SELECT_REQUEST_COUNT_QUERY = "select count(tpr.id) from topology_request tpr";
String SELECT_JOINED_COUNT_QUERY = "select count(DISTINCT tpr.id) from topology_request tpr join " +
- "topology_logical_request tlr on tpr.id = tlr.request_id join topology_host_request thr on tlr.id = " +
- "thr.logical_request_id join topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " +
- "tlt on tht.id = tlt.host_task_id";
+ "topology_logical_request tlr on tpr.id = tlr.request_id";
- int topologyRequestCount = 0;
- int topologyRequestTablesJoinedCount = 0;
+ String SELECT_HOST_REQUEST_COUNT_QUERY = "select count(thr.id) from topology_host_request thr";
+
+ String SELECT_HOST_JOINED_COUNT_QUERY = "select count(DISTINCT thr.id) from topology_host_request thr join " +
+ "topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " +
+ "tlt on tht.id = tlt.host_task_id";
- ResultSet rs = null;
Statement statement = null;
if (connection == null) {
@@ -475,38 +475,25 @@ public class DatabaseConsistencyCheckHelper {
try {
statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
- rs = statement.executeQuery(SELECT_REQUEST_COUNT_QUERY);
- if (rs != null) {
- while (rs.next()) {
- topologyRequestCount = rs.getInt(1);
- }
- }
-
- rs = statement.executeQuery(SELECT_JOINED_COUNT_QUERY);
- if (rs != null) {
- while (rs.next()) {
- topologyRequestTablesJoinedCount = rs.getInt(1);
- }
- }
+ int topologyRequestCount = runQuery(statement, SELECT_REQUEST_COUNT_QUERY);
+ int topologyRequestTablesJoinedCount = runQuery(statement, SELECT_JOINED_COUNT_QUERY);
if (topologyRequestCount != topologyRequestTablesJoinedCount) {
error("Your topology request hierarchy is not complete for each row in topology_request should exist " +
- "at least one raw in topology_logical_request, topology_host_request, topology_host_task, " +
- "topology_logical_task.");
+ "at least one row in topology_logical_request");
}
+ int topologyHostRequestCount = runQuery(statement, SELECT_HOST_REQUEST_COUNT_QUERY);
+ int topologyHostRequestTablesJoinedCount = runQuery(statement, SELECT_HOST_JOINED_COUNT_QUERY);
+
+ if (topologyHostRequestCount != topologyHostRequestTablesJoinedCount) {
+ error("Your topology request hierarchy is not complete for each row in topology_host_request should exist " +
+ "at least one row in topology_host_task, topology_logical_task.");
+ }
} catch (SQLException e) {
LOG.error("Exception occurred during topology request tables check: ", e);
} finally {
- if (rs != null) {
- try {
- rs.close();
- } catch (SQLException e) {
- LOG.error("Exception occurred during result set closing procedure: ", e);
- }
- }
-
if (statement != null) {
try {
statement.close();
@@ -518,6 +505,31 @@ public class DatabaseConsistencyCheckHelper {
}
+ /**
+  * Runs a single-value query and returns the integer found in its first column.
+  * When several rows come back the value of the last row is returned; when the
+  * query fails or yields no row the result is 0 and the failure is logged.
+  *
+  * @param statement statement used to execute the query
+  * @param query     SQL text to execute
+  * @return the value read from column 1, or 0 if nothing could be read
+  */
+ private static int runQuery(Statement statement, String query) {
+   int count = 0;
+   ResultSet resultSet = null;
+   try {
+     resultSet = statement.executeQuery(query);
+
+     // a null result set is treated the same as an empty one
+     while (resultSet != null && resultSet.next()) {
+       count = resultSet.getInt(1);
+     }
+
+   } catch (SQLException e) {
+     LOG.error("Exception occurred during topology request tables check: ", e);
+   } finally {
+     if (resultSet != null) {
+       try {
+         resultSet.close();
+       } catch (SQLException e) {
+         LOG.error("Exception occurred during result set closing procedure: ", e);
+       }
+     }
+   }
+   return count;
+ }
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
index db9268b..05c4bad 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/RequestRequest.java
@@ -37,6 +37,8 @@ public class RequestRequest {
private String abortReason;
+ private boolean removePendingHostRequests = false;
+
public HostRoleStatus getStatus() {
return status;
@@ -70,6 +72,14 @@ public class RequestRequest {
this.abortReason = abortReason;
}
+ /**
+  * @return {@code true} if this update asks for pending host requests to be removed;
+  *         the flag is {@code false} unless it has been set explicitly
+  */
+ public boolean isRemovePendingHostRequests() {
+ return removePendingHostRequests;
+ }
+
+ /**
+  * @param removePendingHostRequests {@code true} to mark this update as a request
+  *                                  to remove pending host requests
+  */
+ public void setRemovePendingHostRequests(boolean removePendingHostRequests) {
+ this.removePendingHostRequests = removePendingHostRequests;
+ }
+
@Override
public String toString() {
return "RequestRequest{" +
@@ -77,6 +87,7 @@ public class RequestRequest {
", requestId=" + requestId +
", status=" + status +
", abortReason='" + abortReason + '\'' +
+ ", removePendingHostRequests='" + removePendingHostRequests + '\'' +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
index 9704b33..57e7024 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
@@ -115,6 +115,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
protected static final String REQUEST_COMPLETED_TASK_CNT_ID = "Requests/completed_task_count";
protected static final String REQUEST_QUEUED_TASK_CNT_ID = "Requests/queued_task_count";
protected static final String REQUEST_PROGRESS_PERCENT_ID = "Requests/progress_percent";
+ protected static final String REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID = "Requests/remove_pending_host_requests";
+ protected static final String REQUEST_PENDING_HOST_REQUEST_COUNT_ID = "Requests/pending_host_request_count";
+
protected static final String COMMAND_ID = "command";
protected static final String SERVICE_ID = "service_name";
protected static final String COMPONENT_ID = "component_name";
@@ -152,7 +155,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
REQUEST_TIMED_OUT_TASK_CNT_ID,
REQUEST_COMPLETED_TASK_CNT_ID,
REQUEST_QUEUED_TASK_CNT_ID,
- REQUEST_PROGRESS_PERCENT_ID);
+ REQUEST_PROGRESS_PERCENT_ID,
+ REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID,
+ REQUEST_PENDING_HOST_REQUEST_COUNT_ID);
// ----- Constructors ----------------------------------------------------
@@ -297,6 +302,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
public RequestStatus updateResources(Request requestInfo, Predicate predicate)
throws SystemException, UnsupportedPropertyException,
NoSuchResourceException, NoSuchParentResourceException {
+
AmbariManagementController amc = getManagementController();
final Set<RequestRequest> requests = new HashSet<>();
@@ -319,33 +325,48 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
}
// There should be only one request with this id (or no request at all)
org.apache.ambari.server.actionmanager.Request internalRequest = internalRequests.get(0);
- // Validate update request (check constraints on state value and presence of abort reason)
- if (updateRequest.getAbortReason() == null || updateRequest.getAbortReason().isEmpty()) {
- throw new IllegalArgumentException("Abort reason can not be empty.");
- }
- if (updateRequest.getStatus() != HostRoleStatus.ABORTED) {
- throw new IllegalArgumentException(
- String.format("%s is wrong value. The only allowed value " +
- "for updating request status is ABORTED",
- updateRequest.getStatus()));
- }
+ if (updateRequest.isRemovePendingHostRequests()) {
+ if (internalRequest instanceof LogicalRequest) {
+ targets.add(internalRequest);
+ } else {
+ throw new IllegalArgumentException("Request with id: " + internalRequest.getRequestId() + "is not a Logical Request.");
+ }
+ } else {
+ // Validate update request (check constraints on state value and presence of abort reason)
+ if (updateRequest.getAbortReason() == null || updateRequest.getAbortReason().isEmpty()) {
+ throw new IllegalArgumentException("Abort reason can not be empty.");
+ }
+
+ if (updateRequest.getStatus() != HostRoleStatus.ABORTED) {
+ throw new IllegalArgumentException(
+ String.format("%s is wrong value. The only allowed value " +
+ "for updating request status is ABORTED",
+ updateRequest.getStatus()));
+ }
- HostRoleStatus internalRequestStatus =
- CalculatedStatus.statusFromStages(internalRequest.getStages()).getStatus();
+ HostRoleStatus internalRequestStatus =
+ CalculatedStatus.statusFromStages(internalRequest.getStages()).getStatus();
- if (internalRequestStatus.isCompletedState()) {
- // Ignore updates to completed requests to avoid throwing exception on race condition
- } else {
- // Validation passed
- targets.add(internalRequest);
+ if (internalRequestStatus.isCompletedState()) {
+ // Ignore updates to completed requests to avoid throwing exception on race condition
+ } else {
+ // Validation passed
+ targets.add(internalRequest);
+ }
}
+
}
+
// Perform update
Iterator<RequestRequest> reqIterator = requests.iterator();
for (org.apache.ambari.server.actionmanager.Request target : targets) {
- String reason = reqIterator.next().getAbortReason();
- amc.getActionManager().cancelRequest(target.getRequestId(), reason);
+ if (target instanceof LogicalRequest) {
+ topologyManager.removePendingHostRequests(target.getClusterName(), target.getRequestId());
+ } else {
+ String reason = reqIterator.next().getAbortReason();
+ amc.getActionManager().cancelRequest(target.getRequestId(), reason);
+ }
}
return getRequestStatus(null);
}
@@ -363,9 +384,15 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
requestStatus = HostRoleStatus.valueOf(requestStatusStr);
}
String abortReason = (String) propertyMap.get(REQUEST_ABORT_REASON_PROPERTY_ID);
+ String removePendingHostRequests = (String) propertyMap.get(REQUEST_REMOVE_PENDING_HOST_REQUESTS_ID);
+
RequestRequest requestRequest = new RequestRequest(clusterNameStr, requestId);
requestRequest.setStatus(requestStatus);
requestRequest.setAbortReason(abortReason);
+ if (removePendingHostRequests != null) {
+ requestRequest.setRemovePendingHostRequests(Boolean.valueOf(removePendingHostRequests));
+ }
+
return requestRequest;
}
@@ -753,13 +780,21 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
// in this case, it appears that there are no tasks but this is a logical
// topology request, so it's a matter of hosts simply not registering yet
// for tasks to be created
- status = CalculatedStatus.PENDING;
+ if (logicalRequest.hasPendingHostRequests()) {
+ status = CalculatedStatus.PENDING;
+ } else {
+ status = CalculatedStatus.COMPLETED;
+ }
} else {
// there are either tasks or this is not a logical request, so do normal
// status calculations
status = CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
}
+ if (null != logicalRequest) {
+ setResourceProperty(resource, REQUEST_PENDING_HOST_REQUEST_COUNT_ID, logicalRequest.getPendingHostRequestCount(), requestedPropertyIds);
+ }
+
setResourceProperty(resource, REQUEST_STATUS_PROPERTY_ID, status.getStatus().toString(), requestedPropertyIds);
setResourceProperty(resource, REQUEST_PROGRESS_PERCENT_ID, status.getPercent(), requestedPropertyIds);
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index a271c0b..b5ee94b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -164,8 +164,8 @@ public class LogicalRequest extends Request {
return requestsWithReservedHosts.keySet();
}
- public boolean hasCompleted() {
- return requestsWithReservedHosts.isEmpty() && outstandingHostRequests.isEmpty();
+ /**
+  * @return {@code true} while at least one host request is still tied to a reserved
+  *         host or is still outstanding
+  */
+ public boolean hasPendingHostRequests() {
+ return !requestsWithReservedHosts.isEmpty() || !outstandingHostRequests.isEmpty();
}
public Collection<HostRequest> getCompletedHostRequests() {
@@ -176,11 +176,45 @@ public class LogicalRequest extends Request {
return completedHostRequests;
}
+ /**
+  * @return the number of outstanding host requests plus the number of host requests
+  *         tied to reserved hosts
+  */
+ public int getPendingHostRequestCount() {
+ return outstandingHostRequests.size() + requestsWithReservedHosts.size();
+ }
+
//todo: this is only here for toEntity() functionality
public Collection<HostRequest> getHostRequests() {
return new ArrayList<>(allHostRequests);
}
+ /**
+  * Removes the pending host requests of a host group from this logical request.
+  * A host request is pending when it is still outstanding (not picked up by any host,
+  * hostName is null) or when it is tied to a reserved host.
+  *
+  * @param hostGroupName name of the host group whose pending host requests are removed;
+  *                      {@code null} removes the pending host requests of every host group
+  * @return the host requests that were removed from this logical request
+  */
+ public Collection<HostRequest> removePendingHostRequests(String hostGroupName) {
+   Collection<HostRequest> pendingHostRequests = new ArrayList<>();
+   for (HostRequest hostRequest : outstandingHostRequests) {
+     if (hostGroupName == null || hostRequest.getHostgroupName().equals(hostGroupName)) {
+       pendingHostRequests.add(hostRequest);
+     }
+   }
+   // Remove only what was selected above. A blanket clear() would also discard the
+   // outstanding requests of other host groups without returning them to the caller
+   // and without removing them from allHostRequests.
+   outstandingHostRequests.removeAll(pendingHostRequests);
+
+   // Collect the matching reserved host names first; the map must not be modified
+   // while its key set is being iterated.
+   Collection<String> pendingReservedHostNames = new ArrayList<>();
+   for (String reservedHostName : requestsWithReservedHosts.keySet()) {
+     HostRequest hostRequest = requestsWithReservedHosts.get(reservedHostName);
+     if (hostGroupName == null || hostRequest.getHostgroupName().equals(hostGroupName)) {
+       pendingHostRequests.add(hostRequest);
+       pendingReservedHostNames.add(reservedHostName);
+     }
+   }
+   for (String hostName : pendingReservedHostNames) {
+     requestsWithReservedHosts.remove(hostName);
+   }
+
+   allHostRequests.removeAll(pendingHostRequests);
+   return pendingHostRequests;
+ }
+
public Map<String, Collection<String>> getProjectedTopology() {
Map<String, Collection<String>> hostComponentMap = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
index 354764b..f353b8c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.topology;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -78,4 +79,10 @@ public interface PersistedState {
* @return
*/
LogicalRequest getProvisionRequest(long clusterId);
+
+ /**
+  * Removes the given host requests from the persisted state.
+  *
+  * @param hostRequests the host requests to remove
+  */
+ void removeHostRequests(Collection<HostRequest> hostRequests);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
index 36eb1bc..a8b202e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.inject.Inject;
+import com.google.inject.persist.Transactional;
/**
* Implementation which uses Ambari Database DAO and Entity objects for persistence
@@ -119,6 +120,15 @@ public class PersistedStateImpl implements PersistedState {
}
+ /**
+  * Looks up the persisted entity of every given host request by its id and removes it.
+  *
+  * @param hostRequests the host requests whose entities are removed
+  */
@Override
+ @Transactional
+ public void removeHostRequests(Collection<HostRequest> hostRequests) {
+   for (HostRequest pending : hostRequests) {
+     hostRequestDAO.remove(hostRequestDAO.findById(pending.getId()));
+   }
+ }
+
+ @Override
public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) {
TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId);
HostRoleCommandEntity physicalEntity = hostRoleCommandDAO.findByPK(physicalTaskId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 643945c..f5cf498 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -434,7 +434,7 @@ public class TopologyManager {
Map<String, String> requestInfoProps = new HashMap<>();
requestInfoProps.put(org.apache.ambari.server.controller.spi.Request.REQUEST_INFO_BODY_PROPERTY,
- "{\"" + ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY + "\": " + descriptor + "}");
+ "{\"" + ArtifactResourceProvider.ARTIFACT_DATA_PROPERTY + "\": " + descriptor + "}");
org.apache.ambari.server.controller.spi.Request request = new RequestImpl(Collections.<String>emptySet(),
Collections.singleton(properties), requestInfoProps, null);
@@ -485,20 +485,55 @@ public class TopologyManager {
final Long requestId = ambariContext.getNextRequestId();
LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
- @Override
- public LogicalRequest call() throws Exception {
- LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId);
+ @Override
+ public LogicalRequest call() throws Exception {
+ LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId);
- return logicalRequest;
- }
- }
+ return logicalRequest;
+ }
+ }
);
-
processRequest(request, topology, logicalRequest);
-
return getRequestStatus(logicalRequest.getRequestId());
}
+ /**
+  * Removes all pending host requests of a logical request, both from the in-memory
+  * request and from the persisted state, and then sets the requested host count of
+  * every host group of the cluster topology to the number of host names it holds.
+  *
+  * @param clusterName name of the cluster the request belongs to
+  * @param requestId   id of the logical request
+  * @throws IllegalArgumentException if the cluster id or the cluster topology cannot be
+  *                                  resolved, or no logical request exists for the id
+  */
+ public void removePendingHostRequests(String clusterName, long requestId) {
+   ensureInitialized();
+   LOG.info("TopologyManager.removePendingHostRequests: Entering");
+
+   final long clusterId;
+   try {
+     clusterId = ambariContext.getClusterId(clusterName);
+   } catch (AmbariException e) {
+     LOG.error("Unable to retrieve clusterId", e);
+     // keep the original failure attached so callers can see why the lookup failed
+     throw new IllegalArgumentException("Unable to retrieve clusterId", e);
+   }
+   ClusterTopology topology = clusterTopologyMap.get(clusterId);
+   if (topology == null) {
+     throw new IllegalArgumentException("Unable to retrieve cluster topology for cluster: " + clusterName);
+   }
+
+   LogicalRequest logicalRequest = allRequests.get(requestId);
+   if (logicalRequest == null) {
+     throw new IllegalArgumentException("No Logical Request found for requestId: " + requestId);
+   }
+
+   // null host group name: remove the pending host requests of every host group
+   Collection<HostRequest> pendingHostRequests = logicalRequest.removePendingHostRequests(null);
+
+   if (!logicalRequest.hasPendingHostRequests()) {
+     outstandingRequests.remove(logicalRequest);
+   }
+
+   persistedState.removeHostRequests(pendingHostRequests);
+
+   // set current host count to number of currently connected hosts
+   for (HostGroupInfo currentHostGroupInfo : topology.getHostGroupInfo().values()) {
+     currentHostGroupInfo.setRequestedCount(currentHostGroupInfo.getHostNames().size());
+   }
+
+   LOG.info("TopologyManager.removePendingHostRequests: Exit");
+ }
+
/**
* Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided
* provision cluster request and topology.
@@ -937,7 +972,7 @@ public class TopologyManager {
for (LogicalRequest logicalRequest : requestEntry.getValue()) {
allRequests.put(logicalRequest.getRequestId(), logicalRequest);
- if (!logicalRequest.hasCompleted()) {
+ if (logicalRequest.hasPendingHostRequests()) {
outstandingRequests.add(logicalRequest);
for (String reservedHost : logicalRequest.getReservedHosts()) {
reservedHosts.put(reservedHost, logicalRequest);
@@ -968,6 +1003,7 @@ public class TopologyManager {
}
}
}
+ LOG.info("TopologyManager.replayRequests: Exit");
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/main/resources/properties.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/properties.json b/ambari-server/src/main/resources/properties.json
index 9b2bbf8..e536d05 100644
--- a/ambari-server/src/main/resources/properties.json
+++ b/ambari-server/src/main/resources/properties.json
@@ -137,6 +137,8 @@
"Requests/queued_task_count",
"Requests/progress_percent",
"Requests/abort_reason",
+ "Requests/remove_pending_host_requests",
+ "Requests/pending_host_request_count",
"_"
],
"RequestSchedule" : [
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
index d6e12dc..868dea1 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java
@@ -187,7 +187,12 @@ public class DatabaseConsistencyCheckHelperTest {
expect(mockJoinResultSet.getInt(1)).andReturn(resultCount);
expect(mockStatement.executeQuery("select count(tpr.id) from topology_request tpr")).andReturn(mockCountResultSet);
expect(mockStatement.executeQuery("select count(DISTINCT tpr.id) from topology_request tpr join " +
- "topology_logical_request tlr on tpr.id = tlr.request_id join topology_host_request thr on tlr.id = thr.logical_request_id join topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task tlt on tht.id = tlt.host_task_id")).andReturn(mockJoinResultSet);
+ "topology_logical_request tlr on tpr.id = tlr.request_id")).andReturn(mockJoinResultSet);
+
+ expect(mockStatement.executeQuery("select count(thr.id) from topology_host_request thr")).andReturn(mockCountResultSet);
+ expect(mockStatement.executeQuery("select count(DISTINCT thr.id) from topology_host_request thr join " +
+ "topology_host_task tht on thr.id = tht.host_request_id join topology_logical_task " +
+ "tlt on tht.id = tlt.host_task_id")).andReturn(mockJoinResultSet);
DatabaseConsistencyCheckHelper.setInjector(mockInjector);
DatabaseConsistencyCheckHelper.setConnection(mockConnection);
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
index dc9e5ed..feedc74 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
@@ -78,6 +78,7 @@ import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.topology.Blueprint;
import org.apache.ambari.server.topology.ClusterTopology;
+import org.apache.ambari.server.topology.HostGroup;
import org.apache.ambari.server.topology.HostGroupInfo;
import org.apache.ambari.server.topology.LogicalRequest;
import org.apache.ambari.server.topology.TopologyManager;
@@ -1649,7 +1650,12 @@ public class RequestResourceProviderTest {
ClusterTopology topology = createNiceMock(ClusterTopology.class);
+
+ HostGroup hostGroup = createNiceMock(HostGroup.class);
+ expect(hostGroup.getName()).andReturn("host_group_1").anyTimes();
+
Blueprint blueprint = createNiceMock(Blueprint.class);
+ expect(blueprint.getHostGroup("host_group_1")).andReturn(hostGroup).anyTimes();
expect(topology.getClusterId()).andReturn(2L).anyTimes();
Long clusterId = 2L;
@@ -1666,8 +1672,13 @@ public class RequestResourceProviderTest {
expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn(
Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes();
+ Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>();
+ HostGroupInfo hostGroupInfo = new HostGroupInfo("host_group_1");
+ hostGroupInfo.setRequestedCount(1);
+ hostGroupInfoMap.put("host_group_1", hostGroupInfo);
+
TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class);
- expect(topologyRequest.getHostGroupInfo()).andReturn(Collections.<String, HostGroupInfo>emptyMap()).anyTimes();
+ expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes();
expect(topology.getBlueprint()).andReturn(blueprint).anyTimes();
expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes();
@@ -1677,24 +1688,28 @@ public class RequestResourceProviderTest {
expect(AmbariServer.getController()).andReturn(managementController).anyTimes();
PowerMock.replayAll(
- topologyRequest,
- topology,
- blueprint,
- managementController,
- clusters);
+ topologyRequest,
+ topology,
+ blueprint,
+ managementController,
+ clusters);
- LogicalRequest logicalRequest = new LogicalRequest(200L, topologyRequest, topology);
+ LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class);
+ expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes();
+ expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes();
reset(topologyManager);
expect(topologyManager.getRequest(100L)).andReturn(logicalRequest).anyTimes();
+
+
expect(topologyManager.getRequests(eq(Collections.singletonList(100L)))).andReturn(
Collections.singletonList(logicalRequest)).anyTimes();
expect(topologyManager.getStageSummaries(EasyMock.<Long>anyObject())).andReturn(
Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes();
- replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager);
+ replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest);
ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
type,
@@ -1722,7 +1737,7 @@ public class RequestResourceProviderTest {
// verify
PowerMock.verifyAll();
- verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager);
+ verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest);
Assert.assertEquals(1, resources.size());
for (Resource resource : resources) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
index 42ef020..2019082 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
@@ -461,4 +461,68 @@ public class LogicalRequestTest extends EasyMockSupport {
assertTrue(hostReqHost1.isPresent() && hostReqHost2.isPresent() && hostReqHost3.isPresent() && !hostReqHost4.isPresent());
}
+
+ /**
+  * Replays a logical request holding one host request already matched to a registered
+  * host and one host request not matched to any host, removes the pending host requests
+  * of all host groups and checks that one host request is left and nothing is pending.
+  */
+ @Test
+ public void testRemovePendingHostRequests() throws Exception {
+   // Given
+   Long requestId = 1L;
+
+   final TopologyHostInfoEntity host1 = new TopologyHostInfoEntity();
+   host1.setId(100L);
+   host1.setFqdn("host1");
+
+   final TopologyHostInfoEntity host2 = new TopologyHostInfoEntity();
+   host2.setId(102L);
+   host2.setFqdn("host2");
+
+   TopologyHostGroupEntity hostGroupEntity1 = new TopologyHostGroupEntity();
+   hostGroupEntity1.setTopologyHostInfoEntities(ImmutableSet.of(host1, host2));
+   hostGroupEntity1.setName("host_group_1");
+
+   // host request matched to a registered host
+   TopologyHostRequestEntity hostRequestEntityHost1Matched = new TopologyHostRequestEntity();
+   hostRequestEntityHost1Matched.setId(1L);
+   hostRequestEntityHost1Matched.setHostName(host1.getFqdn()); //host request matched host1
+   hostRequestEntityHost1Matched.setTopologyHostGroupEntity(hostGroupEntity1);
+   hostRequestEntityHost1Matched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+   expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host1.getFqdn()))).andReturn(true).anyTimes();
+
+
+   // host request that hasn't been matched to any registered host yet
+   TopologyHostRequestEntity hostRequestEntityHost2NotMatched = new TopologyHostRequestEntity();
+   hostRequestEntityHost2NotMatched.setId(2L);
+   hostRequestEntityHost2NotMatched.setTopologyHostGroupEntity(hostGroupEntity1);
+   hostRequestEntityHost2NotMatched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+   expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host2.getFqdn()))).andReturn(false).anyTimes();
+
+
+   Collection<TopologyHostRequestEntity> reservedHostRequestEntities = ImmutableSet.of(
+     hostRequestEntityHost1Matched,
+     hostRequestEntityHost2NotMatched);
+
+   hostGroupEntity1.setTopologyHostRequestEntities(reservedHostRequestEntities);
+
+   TopologyRequestEntity topologyRequestEntity = new TopologyRequestEntity();
+   topologyRequestEntity.setTopologyHostGroupEntities(Collections.singleton(hostGroupEntity1));
+
+
+   expect(logicalRequestEntity.getTopologyRequestEntity()).andReturn(topologyRequestEntity).atLeastOnce();
+   expect(logicalRequestEntity.getTopologyHostRequestEntities()).andReturn(reservedHostRequestEntities).atLeastOnce();
+   expect(blueprint.getHostGroup(eq("host_group_1"))).andReturn(hostGroup1).atLeastOnce();
+   expect(hostGroup1.containsMasterComponent()).andReturn(false).atLeastOnce();
+
+   replayAll();
+
+   // When
+
+   LogicalRequest req = new LogicalRequest(requestId, replayedTopologyRequest, clusterTopology, logicalRequestEntity);
+   req.removePendingHostRequests(null);
+
+   // Then
+   verifyAll();
+
+   Collection<HostRequest> hostRequests = req.getHostRequests();
+   assertEquals(1, hostRequests.size());
+
+   // removing with a null host group must leave nothing pending
+   assertEquals(0, req.getPendingHostRequestCount());
+
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/79cca1c7/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 95db56f..0378753 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -395,7 +395,7 @@ public class TopologyManagerTest {
Map<ClusterTopology, List<LogicalRequest>> allRequests = new HashMap<>();
List<LogicalRequest> requestList = new ArrayList<>();
requestList.add(logicalRequest);
- expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes();
+ expect(logicalRequest.hasPendingHostRequests()).andReturn(false).anyTimes();
allRequests.put(clusterTopologyMock, requestList);
expect(requestStatusResponse.getTasks()).andReturn(Collections.<ShortTaskStatus>emptyList()).anyTimes();
expect(clusterTopologyMock.isClusterKerberosEnabled()).andReturn(true);
@@ -404,8 +404,8 @@ public class TopologyManagerTest {
expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes();
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
expect(ambariContext.isTopologyResolved(CLUSTER_ID)).andReturn(true).anyTimes();
- expect(group1.addComponent("KERBEROS_CLIENT")).andReturn(true);
- expect(group2.addComponent("KERBEROS_CLIENT")).andReturn(true);
+ expect(group1.addComponent("KERBEROS_CLIENT")).andReturn(true).anyTimes();
+ expect(group2.addComponent("KERBEROS_CLIENT")).andReturn(true).anyTimes();
replayAll();
@@ -514,7 +514,8 @@ public class TopologyManagerTest {
allRequests.put(clusterTopologyMock, logicalRequests);
expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes();
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
- expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes();
+ expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes();
+ expect(logicalRequest.getCompletedHostRequests()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes();
replayAll();
EasyMock.replay(clusterTopologyMock);