You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/11/12 19:15:21 UTC
[06/12] incubator-slider git commit: SLIDER-967 Use nodemap to build
up location restrictions on AA placement. This is the core of the AA
placement algorithm: it finds nodes that are free to use.
SLIDER-967 Use nodemap to build up location restrictions on AA placement.
This is the core of the AA placement algorithm: it finds nodes that are free to use.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/7899f59a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/7899f59a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/7899f59a
Branch: refs/heads/feature/SLIDER-82-pass-3.1
Commit: 7899f59a1cf12ae88775dcdd85a712f96cd6eb7c
Parents: 856ab84
Author: Steve Loughran <st...@apache.org>
Authored: Wed Nov 11 20:11:50 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Nov 11 20:11:50 2015 +0000
----------------------------------------------------------------------
.../slider/api/proto/RestTypeMarshalling.java | 10 +-
.../slider/api/types/NodeInformation.java | 4 +-
.../server/appmaster/SliderAppMaster.java | 6 +-
.../server/appmaster/rpc/SliderIPCService.java | 1 -
.../slider/server/appmaster/state/AppState.java | 91 ++++++++-------
.../server/appmaster/state/NodeEntry.java | 2 +-
.../server/appmaster/state/NodeInstance.java | 11 +-
.../slider/server/appmaster/state/NodeMap.java | 4 +-
.../appmaster/state/OutstandingRequest.java | 51 ++++++++-
.../state/OutstandingRequestTracker.java | 55 +++++++++
.../server/appmaster/state/RoleHistory.java | 112 +++++++++++++++++--
.../server/appmaster/state/RoleStatus.java | 26 ++++-
.../appstate/TestMockAppStateAAPlacement.groovy | 31 ++++-
.../model/history/TestRoleHistoryAA.groovy | 25 +++--
...stRoleHistoryFindNodesForNewInstances.groovy | 20 ++--
...tRoleHistoryOutstandingRequestTracker.groovy | 30 ++++-
.../TestRoleHistoryRequestTracking.groovy | 12 +-
.../model/mock/BaseMockAppStateTest.groovy | 4 +
18 files changed, 397 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java b/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java
index b7985e6..85a8358 100644
--- a/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java
+++ b/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java
@@ -38,6 +38,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
/**
@@ -160,8 +162,8 @@ public class RestTypeMarshalling {
builder.setLabels(info.labels);
}
- List<NodeEntryInformation> entries = info.entries;
- if (entries != null) {
+ if (info.entries != null) {
+ Collection<NodeEntryInformation> entries = info.entries.values();
for (NodeEntryInformation entry : entries) {
Messages.NodeEntryInformationProto.Builder node =
Messages.NodeEntryInformationProto.newBuilder();
@@ -192,7 +194,7 @@ public class RestTypeMarshalling {
info.state = wire.getState();
List<Messages.NodeEntryInformationProto> entriesList = wire.getEntriesList();
if (entriesList != null) {
- info.entries = new ArrayList<>(entriesList.size());
+ info.entries = new HashMap<>(entriesList.size());
for (Messages.NodeEntryInformationProto entry : entriesList) {
NodeEntryInformation nei = new NodeEntryInformation();
nei.failed = entry.getFailed();
@@ -205,7 +207,7 @@ public class RestTypeMarshalling {
nei.releasing = entry.getReleasing();
nei.startFailed = entry.getStartFailed();
nei.starting = entry.getStarting();
- info.entries.add(nei);
+ info.entries.put(Integer.toString(nei.priority), nei);
}
}
return info;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java b/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java
index 049ee52..edf7e21 100644
--- a/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java
+++ b/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java
@@ -22,7 +22,9 @@ import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Serialized node information. Must be kept in sync with the protobuf equivalent.
@@ -38,5 +40,5 @@ public class NodeInformation {
public long lastUpdated;
public String rackName;
public String state;
- public List<NodeEntryInformation> entries = new ArrayList<>();
+ public Map<String, NodeEntryInformation> entries = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index d74688b..171451e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1826,7 +1826,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
*/
private void releaseAllContainers() {
List<AbstractRMOperation> operations = appState.releaseAllContainers();
- providerRMOperationHandler.execute(operations);
//now apply the operations
execute(operations);
}
@@ -1852,7 +1851,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
LOG_YARN.info("onNodesUpdated({})", updatedNodes.size());
log.info("Updated nodes {}", updatedNodes);
// Check if any nodes are lost or revived and update state accordingly
- appState.onNodesUpdated(updatedNodes);
+ List<AbstractRMOperation> operations = appState.onNodesUpdated(updatedNodes);
+ execute(operations);
+ // if there were any operations, trigger a review
+ reviewRequestAndReleaseNodes("nodes updated");
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
index bb8f512..a983f53 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
@@ -448,7 +448,6 @@ public class SliderIPCService extends AbstractService
}
}
-
@Override
public Messages.WrappedJsonProto getModelDesired(Messages.EmptyPayloadProto request) throws IOException {
return lookupAggregateConf(MODEL_DESIRED);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 063a7fc..c960510 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -663,7 +663,6 @@ public class AppState {
return newRole;
}
-
/**
* Actions to perform when an instance definition is updated
* Currently:
@@ -678,9 +677,9 @@ public class AppState {
*
* @throws BadConfigException
*/
- private synchronized void onInstanceDefinitionUpdated() throws
- BadConfigException,
- IOException {
+ private synchronized void onInstanceDefinitionUpdated()
+ throws BadConfigException, IOException {
+
log.debug("Instance definition updated");
//note the time
snapshotTime = now();
@@ -1220,11 +1219,14 @@ public class AppState {
* Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here.
* This is where role history information will be used for placement decisions.
* @param role role
- * @return the container request to submit
+ * @return the container request to submit or null if there is none
*/
private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) {
incrementRequestCount(role);
OutstandingRequest request = roleHistory.requestContainerForRole(role);
+ if (request == null) {
+ return null;
+ }
if (role.isAntiAffinePlacement()) {
role.setOutstandingAArequest(request);
}
@@ -1426,12 +1428,13 @@ public class AppState {
* Handle node update from the RM. This syncs up the node map with the RM's view
* @param updatedNodes updated nodes
*/
- public synchronized void onNodesUpdated(List<NodeReport> updatedNodes) {
+ public synchronized List<AbstractRMOperation> onNodesUpdated(List<NodeReport> updatedNodes) {
boolean changed = roleHistory.onNodesUpdated(updatedNodes);
if (changed) {
- //TODO
log.error("TODO: cancel AA requests and re-review");
+ return cancelOutstandingAARequests();
}
+ return new ArrayList<>(0);
}
/**
@@ -1882,6 +1885,20 @@ public class AppState {
}
/**
+   * Cancel all outstanding anti-affine requests.
+   * <p>
+   * The role history generates a cancel operation for every anti-affine
+   * request it is tracking; then every anti-affine role has its outstanding
+   * AA request cleared via {@link RoleStatus#cancelOutstandingAARequest()}.
+   * This is invoked from {@code onNodesUpdated()} when the node map has
+   * changed, so that subsequent reviews can build requests against the
+   * updated set of nodes.
+   * @return a possibly empty list of cancel operations to execute.
+   */
+  public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+    List<AbstractRMOperation> operations = roleHistory.cancelOutstandingAARequests();
+    for (RoleStatus roleStatus : roleStatusMap.values()) {
+      if (roleStatus.isAntiAffinePlacement()) {
+        roleStatus.cancelOutstandingAARequest();
+      }
+    }
+    return operations;
+  }
+
+ /**
* Look at the allocation status of one role, and trigger add/release
* actions if the number of desired role instances doesn't equal
* (actual + pending).
@@ -1900,7 +1917,6 @@ public class AppState {
long delta;
long expected;
String name = role.getName();
- boolean isAA = role.isAntiAffinePlacement();
synchronized (role) {
delta = role.getDelta();
expected = role.getDesired();
@@ -1920,19 +1936,24 @@ public class AppState {
if (delta > 0) {
// more workers needed than we have -ask for more
- log.info("{}: Asking for {} more nodes(s) for a total of {} ", name,
- delta, expected);
-
- // TODO: AA RH to help here by only allowing one request for an AA
+ log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected);
- if (isAA) {
- // build one only if there is none outstanding
+ if (role.isAntiAffinePlacement()) {
+ // build one only if there is none outstanding, the role history knows
+ // enough about the cluster to ask, and there is somewhere to place
+ // the node
if (role.getPendingAntiAffineRequests() == 0
+ && !role.isAARequestOutstanding()
&& roleHistory.canPlaceAANodes()) {
- log.info("Starting an anti-affine request sequence for {} nodes", delta);
// log the number outstanding
- role.incPendingAntiAffineRequests(delta - 1);
- addContainerRequest(operations, createContainerRequest(role));
+ AMRMClient.ContainerRequest request = createContainerRequest(role);
+ if (request != null) {
+ log.info("Starting an anti-affine request sequence for {} nodes", delta);
+ role.incPendingAntiAffineRequests(delta - 1);
+ addContainerRequest(operations, request);
+ } else {
+ log.info("No location for anti-affine request");
+ }
} else {
if (roleHistory.canPlaceAANodes()) {
log.info("Adding {} more anti-affine requests", delta);
@@ -1955,22 +1976,7 @@ public class AppState {
// reduce the number expected (i.e. subtract the delta)
long excess = -delta;
- if (isAA) {
- // there may be pending requests which can be cancelled here
- long pending = role.getPendingAntiAffineRequests();
- if (excess <= pending) {
- long outstanding = pending - excess;
- log.info("Cancelling {} pending AA allocations, leaving {}", excess, outstanding);
- role.setPendingAntiAffineRequests(outstanding);
- excess = 0;
- } else {
- // not enough
- log.info("Cancelling all pending AA allocations");
- role.setPendingAntiAffineRequests(0);
- excess -= pending;
- }
- }
- // how many requests are outstanding?
+ // how many requests are outstanding? for AA roles, this includes pending
long outstandingRequests = role.getRequested();
if (outstandingRequests > 0) {
// outstanding requests.
@@ -2052,15 +2058,22 @@ public class AppState {
return operations;
}
+ /**
+ * Add a container request if the request is non-null
+ * @param operations operations to add the entry to
+ * @param containerAsk what to ask for
+ */
private void addContainerRequest(List<AbstractRMOperation> operations,
AMRMClient.ContainerRequest containerAsk) {
- log.info("Container ask is {} and label = {}", containerAsk,
- containerAsk.getNodeLabelExpression());
- int askMemory = containerAsk.getCapability().getMemory();
- if (askMemory > this.containerMaxMemory) {
- log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
+ if (containerAsk != null) {
+ log.info("Container ask is {} and label = {}", containerAsk,
+ containerAsk.getNodeLabelExpression());
+ int askMemory = containerAsk.getCapability().getMemory();
+ if (askMemory > this.containerMaxMemory) {
+ log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
+ }
+ operations.add(new ContainerRequestOperation(containerAsk));
}
- operations.add(new ContainerRequestOperation(containerAsk));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
index 6dae3c6..c180f88 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java
@@ -90,7 +90,7 @@ public class NodeEntry {
* the number of instances > 1.
*/
public synchronized boolean isAvailable() {
- return getActive() == 0 && (requested == 0) && starting == 0;
+ return getActive() == 0 && requested == 0 && starting == 0;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
index b805ffb..ebd9d5a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
@@ -22,10 +22,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.slider.api.types.NodeInformation;
import org.apache.slider.common.tools.Comparators;
+import org.apache.slider.common.tools.SliderUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
@@ -64,7 +66,8 @@ public class NodeInstance {
private String nodeLabels = "";
/**
- * The list of node entries of specific roles
+ * An unordered list of node entries of specific roles. There's nothing
+ * indexed so as to support sparser datastructures.
*/
private final List<NodeEntry> nodeEntries;
@@ -307,9 +310,9 @@ public class NodeInstance {
info.rackName = nodeReport.getRackName();
info.healthReport = nodeReport.getHealthReport();
}
- info.entries = new ArrayList<>(nodeEntries.size());
+ info.entries = new HashMap<>(nodeEntries.size());
for (NodeEntry nodeEntry : nodeEntries) {
- info.entries.add(nodeEntry.serialize());
+ info.entries.put(Integer.toString(nodeEntry.rolePriority), nodeEntry.serialize());
}
return info;
}
@@ -323,7 +326,7 @@ public class NodeInstance {
*/
public boolean canHost(int role, String label) {
return isOnline()
- && (label.isEmpty() || label.equals(nodeLabels)) // label match
+ && (SliderUtils.isUnset(label) || label.equals(nodeLabels)) // label match
&& (get(role) == null || get(role).isAvailable()); // no live role
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
index 2887c9e..aea48b3 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
@@ -146,9 +146,9 @@ public class NodeMap extends HashMap<String, NodeInstance> {
* Scan the current node map for all nodes capable of hosting an instance
* @param role role ID
* @param label label which must match, or "" for no label checks
- * @return a list of node instances matching the criteria.
+ * @return a possibly empty list of node instances matching the criteria.
*/
- public List<NodeInstance> findNodesForRole(int role, String label) {
+ public List<NodeInstance> findAllNodesForRole(int role, String label) {
List<NodeInstance> nodes = new ArrayList<>(size());
for (NodeInstance instance : values()) {
if (instance.canHost(role, label)) {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
index a9d4b52..e211e7f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -29,6 +29,7 @@ import org.apache.slider.server.appmaster.operations.CancelSingleRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -53,6 +54,13 @@ public final class OutstandingRequest extends RoleHostnamePair {
public final NodeInstance node;
/**
+ * A list of all possible nodes to list in an AA request. For a non-AA
+ * request where {@link #node} is set, element 0 of the list is the same
+ * value.
+ */
+ public final List<NodeInstance> nodes = new ArrayList<>(1);
+
+ /**
* Optional label. This is cached as the request option (explicit-location + label) is forbidden,
* yet the label needs to be retained for escalation.
*/
@@ -95,6 +103,12 @@ public final class OutstandingRequest extends RoleHostnamePair {
private int priority = -1;
/**
+ * Is this an Anti-affine request which should be cancelled on
+ * a cluster resize?
+ */
+ private boolean antiAffine = false;
+
+ /**
* Create a request
* @param roleId role
* @param node node -can be null
@@ -103,6 +117,7 @@ public final class OutstandingRequest extends RoleHostnamePair {
NodeInstance node) {
super(roleId, node != null ? node.hostname : null);
this.node = node;
+ nodes.add(node);
}
/**
@@ -119,6 +134,19 @@ public final class OutstandingRequest extends RoleHostnamePair {
}
/**
+   * Create an anti-affine request, including all listed nodes as targets.
+   * <p>
+   * The hostname of the first node in the list becomes this request's
+   * hostname; {@link #node} is left null and the request is flagged as
+   * anti-affine. The list must contain at least one node: an empty list
+   * fails with an {@code IndexOutOfBoundsException} from {@code nodes.get(0)}.
+   * @param roleId role
+   * @param nodes list of candidate nodes; must not be empty
+   */
+  public OutstandingRequest(int roleId, List<NodeInstance> nodes) {
+    super(roleId, nodes.get(0).hostname);
+    this.node = null;
+    this.antiAffine = true;
+    this.nodes.addAll(nodes);
+  }
+
+ /**
* Is the request located in the cluster, that is: does it have a node.
* @return true if a node instance was supplied in the constructor
*/
@@ -150,6 +178,14 @@ public final class OutstandingRequest extends RoleHostnamePair {
return priority;
}
+  /**
+   * Query the anti-affine flag of this request.
+   * @return true if the request is flagged as anti-affine, either by the
+   * anti-affine constructor or through {@link #setAntiAffine(boolean)}
+   */
+  public boolean isAntiAffine() {
+    return this.antiAffine;
+  }
+
+  /**
+   * Set the anti-affine flag of this request.
+   * @param antiAffine the new value of the flag
+   */
+  public void setAntiAffine(boolean antiAffine) {
+    this.antiAffine = antiAffine;
+  }
+
/**
* Build a container request.
* <p>
@@ -183,7 +219,19 @@ public final class OutstandingRequest extends RoleHostnamePair {
NodeInstance target = this.node;
String nodeLabels;
- if (target != null) {
+ if (isAntiAffine()) {
+ hosts = new String[nodes.size()];
+ int c = 0;
+ for (NodeInstance nodeInstance : nodes) {
+ hosts[c] = nodeInstance.hostname;
+ c++;
+ }
+ log.info("Creating anti-affine request across {} nodes; first node = {}", c, hostname);
+ escalated = false;
+ mayEscalate = false;
+ relaxLocality = false;
+ nodeLabels = label;
+ } else if (target != null) {
// placed request. Hostname is used in request
hosts = new String[1];
hosts[0] = target.hostname;
@@ -215,7 +263,6 @@ public final class OutstandingRequest extends RoleHostnamePair {
pri,
relaxLocality,
nodeLabels);
-
validate();
return issuedRequest;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
index ecdc07a..4209449 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -93,6 +93,23 @@ public class OutstandingRequestTracker {
}
/**
+   * Create a new anti-affine request for the specific role.
+   * <p>
+   * It is added to {@link #openRequests}.
+   * <p>
+   * This does not update the node instance's role's request count.
+   * @param role role index
+   * @param nodes list of suitable nodes; must not be empty
+   * @return a new request
+   * @throws IllegalArgumentException if {@code nodes} is empty
+   */
+  public synchronized OutstandingRequest newAARequest(int role, List<NodeInstance> nodes) {
+    Preconditions.checkArgument(!nodes.isEmpty(),
+        "No nodes supplied for anti-affine request for role %s", role);
+    OutstandingRequest request = new OutstandingRequest(role, nodes);
+    openRequests.add(request);
+    return request;
+  }
+
+ /**
* Look up any oustanding request to a (role, hostname).
* @param role role index
* @param hostname hostname
@@ -364,6 +381,43 @@ public class OutstandingRequestTracker {
}
/**
+   * Cancel all outstanding AA requests from the lists of requests.
+   * <p>
+   * Both the placed and the open request collections are scanned; every
+   * request flagged as anti-affine is removed from its collection and a
+   * cancel operation is generated for it.
+   * <p>
+   * This does not remove them from the role status; they must be reset
+   * by the caller.
+   * <p>
+   * Removal goes through the collections' own iterators. Removing directly
+   * from {@code placedRequests}/{@code openRequests} inside a for-each loop
+   * over the same collection fails with a
+   * {@link java.util.ConcurrentModificationException} once another element
+   * is visited (or silently skips one in a list). {@code List.remove(Object)}
+   * would also match by {@code equals()} rather than by identity.
+   * @return a possibly empty list of cancel operations, one per
+   * anti-affine request removed.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+
+    List<AbstractRMOperation> operations = new ArrayList<>();
+
+    // first, all placed requests
+    java.util.Iterator<Map.Entry<RoleHostnamePair, OutstandingRequest>> placed =
+        placedRequests.entrySet().iterator();
+    while (placed.hasNext()) {
+      OutstandingRequest outstandingRequest = placed.next().getValue();
+      synchronized (outstandingRequest) {
+        if (outstandingRequest.isAntiAffine()) {
+          // cancel the request and stop tracking it
+          operations.add(outstandingRequest.createCancelOperation());
+          placed.remove();
+        }
+      }
+    }
+    // second, all open requests
+    java.util.Iterator<OutstandingRequest> open = openRequests.iterator();
+    while (open.hasNext()) {
+      OutstandingRequest outstandingRequest = open.next();
+      synchronized (outstandingRequest) {
+        if (outstandingRequest.isAntiAffine()) {
+          // cancel the request and stop tracking it
+          operations.add(outstandingRequest.createCancelOperation());
+          open.remove();
+        }
+      }
+    }
+
+    return operations;
+  }
+
+ /**
* Extract a specific number of open requests for a role
* @param roleId role Id
* @param count count to extract
@@ -382,6 +436,7 @@ public class OutstandingRequestTracker {
}
return results;
}
+
/**
* Extract a specific number of placed requests for a role
* @param roleId role Id
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index df1f4e1..2ca5367 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -19,13 +19,13 @@
package org.apache.slider.server.appmaster.state;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.slider.api.types.NodeInformation;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.exceptions.BadConfigException;
@@ -46,11 +46,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -549,7 +547,7 @@ public class RoleHistory {
* @return the instance, or null for none
*/
@VisibleForTesting
- public synchronized NodeInstance findNodeForNewInstance(RoleStatus role) {
+ public synchronized NodeInstance findRecentNodeForNewInstance(RoleStatus role) {
if (!role.isPlacementDesired()) {
// no data locality policy
return null;
@@ -591,6 +589,18 @@ public class RoleHistory {
}
/**
+   * Find all nodes which could host a new instance of an anti-affine role.
+   * <p>
+   * These are the nodes in the node map which can host the role: online,
+   * matching the role's label expression (or no label required), and with
+   * no active, requested or starting instance of the role
+   * (see {@code NodeInstance.canHost()}). No attempt is made to exclude
+   * nodes considered to be failing.
+   * @param role role
+   * @return a possibly empty list of candidate nodes; never null
+   */
+  @VisibleForTesting
+  public synchronized List<NodeInstance> findNodeForNewAAInstance(RoleStatus role) {
+    // all nodes that are live and can host the role; no attempt to exclude ones
+    // considered failing
+    return nodemap.findAllNodesForRole(role.getKey(), role.getLabelExpression());
+  }
+
+ /**
* Request an instance on a given node.
* An outstanding request is created & tracked, with the
* relevant node entry for that role updated.
@@ -615,15 +625,29 @@ public class RoleHistory {
* Find a node for a role and request an instance on that (or a location-less
* instance)
* @param role role status
- * @return a request ready to go
+ * @return a request ready to go, or null if this is an AA request and no
+ * location can be found.
*/
public synchronized OutstandingRequest requestContainerForRole(RoleStatus role) {
Resource resource = recordFactory.newResource();
role.copyResourceRequirements(resource);
- NodeInstance node = findNodeForNewInstance(role);
- // TODO AA -what if there are no suitable nodes?
- return requestInstanceOnNode(node, role, resource);
+ if (role.isAntiAffinePlacement()) {
+ // if a placement can be found, return it.
+ List<NodeInstance> nodes = findNodeForNewAAInstance(role);
+ if (!nodes.isEmpty()) {
+ OutstandingRequest outstanding
+ = outstandingRequests.newAARequest(role.getKey(), nodes);
+ outstanding.buildContainerRequest(resource, role, now());
+ return outstanding;
+ } else {
+ log.warn("No suitable location for {}", role.getName());
+ return null;
+ }
+ } else {
+ NodeInstance node = findRecentNodeForNewInstance(role);
+ return requestInstanceOnNode(node, role, resource);
+ }
}
/**
@@ -972,6 +996,26 @@ public class RoleHistory {
public List<AbstractRMOperation> escalateOutstandingRequests() {
return outstandingRequests.escalateOutstandingRequests(now());
}
+  /**
+   * Cancel every outstanding anti-affine request held by the request tracker.
+   * <p>
+   * The requests are removed from the tracker; the role status entries are
+   * not updated and must be reset by the caller.
+   * @return a possibly empty list of cancel operations to execute.
+   */
+  public List<AbstractRMOperation> cancelOutstandingAARequests() {
+    return outstandingRequests.cancelOutstandingAARequests();
+  }
+
+  /**
+   * Cancel up to {@code toCancel} outstanding requests for a role. These are
+   * requests for new containers, not the containers themselves.
+   * <p>
+   * Anti-affine roles are delegated to {@code cancelRequestsForAARole()};
+   * every other role goes to {@code cancelRequestsForSimpleRole()}.
+   * @param role role
+   * @param toCancel number to cancel
+   * @return a list of cancellable operations.
+   */
+  public List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) {
+    if (role.isAntiAffinePlacement()) {
+      return cancelRequestsForAARole(role, toCancel);
+    }
+    return cancelRequestsForSimpleRole(role, toCancel);
+  }
/**
* Build the list of requests to cancel from the outstanding list.
@@ -979,19 +1023,67 @@ public class RoleHistory {
* @param toCancel number to cancel
* @return a list of cancellable operations.
*/
- public synchronized List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) {
+ private synchronized List<AbstractRMOperation> cancelRequestsForSimpleRole(RoleStatus role, int toCancel) {
+ Preconditions.checkArgument(toCancel > 0,
+ "trying to cancel invalid number of requests: " + toCancel);
List<AbstractRMOperation> results = new ArrayList<>(toCancel);
// first scan through the unplaced request list to find all of a role
int roleId = role.getKey();
List<OutstandingRequest> requests =
outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+ if (role.isAntiAffinePlacement()) {
+ // TODO: AA
+ // AA placement, so clear the role info
+ role.cancelOutstandingAARequest();
+ }
// are there any left?
int remaining = toCancel - requests.size();
// ask for some placed nodes
requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, remaining));
- // TODO AA: clear anything here?
+ // build cancellations
+ for (OutstandingRequest request : requests) {
+ results.add(request.createCancelOperation());
+ }
+ return results;
+ }
+
+ /**
+ * Build the list of requests to cancel for an AA role. This reduces the number
+ * of outstanding pending requests first, then cancels any active request,
+ * before finally asking for any placed containers
+ * @param role role
+ * @param toCancel number to cancel
+ * @return a list of cancellable operations.
+ */
+ private synchronized List<AbstractRMOperation> cancelRequestsForAARole(RoleStatus role, int toCancel) {
+ List<AbstractRMOperation> results = new ArrayList<>(toCancel);
+ int roleId = role.getKey();
+ List<OutstandingRequest> requests = new ArrayList<>(toCancel);
+ // there may be pending requests which can be cancelled here
+ long pending = role.getPendingAntiAffineRequests();
+ if (pending > 0) {
+ // there are some pending ones which can be cancelled first
+ long pendingToCancel = Math.min(pending, toCancel);
+ log.info("Cancelling {} pending AA allocations, leaving {}", toCancel,
+ pendingToCancel);
+ role.setPendingAntiAffineRequests(pending - pendingToCancel);
+ toCancel -= pendingToCancel;
+ }
+ if (toCancel > 0 && role.isAARequestOutstanding()) {
+ // not enough
+ log.info("Cancelling current AA request");
+ // find the single entry which may be running
+ requests = outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+ role.cancelOutstandingAARequest();
+ toCancel--;
+ }
+
+ // ask for some excess nodes
+ if (toCancel > 0) {
+ requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, toCancel));
+ }
// build cancellations
for (OutstandingRequest request : requests) {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
index 1beaddc..a14a84b 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
@@ -71,7 +71,7 @@ public final class RoleStatus implements Cloneable {
private final LongGauge pendingAntiAffineRequests = new LongGauge(0);
/** any pending AA request */
- private OutstandingRequest outstandingAArequest = null;
+ private volatile OutstandingRequest outstandingAArequest = null;
private String failureMessage = "";
@@ -155,8 +155,12 @@ public final class RoleStatus implements Cloneable {
return actual.decToFloor(1);
}
+ /**
+ * Get the request count. For AA roles, this includes pending ones.
+ * @return a count of requested containers
+ */
public long getRequested() {
- return requested.get();
+ return requested.get() + pendingAntiAffineRequests.get();
}
public long incRequested() {
@@ -209,6 +213,14 @@ public final class RoleStatus implements Cloneable {
}
/**
+ * Probe for an outstanding AA request being true
+ * @return true if there is an outstanding AA Request
+ */
+ public boolean isAARequestOutstanding() {
+ return outstandingAArequest != null;
+ }
+
+ /**
* Note that a role failed, text will
* be used in any diagnostics if an exception
* is later raised.
@@ -312,13 +324,21 @@ public final class RoleStatus implements Cloneable {
/**
* Complete the outstanding AA request by clearing the recorded request.
* No check is made that a request is in progress; the caller is
* expected to have done that.
- * @return the number of outstanding requests
*/
public void completeOutstandingAARequest() {
this.setOutstandingAArequest(null);
}
/**
+ * Cancel any outstanding AA request and reset the count of pending
+ * AA requests to zero. Harmless if the role is non-AA, or if there
+ * are no outstanding or pending requests.
+ */
+ public void cancelOutstandingAARequest() {
+ this.setOutstandingAArequest(null);
+ this.setPendingAntiAffineRequests(0);
+ }
+
+ /**
* Get the number of roles we are short of.
* nodes released are ignored.
* @return the positive or negative number of roles to add/release.
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
index baf88dc..c7f59e3 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
@@ -93,8 +93,8 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
AMRMClient.ContainerRequest request = getSingleRequest(ops)
- assert request.relaxLocality
- assert request.nodes == null
+ assert !request.relaxLocality
+ assert request.nodes.size() == engine.cluster.clusterSize
assert request.racks == null
assert request.capability
@@ -131,6 +131,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
assert appState.onNodeManagerContainerStarted(container.id)
ops = appState.reviewRequestAndReleaseNodes()
assert ops.size() == 0
+ assertAllContainersAA();
}
@Test
@@ -160,6 +161,8 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
assert 1 == submitOperations(ops2, [], ops3).size()
assert 2 == ops3.size()
assert aaRole.pendingAntiAffineRequests == 0
+ assertAllContainersAA()
+
}
@Test
@@ -169,14 +172,17 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
getSingleRequest(ops)
assert aaRole.pendingAntiAffineRequests == 1
+ assert aaRole.AARequestOutstanding
// flex down so that the next request should be cancelled
aaRole.desired = 1
- // expect: no new reqests, pending count --
+ // expect: no new requests, pending count --
List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes()
assert ops2.empty
+ assert aaRole.AARequestOutstanding
assert aaRole.pendingAntiAffineRequests == 0
+ assertAllContainersAA()
// next iter
submitOperations(ops, [], ops2).size()
@@ -195,12 +201,14 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
getSingleRequest(ops)
assert aaRole.pendingAntiAffineRequests == 0
+ assert aaRole.AARequestOutstanding
// flex down so that the next request should be cancelled
aaRole.desired = 0
// expect: no new reqests, pending count --
List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes()
assert aaRole.pendingAntiAffineRequests == 0
+ assert !aaRole.AARequestOutstanding
assert ops2.size() == 1
getSingleCancel(ops2)
@@ -209,5 +217,22 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
assert 1 == ops2.size()
}
+ /** Assert AA placement for the role under test, using its key as the index. */
+ void assertAllContainersAA() {
+ assertAllContainersAA(String.valueOf(aaRole.key))
+ }
+
+ /**
+ * Scan the node information snapshot and assert that no node holds more
+ * than one live, starting or releasing instance of the given role.
+ * @param index role index, used as the key into each node's entries map
+ */
+ void assertAllContainersAA(String index) {
+ stateAccess.nodeInformationSnapshot.each { name, info ->
+ def entry = info.entries[index]
+ if (entry != null) {
+ def instances = entry.live + entry.starting + entry.releasing
+ assert instances <= 1, "too many instances on node $name"
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
index f99326f..fdbc3b4 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
@@ -21,10 +21,15 @@ package org.apache.slider.server.appmaster.model.history
import groovy.util.logging.Slf4j
import org.apache.hadoop.yarn.api.records.NodeReport
import org.apache.hadoop.yarn.api.records.NodeState
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.model.mock.MockNodeReport
+import org.apache.slider.server.appmaster.model.mock.MockRoleHistory
import org.apache.slider.server.appmaster.state.NodeEntry
import org.apache.slider.server.appmaster.state.NodeInstance
import org.apache.slider.server.appmaster.state.NodeMap
+import org.apache.slider.server.appmaster.state.RoleHistory
+import org.apache.slider.server.appmaster.state.RoleStatus
import org.apache.slider.test.SliderTestBase
import org.junit.Test
@@ -37,44 +42,48 @@ class TestRoleHistoryAA extends SliderTestBase {
List<String> hostnames = ["one", "two", "three"]
NodeMap nodeMap, gpuNodeMap
+ RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES)
+
+ /** Ask the role history for a container and return the YARN request it issued. */
+ AMRMClient.ContainerRequest requestContainer(RoleStatus roleStatus) {
+ def outstanding = roleHistory.requestContainerForRole(roleStatus)
+ return outstanding.issuedRequest
+ }
@Override
void setup() {
super.setup()
nodeMap = createNodeMap(hostnames, NodeState.RUNNING)
gpuNodeMap = createNodeMap(hostnames, NodeState.RUNNING, "GPU")
-
}
@Test
public void testFindNodesInFullCluster() throws Throwable {
// all three will surface at first
- assertResultSize(3, nodeMap.findNodesForRole(1, ""))
+ assertResultSize(3, nodeMap.findAllNodesForRole(1, ""))
}
@Test
public void testFindNodesInUnhealthyCluster() throws Throwable {
// all three will surface at first
nodeMap.get("one").updateNode(new MockNodeReport("one",NodeState.UNHEALTHY))
- assertResultSize(2, nodeMap.findNodesForRole(1, ""))
+ assertResultSize(2, nodeMap.findAllNodesForRole(1, ""))
}
@Test
public void testFindNoNodesWrongLabel() throws Throwable {
// all three will surface at first
- assertResultSize(0, nodeMap.findNodesForRole(1, "GPU"))
+ assertResultSize(0, nodeMap.findAllNodesForRole(1, "GPU"))
}
@Test
public void testFindNoNodesRightLabel() throws Throwable {
// all three will surface at first
- assertResultSize(3, gpuNodeMap.findNodesForRole(1, "GPU"))
+ assertResultSize(3, gpuNodeMap.findAllNodesForRole(1, "GPU"))
}
@Test
public void testFindNoNodesNoLabel() throws Throwable {
// all three will surface at first
- assertResultSize(3, gpuNodeMap.findNodesForRole(1, ""))
+ assertResultSize(3, gpuNodeMap.findAllNodesForRole(1, ""))
}
@Test
@@ -83,7 +92,7 @@ class TestRoleHistoryAA extends SliderTestBase {
applyToNodeEntries(nodeMap) {
NodeEntry it -> it.request()
}
- assertResultSize(0, nodeMap.findNodesForRole(1, ""))
+ assertResultSize(0, nodeMap.findAllNodesForRole(1, ""))
}
@Test
@@ -92,7 +101,7 @@ class TestRoleHistoryAA extends SliderTestBase {
applyToNodeEntries(nodeMap) {
NodeEntry it -> it.request()
}
- assertResultSize(0, nodeMap.findNodesForRole(1, ""))
+ assertResultSize(0, nodeMap.findAllNodesForRole(1, ""))
}
def assertResultSize(int size, List<NodeInstance> list) {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy
index 63aa6d2..f36724e 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy
@@ -33,10 +33,8 @@ import org.junit.Test
/**
* Testing finding nodes for new instances.
- * These tests validate the (currently) suboptimal
- * behavior of not listing any known nodes when there
- * are none in the available list -even if there are nodes
- * known to be not running live instances in the cluster.
+ *
+ * This stresses the non-AA codepath
*/
@Slf4j
@CompileStatic
@@ -71,7 +69,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest {
public List<NodeInstance> findNodes(int count, RoleStatus roleStatus = roleStat) {
List < NodeInstance > found = [];
for (int i = 0; i < count; i++) {
- NodeInstance f = roleHistory.findNodeForNewInstance(roleStatus)
+ NodeInstance f = roleHistory.findRecentNodeForNewInstance(roleStatus)
if (f) {
found << f
};
@@ -81,17 +79,17 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest {
@Test
public void testFind1NodeR0() throws Throwable {
- NodeInstance found = roleHistory.findNodeForNewInstance(roleStat)
+ NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat)
log.info("found: $found")
assert [age3Active0].contains(found)
}
@Test
public void testFind2NodeR0() throws Throwable {
- NodeInstance found = roleHistory.findNodeForNewInstance(roleStat)
+ NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat)
log.info("found: $found")
assert [age2Active0, age3Active0].contains(found)
- NodeInstance found2 = roleHistory.findNodeForNewInstance(roleStat)
+ NodeInstance found2 = roleHistory.findRecentNodeForNewInstance(roleStat)
log.info("found: $found2")
assert [age2Active0, age3Active0].contains(found2)
assert found != found2;
@@ -100,7 +98,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest {
@Test
public void testFind3NodeR0ReturnsNull() throws Throwable {
assert 2== findNodes(2).size()
- NodeInstance found = roleHistory.findNodeForNewInstance(roleStat)
+ NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat)
assert found == null;
}
@@ -124,7 +122,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest {
assert age2Active0.getActiveRoleInstances(0) != 0
age3Active0.get(0).onStartCompleted()
assert age3Active0.getActiveRoleInstances(0) != 0
- NodeInstance found = roleHistory.findNodeForNewInstance(roleStat)
+ NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat)
log.info(found ?.toFullString())
assert found == null
}
@@ -148,7 +146,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest {
assert age2Active0.exceedsFailureThreshold(roleStat)
// get the role & expect age3 to be picked up, even though it is older
- NodeInstance found = roleHistory.findNodeForNewInstance(roleStat)
+ NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat)
assert age3Active0.is(found)
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
index 1c99c04..7b389cd 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
@@ -240,6 +240,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
def yarnRequest = req1.buildContainerRequest(resource, workerRole, 0)
assert (yarnRequest.nodeLabelExpression == null)
assert (!yarnRequest.relaxLocality)
+ // escalation
def yarnRequest2 = req1.escalate()
assert (yarnRequest2.nodeLabelExpression == WORKERS_LABEL)
assert (yarnRequest2.relaxLocality)
@@ -258,7 +259,6 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
resource.virtualCores = 1
resource.memory = 48;
- def label = null
// initial request
def yarnRequest = req1.buildContainerRequest(resource, role0Status, 0)
assert yarnRequest.nodes != null
@@ -269,6 +269,34 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
assert yarnRequest2.relaxLocality
}
+ /** An AA request with an empty node list must be rejected. */
+ @Test(expected = IllegalArgumentException)
+ public void testAARequestNoNodes() throws Throwable {
+ def roleKey = role0Status.key
+ tracker.newAARequest(roleKey, [])
+ }
+
+ /** A single-node AA request is bound to that node's hostname and is not located. */
+ @Test
+ public void testAARequest() throws Throwable {
+ OutstandingRequest request = tracker.newAARequest(role0Status.key, [host1])
+ assert host1.hostname == request.hostname
+ assert !request.located
+ }
+
+ /**
+ * A two-node AA request builds a YARN request listing both nodes,
+ * with locality relaxation off and no escalation permitted.
+ */
+ @Test
+ public void testAARequestPair() throws Throwable {
+ def role0 = role0Status.key
+ OutstandingRequest request = tracker.newAARequest(role0, [host1, host2])
+ assert host1.hostname == request.hostname
+ assert !request.located
+ def requirements = role0Status.copyResourceRequirements(new MockResource(0, 0))
+ def yarnRequest = request.buildContainerRequest(requirements, role0Status, 0)
+ assert !yarnRequest.relaxLocality
+ assert !request.mayEscalate()
+ assert yarnRequest.nodes.size() == 2
+ }
/**
* Create a new request (always against host1)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
index 14ac32a..2f160cb 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
@@ -87,7 +87,7 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest {
@Test
public void testRequestedNodeOffList() throws Throwable {
- NodeInstance ni = roleHistory.findNodeForNewInstance(roleStatus)
+ NodeInstance ni = roleHistory.findRecentNodeForNewInstance(roleStatus)
assert age3Active0 == ni
assertListEquals([age2Active0], roleHistory.cloneRecentNodeList(0))
roleHistory.requestInstanceOnNode(ni,
@@ -106,7 +106,7 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest {
recordAsFailed(age2Active0, 0, 4)
assert age2Active0.isConsideredUnreliable(0, roleStatus.nodeFailureThreshold)
// expect to get a null node back
- NodeInstance ni = roleHistory.findNodeForNewInstance(roleStatus)
+ NodeInstance ni = roleHistory.findRecentNodeForNewInstance(roleStatus)
assert !ni
// which is translated to a no-location request
@@ -123,7 +123,7 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest {
assert !age3Active0.isConsideredUnreliable(0, roleStatus.nodeFailureThreshold)
assert !roleHistory.cloneRecentNodeList(0).empty
// looking for a node should now find one
- ni = roleHistory.findNodeForNewInstance(roleStatus)
+ ni = roleHistory.findRecentNodeForNewInstance(roleStatus)
assert ni == age3Active0
req = roleHistory.requestInstanceOnNode(ni, roleStatus, resource).issuedRequest
assert 1 == req.nodes.size()
@@ -153,13 +153,13 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest {
assert recentRole0.indexOf(age3Active0) < recentRole0.indexOf(age2Active0)
// the non-strict role has no suitable nodes
- assert null == roleHistory.findNodeForNewInstance(role0Status)
+ assert null == roleHistory.findRecentNodeForNewInstance(role0Status)
- def ni = roleHistory.findNodeForNewInstance(targetRole)
+ def ni = roleHistory.findRecentNodeForNewInstance(targetRole)
assert ni
- def ni2 = roleHistory.findNodeForNewInstance(targetRole)
+ def ni2 = roleHistory.findRecentNodeForNewInstance(targetRole)
assert ni2
assert ni != ni2
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
index 14e556a..e1660ee 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
@@ -42,8 +42,10 @@ import org.apache.slider.server.appmaster.state.ContainerAssignment
import org.apache.slider.server.appmaster.state.ContainerOutcome
import org.apache.slider.server.appmaster.state.NodeEntry
import org.apache.slider.server.appmaster.state.NodeInstance
+import org.apache.slider.server.appmaster.state.ProviderAppState
import org.apache.slider.server.appmaster.state.RoleInstance
import org.apache.slider.server.appmaster.state.RoleStatus
+import org.apache.slider.server.appmaster.state.StateAccessForProviders
import org.apache.slider.test.SliderTestBase
import org.junit.Before
@@ -59,6 +61,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles
protected Path historyPath;
protected MockApplicationId applicationId;
protected MockApplicationAttemptId applicationAttemptId;
+ protected StateAccessForProviders stateAccess
/**
* Override point: called in setup() to create the YARN engine; can
@@ -97,6 +100,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles
fs.delete(historyPath, true)
appState = new MockAppState()
appState.buildInstance(buildBindingInfo())
+ stateAccess = new ProviderAppState(testName, appState)
}
/**