You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/03/25 21:28:36 UTC
[07/25] incubator-slider git commit: SLIDER-799 moving towards
escalation;
this code has a sort algorithm for the node target list that compares failure
history before age -about to change this
SLIDER-799 moving towards escalation; this code has a sort algorithm for the node target list that compares failure history before age -about to change this
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/559ed00a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/559ed00a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/559ed00a
Branch: refs/heads/develop
Commit: 559ed00a6b9f3aa8c2ec6c99eb3f85e9807f63ad
Parents: a0d76b7
Author: Steve Loughran <st...@apache.org>
Authored: Mon Mar 16 16:35:46 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Mar 16 16:35:46 2015 +0000
----------------------------------------------------------------------
.../slider/server/appmaster/state/AppState.java | 9 +--
.../MostRecentContainerReleaseSelector.java | 2 +-
.../server/appmaster/state/NodeInstance.java | 62 ++++++++++++---
.../slider/server/appmaster/state/NodeMap.java | 4 +-
.../appmaster/state/OutstandingRequest.java | 83 +++++++++++++++++---
.../state/OutstandingRequestTracker.java | 27 ++++---
.../appmaster/state/ProviderAppState.java | 8 +-
.../server/appmaster/state/RoleHistory.java | 53 ++++++-------
.../server/appmaster/state/RoleStatus.java | 4 +
.../slider/server/avro/NewerFilesFirst.java | 43 ++++++++++
.../slider/server/avro/OlderFilesFirst.java | 43 ++++++++++
.../slider/server/avro/RoleHistoryWriter.java | 62 +++------------
.../TestRoleHistoryContainerEvents.groovy | 4 +-
.../history/TestRoleHistoryNIComparators.groovy | 25 +++++-
...tRoleHistoryOutstandingRequestTracker.groovy | 14 ++--
.../history/TestRoleHistoryRWOrdering.groovy | 3 +-
16 files changed, 305 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 4713ef1..f2c237c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -745,7 +745,7 @@ public class AppState {
*/
private List<ProviderRole> buildRoleRequirementsFromResources() throws BadConfigException {
- List<ProviderRole> newRoles = new ArrayList<ProviderRole>(0);
+ List<ProviderRole> newRoles = new ArrayList<>(0);
//now update every role's desired count.
//if there are no instance values, that role count goes to zero
@@ -992,7 +992,7 @@ public class AppState {
public synchronized List<RoleInstance> cloneLiveContainerInfoList() {
List<RoleInstance> allRoleInstances;
Collection<RoleInstance> values = getLiveNodes().values();
- allRoleInstances = new ArrayList<RoleInstance>(values);
+ allRoleInstances = new ArrayList<>(values);
return allRoleInstances;
}
@@ -1220,7 +1220,6 @@ public class AppState {
AMRMClient.ContainerRequest request;
request = roleHistory.requestNode(role, resource, labelExpression);
incrementRequestCount(role);
-
return request;
}
@@ -1509,7 +1508,7 @@ public class AppState {
actual,
releasing,
completedCount);
- roleHistory.onReleaseCompleted(container, true);
+ roleHistory.onReleaseCompleted(container);
} else if (surplusNodes.remove(containerId)) {
//its a surplus one being purged
@@ -1873,7 +1872,7 @@ public class AppState {
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<AbstractRMOperation> reviewOneRole(RoleStatus role)
throws SliderInternalStateException, TriggerClusterTeardownException {
- List<AbstractRMOperation> operations = new ArrayList<AbstractRMOperation>();
+ List<AbstractRMOperation> operations = new ArrayList<>();
int delta;
String details;
int expected;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
index 841dda3..9d936a1 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java
@@ -40,7 +40,7 @@ public class MostRecentContainerReleaseSelector implements ContainerReleaseSelec
private static class newerThan implements Comparator<RoleInstance>, Serializable {
private final Comparator<Long> innerComparator =
- new Comparators.ComparatorReverser<Long>(new Comparators.LongComparator());
+ new Comparators.ComparatorReverser<>(new Comparators.LongComparator());
public int compare(RoleInstance o1, RoleInstance o2) {
return innerComparator.compare(o1.createTime, o2.createTime);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
index 231865e..71b74fc 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java
@@ -96,6 +96,22 @@ public class NodeInstance {
}
/**
+ * Query for a node being considered unreliable
+ * @param role role key
+ * @param threshold threshold above which a node is considered unreliable
+ * @return true if the node is considered unreliable
+ */
+ public boolean isConsideredUnreliable(int role, int threshold) {
+ NodeEntry entry = get(role);
+
+ if (entry != null) {
+ return entry.getFailedRecently() > threshold;
+ } else {
+ return false;
+ }
+ }
+
+ /**
* Get the entry for a role -and remove it if present
* @param role the role index
* @return the entry that WAS there
@@ -113,6 +129,7 @@ public class NodeInstance {
nodeEntries.add(nodeEntry);
}
+
/**
* run through each entry; gc'ing & removing old ones
* @param absoluteTime age in millis
@@ -185,28 +202,49 @@ public class NodeInstance {
}
/**
- * A comparator for sorting entries where the role is newer than
- * the other.
- * This sort only compares the lastUsed field, not whether the
- * node is in use or not
+ * A comparator for sorting entries where the node is preferred over another.
+ * If there's no entry for an element then its failure count is set to -1, age to 0
+ * for the purposes of the comparison
+ * <ol>
+ * <li>Entry exists => end of list as unknown</li>
+ * <li>Lowest failure count</li>
+ * <li>Age</li>
+ * </ol>
+ *
+ * @return +ve int if left is preferred to right; -ve if right over left, 0 for equal
*/
- public static class newerThan implements Comparator<NodeInstance>,
+ public static class Preferred implements Comparator<NodeInstance>,
Serializable {
final int role;
- public newerThan(int role) {
+ public Preferred(int role) {
this.role = role;
}
@Override
public int compare(NodeInstance o1, NodeInstance o2) {
- long age = o1.getOrCreate(role).getLastUsed();
- long age2 = o2.getOrCreate(role).getLastUsed();
+ NodeEntry left = o1.get(role);
+ NodeEntry right = o2.get(role);
+
+ // sort by failure count first
+ int failL = left != null ? left.getFailedRecently() : -1;
+ int failR = right != null ? right.getFailedRecently() : -1;
+
+ if (failL < failR) {
+ return 1;
+ }
+ if (failR > failL) {
+ return -1;
+ }
+
+ // failure counts are equal: compare age
+ long ageL = left != null ? left.getLastUsed() : 0;
+ long ageR = right != null ? right.getLastUsed() : 0;
- if (age > age2) {
+ if (ageL > ageR) {
return -1;
- } else if (age < age2) {
+ } else if (ageL < ageR) {
return 1;
}
// equal
@@ -220,12 +258,12 @@ public class NodeInstance {
* This sort only compares the lastUsed field, not whether the
* node is in use or not
*/
- public static class moreActiveThan implements Comparator<NodeInstance>,
+ public static class MoreActiveThan implements Comparator<NodeInstance>,
Serializable {
final int role;
- public moreActiveThan(int role) {
+ public MoreActiveThan(int role) {
this.role = role;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
index fe40086..aa50baa 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java
@@ -73,13 +73,13 @@ public class NodeMap extends HashMap<String, NodeInstance> {
* in that role
*/
public List<NodeInstance> listActiveNodes(int role) {
- List<NodeInstance> nodes = new ArrayList<NodeInstance>();
+ List<NodeInstance> nodes = new ArrayList<>();
for (NodeInstance instance : values()) {
if (instance.getActiveRoleInstances(role) > 0) {
nodes.add(instance);
}
}
- Collections.sort(nodes, new NodeInstance.moreActiveThan(role));
+ Collections.sort(nodes, new NodeInstance.MoreActiveThan(role));
return nodes;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
index c6a9cff..f6b83a7 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -19,12 +19,17 @@
package org.apache.slider.server.appmaster.state;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
/**
* Tracks an outstanding request. This is used to correlate an allocation response
* with the node and role used in the request.
@@ -50,18 +55,35 @@ public final class OutstandingRequest {
* Node the request is for -may be null
*/
public final NodeInstance node;
+
/**
* hostname -will be null if node==null
*/
public final String hostname;
/**
- * requested time -only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)}
+ * The container request most recently built and issued for this outstanding request.
+ * <p>
+ * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)}
+ */
+ public AMRMClient.ContainerRequest issuedRequest;
+
+ /**
+ * Requested time in millis.
+ * <p>
+ * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)}
+ */
+ public long requestedTimeMillis;
+
+ /**
+ * Time in millis after which escalation should be triggered.
+ * <p>
+ * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)}
*/
- public long requestedTime;
+ public long escalationTimeoutMillis;
/**
- * Has the placement request been escalated by cancel and re-request
+ * Has the placement request been escalated?
*/
public boolean escalated;
@@ -91,9 +113,14 @@ public final class OutstandingRequest {
this.hostname = hostname;
}
+ /**
+ * Is the request located in the cluster, that is: does it have a node.
+ * @return true if the request has a node
+ */
public boolean isLocated() {
return node != null;
}
+
/**
* Build a container request.
* If the request has an address, it is set in the container request
@@ -103,7 +130,7 @@ public final class OutstandingRequest {
* on outstanding requests
* @param resource resource
* @param role role
- * @param time time to record as request time
+ * @param time time in millis to record as request time
* @param labelExpression label to satisfy
* @return the request to raise
*/
@@ -111,7 +138,8 @@ public final class OutstandingRequest {
Resource resource, RoleStatus role, long time, String labelExpression) {
String[] hosts;
boolean relaxLocality;
- requestedTime = time;
+ requestedTimeMillis = time;
+ escalationTimeoutMillis = time + role.getPlacementTimeoutSeconds() * 1000;
boolean usePlacementHistory = role.isStrictPlacement();
if (!usePlacementHistory) {
// If strict placement does not mandate using placement then check
@@ -133,23 +161,56 @@ public final class OutstandingRequest {
// tell the node it is in play
node.getOrCreate(roleId);
log.info("Submitting request for container on {}", hosts[0]);
+ escalated = false;
} else {
+ // the placement is implicitly escalated.
+ escalated = true;
hosts = null;
relaxLocality = true;
}
- Priority pri = ContainerPriority.createPriority(roleId,
- !relaxLocality);
- AMRMClient.ContainerRequest request =
- new AMRMClient.ContainerRequest(resource,
+ Priority pri = ContainerPriority.createPriority(roleId, !relaxLocality);
+ issuedRequest = new AMRMClient.ContainerRequest(resource,
hosts,
null,
pri,
relaxLocality,
labelExpression);
- return request;
+ return issuedRequest;
}
+
+ /**
+ * Build an escalated container request, updating {@link #issuedRequest} with
+ * the new value.
+ * @return the new container request, which has the same resource and label requirements
+ * as the original one, and the same host, but: relaxed placement, and a changed priority
+ * so as to place it into the relaxed list.
+ */
+ public AMRMClient.ContainerRequest buildEscalatedContainerRequest() {
+ escalated = true;
+ Preconditions.checkNotNull(issuedRequest, "issued request");
+ Priority pri = ContainerPriority.createPriority(roleId, true);
+ String[] nodes;
+ List<String> issuedRequestNodes = issuedRequest.getNodes();
+ if (issuedRequestNodes != null) {
+ nodes = issuedRequestNodes.toArray(new String[issuedRequestNodes.size()]);
+ } else {
+ nodes = null;
+ }
+
+
+ AMRMClient.ContainerRequest newRequest =
+ new AMRMClient.ContainerRequest(issuedRequest.getCapability(),
+ nodes,
+ null,
+ pri,
+ true,
+ issuedRequest.getNodeLabelExpression());
+ issuedRequest = newRequest;
+ return issuedRequest;
+ }
+
/**
* Mark the request as completed (or canceled).
*/
@@ -202,7 +263,7 @@ public final class OutstandingRequest {
new StringBuilder("OutstandingRequest{");
sb.append("roleId=").append(roleId);
sb.append(", node='").append(node).append('\'');
- sb.append(", requestedTime=").append(requestedTime);
+ sb.append(", requestedTime=").append(requestedTimeMillis);
sb.append('}');
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
index f6dc2de..e197a86 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -34,6 +34,11 @@ import java.util.Map;
/**
* Tracks outstanding requests made with a specific placement option.
* <p>
+ * <ol>
+ * <li>Used to decide when to return a node to 'can request containers here' list</li>
+ * <li>Used to identify requests where placement has timed out, and so issue relaxed requests</li>
+ * </ol>
+ * <p>
* If an allocation comes in that is not in the map: either the allocation
* was unplaced, or the placed allocation could not be met on the specified
* host, and the RM/scheduler fell back to another location.
@@ -56,7 +61,7 @@ public class OutstandingRequestTracker {
* @param role role index
* @return a new request
*/
- public synchronized OutstandingRequest addRequest(NodeInstance instance, int role) {
+ public synchronized OutstandingRequest newRequest(NodeInstance instance, int role) {
OutstandingRequest request =
new OutstandingRequest(role, instance);
if (request.isLocated()) {
@@ -69,7 +74,7 @@ public class OutstandingRequestTracker {
* Look up any oustanding request to a (role, hostname).
* @param role role index
* @param hostname hostname
- * @return the request or null if there was no outstanding one
+ * @return the request or null if there was no outstanding one in the {@link #placedRequests}
*/
public synchronized OutstandingRequest lookup(int role, String hostname) {
return placedRequests.get(new OutstandingRequest(role, hostname));
@@ -78,7 +83,7 @@ public class OutstandingRequestTracker {
/**
* Remove a request
* @param request matching request to find
- * @return the request
+ * @return the request or null for no match in the {@link #placedRequests}
*/
public synchronized OutstandingRequest remove(OutstandingRequest request) {
return placedRequests.remove(request);
@@ -86,10 +91,10 @@ public class OutstandingRequestTracker {
/**
* Notification that a container has been allocated -drop it
- * from the list of outstanding roles if need be
+ * from the {@link #placedRequests} structure.
* @param role role index
* @param hostname hostname
- * @return true if an entry was found and dropped
+ * @return true if an entry was found and removed
*/
public synchronized boolean onContainerAllocated(int role, String hostname) {
OutstandingRequest request =
@@ -167,21 +172,21 @@ public class OutstandingRequestTracker {
* in container assignments if more come back than expected
* @param rh RoleHistory instance
* @param inAllocated the list of allocated containers
- * @param outRequested initially empty list of requested locations
- * @param outUnrequested initially empty list of unrequested hosts
+ * @param outPlaceRequested initially empty list of requested locations
+ * @param outUnplaced initially empty list of unrequested hosts
*/
public synchronized void partitionRequests(RoleHistory rh,
List<Container> inAllocated,
- List<Container> outRequested,
- List<Container> outUnrequested) {
+ List<Container> outPlaceRequested,
+ List<Container> outUnplaced) {
Collections.sort(inAllocated, new newerThan(rh));
for (Container container : inAllocated) {
int role = ContainerPriority.extractRole(container);
String hostname = RoleHistoryUtils.hostnameOf(container);
if (placedRequests.containsKey(new OutstandingRequest(role, hostname))) {
- outRequested.add(container);
+ outPlaceRequested.add(container);
} else {
- outUnrequested.add(container);
+ outUnplaced.add(container);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
index 632551e..0508579 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java
@@ -47,7 +47,7 @@ public class ProviderAppState implements StateAccessForProviders {
private final Map<String, PublishedConfigSet> publishedConfigSets =
- new ConcurrentHashMap<String, PublishedConfigSet>(5);
+ new ConcurrentHashMap<>(5);
private final PublishedExportsSet publishedExportsSets = new PublishedExportsSet();
private static final PatternValidator validator = new PatternValidator(
RestPaths.PUBLISHED_CONFIGURATION_SET_REGEXP);
@@ -106,7 +106,7 @@ public class ProviderAppState implements StateAccessForProviders {
public List<String> listConfigSets() {
synchronized (publishedConfigSets) {
- List<String> sets = new ArrayList<String>(publishedConfigSets.keySet());
+ List<String> sets = new ArrayList<>(publishedConfigSets.keySet());
return sets;
}
}
@@ -266,7 +266,7 @@ public class ProviderAppState implements StateAccessForProviders {
RoleStatus roleStatus = lookupRoleStatus(component);
List<RoleInstance> ownedContainerList = cloneOwnedContainerList();
List<RoleInstance> matching =
- new ArrayList<RoleInstance>(ownedContainerList.size());
+ new ArrayList<>(ownedContainerList.size());
int roleId = roleStatus.getPriority();
for (RoleInstance instance : ownedContainerList) {
if (instance.roleId == roleId) {
@@ -281,7 +281,7 @@ public class ProviderAppState implements StateAccessForProviders {
RoleStatus roleStatus = lookupRoleStatus(component);
ComponentInformation info = roleStatus.serialize();
List<RoleInstance> containers = lookupRoleContainers(component);
- info.containers = new ArrayList<String>(containers.size());
+ info.containers = new ArrayList<>(containers.size());
for (RoleInstance container : containers) {
info.containers.add(container.id);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index eef2b8f..acfe606 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -94,10 +94,8 @@ public class RoleHistory {
* Track the failed nodes. Currently used to make wiser decision of container
* ask with/without locality. Has other potential uses as well.
*/
- private Set<String> failedNodes = new HashSet<String>();
-
- // dummy to be used in maps for faster lookup where we don't care about values
- private final Object DUMMY_VALUE = new Object();
+ private Set<String> failedNodes = new HashSet<>();
+
public RoleHistory(List<ProviderRole> providerRoles) throws
BadConfigException {
@@ -117,7 +115,7 @@ public class RoleHistory {
outstandingRequests = new OutstandingRequestTracker();
- Map<Integer, RoleStatus> roleStats = new HashMap<Integer, RoleStatus>();
+ Map<Integer, RoleStatus> roleStats = new HashMap<>();
for (ProviderRole providerRole : providerRoles) {
checkProviderRole(roleStats, providerRole);
}
@@ -156,7 +154,7 @@ public class RoleHistory {
throws BadConfigException {
log.debug("Validating/adding new provider role to role history: {} ",
providerRole);
- Map<Integer, RoleStatus> roleStats = new HashMap<Integer, RoleStatus>();
+ Map<Integer, RoleStatus> roleStats = new HashMap<>();
for (ProviderRole role : providerRoles) {
roleStats.put(role.id, new RoleStatus(role));
@@ -171,7 +169,7 @@ public class RoleHistory {
* Clear the lists of available nodes
*/
private synchronized void resetAvailableNodeLists() {
- availableNodes = new HashMap<Integer, LinkedList<NodeInstance>>(roleSize);
+ availableNodes = new HashMap<>(roleSize);
}
/**
@@ -279,8 +277,10 @@ public class RoleHistory {
}
/**
- * Garbage collect the structure -this will dropp
- * all nodes that have been inactive since the (relative) age
+ * Garbage collect the structure -this will drop
+ * all nodes that have been inactive since the (relative) age.
+ * This will drop the failure counts of the nodes too, so it will
+ * lose information that matters.
* @param age relative age
*/
public void gc(long age) {
@@ -423,8 +423,7 @@ public class RoleHistory {
public synchronized void buildAvailableNodeLists() {
resetAvailableNodeLists();
// build the list of available nodes
- for (Map.Entry<String, NodeInstance> entry : nodemap
- .entrySet()) {
+ for (Map.Entry<String, NodeInstance> entry : nodemap.entrySet()) {
NodeInstance ni = entry.getValue();
for (int i = 0; i < roleSize; i++) {
NodeEntry nodeEntry = ni.get(i);
@@ -458,13 +457,12 @@ public class RoleHistory {
private LinkedList<NodeInstance> getOrCreateNodesForRoleId(int id) {
LinkedList<NodeInstance> instances = availableNodes.get(id);
if (instances == null) {
- instances = new LinkedList<NodeInstance>();
+ instances = new LinkedList<>();
availableNodes.put(id, instances);
}
return instances;
}
-
/**
* Sort an available node list
* @param role role to sort
@@ -472,7 +470,7 @@ public class RoleHistory {
private void sortAvailableNodeList(int role) {
List<NodeInstance> nodesForRoleId = getNodesForRoleId(role);
if (nodesForRoleId != null) {
- Collections.sort(nodesForRoleId, new NodeInstance.newerThan(role));
+ Collections.sort(nodesForRoleId, new NodeInstance.Preferred(role));
}
}
@@ -492,7 +490,7 @@ public class RoleHistory {
List<NodeInstance> targets = getNodesForRoleId(roleKey);
if (targets == null) {
// add an empty list here for ease downstream
- targets = new ArrayList<NodeInstance>(0);
+ targets = new ArrayList<>(0);
}
int cnt = targets.size();
log.debug("There are {} node(s) to consider for {}", cnt, role.getName());
@@ -527,7 +525,7 @@ public class RoleHistory {
*/
public synchronized AMRMClient.ContainerRequest requestInstanceOnNode(
NodeInstance node, RoleStatus role, Resource resource, String labelExpression) {
- OutstandingRequest outstanding = outstandingRequests.addRequest(node, role.getKey());
+ OutstandingRequest outstanding = outstandingRequests.newRequest(node, role.getKey());
return outstanding.buildContainerRequest(resource, role, now(), labelExpression);
}
@@ -622,9 +620,9 @@ public class RoleHistory {
//partition into requested and unrequested
List<Container> requested =
- new ArrayList<Container>(allocatedContainers.size());
+ new ArrayList<>(allocatedContainers.size());
List<Container> unrequested =
- new ArrayList<Container>(allocatedContainers.size());
+ new ArrayList<>(allocatedContainers.size());
outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
//give the unrequested ones lower priority
@@ -647,12 +645,13 @@ public class RoleHistory {
boolean requestFound =
outstandingRequests.onContainerAllocated(role, hostname);
if (desiredCount <= actualCount) {
- //cancel the nodes
+ // all outstanding requests have been satisfied
+ // tag nodes as available
List<NodeInstance>
hosts = outstandingRequests.cancelOutstandingRequests(role);
if (!hosts.isEmpty()) {
//add the list
- log.debug("Adding {} hosts for role {}", hosts.size(), role);
+ log.info("Adding {} hosts for role {}", hosts.size(), role);
nodeInstances.addAll(hosts);
sortAvailableNodeList(role);
}
@@ -708,8 +707,9 @@ public class RoleHistory {
*/
public synchronized void onNodesUpdated(List<NodeReport> updatedNodes) {
for (NodeReport updatedNode : updatedNodes) {
- String hostname = updatedNode.getNodeId() == null ? null : updatedNode
- .getNodeId().getHost();
+ String hostname = updatedNode.getNodeId() == null
+ ? null
+ : updatedNode.getNodeId().getHost();
if (hostname == null) {
continue;
}
@@ -735,11 +735,10 @@ public class RoleHistory {
/**
* App state notified of a container completed
* @param container completed container
- * @param wasReleased
* @return true if the node was queued
*/
- public boolean onReleaseCompleted(Container container, boolean wasReleased) {
- return markContainerFinished(container, wasReleased, false);
+ public boolean onReleaseCompleted(Container container) {
+ return markContainerFinished(container, true, false);
}
/**
@@ -832,7 +831,7 @@ public class RoleHistory {
*/
@VisibleForTesting
public List<NodeInstance> cloneAvailableList(int role) {
- return new LinkedList<NodeInstance>(getOrCreateNodesForRoleId(role));
+ return new LinkedList<>(getOrCreateNodesForRoleId(role));
}
/**
@@ -850,7 +849,7 @@ public class RoleHistory {
* @return the list
*/
public List<String> cloneFailedNodes() {
- List<String> lst = new ArrayList<String>();
+ List<String> lst = new ArrayList<>();
lst.addAll(failedNodes);
return lst;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
index ac1b131..7ab7dbe 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
@@ -74,6 +74,10 @@ public final class RoleStatus implements Cloneable {
return providerRole.placementPolicy;
}
+ public long getPlacementTimeoutSeconds() {
+ return providerRole.placementTimeoutSeconds;
+ }
+
/**
* The number of failures on a specific node that can be tolerated
* before selecting a different node for placement
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java b/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java
new file mode 100644
index 0000000..2e049cb
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.avro;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Compare two paths by filename in reverse string order; the more recent one comes first
+ */
+public class NewerFilesFirst implements Comparator<Path>, Serializable {
+
+ /**
+ * Takes the ordering of path names from the normal string comparison
+ * and negates it, so that names that come after other names in
+ * the string sort come before here
+ * @param o1 left-hand path
+ * @param o2 right-hand path
+ * @return positive if o1's name sorts before o2's name, negative if after, 0 if equal
+ */
+ @Override
+ public int compare(Path o1, Path o2) {
+ return (o2.getName().compareTo(o1.getName()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java b/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java
new file mode 100644
index 0000000..407aaa6
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.avro;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Compare two paths by file name only; the older one comes first (assumes names sort by age)
+ */
+public class OlderFilesFirst implements Comparator<Path>, Serializable {
+
+ /**
+ * Takes the ordering of path names from the normal string comparison
+ * and uses it unchanged, so that names that come before other names
+ * in the string sort also come before here
+ * @param o1 leftmost
+ * @param o2 rightmost
+ * @return positive if o1's name sorts after o2's name, negative if before, 0 if equal
+ */
+ @Override
+ public int compare(Path o1, Path o2) {
+ return (o1.getName().compareTo(o2.getName()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java b/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java
index 422ffeb..031632b 100644
--- a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java
+++ b/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java
@@ -52,11 +52,9 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
@@ -90,7 +88,7 @@ public class RoleHistoryWriter {
throws IOException {
try {
DatumWriter<RoleHistoryRecord> writer =
- new SpecificDatumWriter<RoleHistoryRecord>(RoleHistoryRecord.class);
+ new SpecificDatumWriter<>(RoleHistoryRecord.class);
int roles = history.getRoleSize();
RoleHistoryHeader header = new RoleHistoryHeader();
@@ -184,7 +182,7 @@ public class RoleHistoryWriter {
BadConfigException {
try {
DatumReader<RoleHistoryRecord> reader =
- new SpecificDatumReader<RoleHistoryRecord>(RoleHistoryRecord.class);
+ new SpecificDatumReader<>(RoleHistoryRecord.class);
Decoder decoder =
DecoderFactory.get().jsonDecoder(RoleHistoryRecord.getClassSchema(),
in);
@@ -351,16 +349,15 @@ public class RoleHistoryWriter {
public static void sortHistoryPaths(List<Path> paths) {
Collections.sort(paths, new NewerFilesFirst());
}
-
-
+
/**
* Iterate through the paths until one can be loaded
* @param roleHistory role history
* @param paths paths to load
* @return the path of any loaded history -or null if all failed to load
*/
- public Path attemptToReadHistory(RoleHistory roleHistory, FileSystem fileSystem, List<Path> paths) throws
- BadConfigException {
+ public Path attemptToReadHistory(RoleHistory roleHistory, FileSystem fileSystem, List<Path> paths)
+ throws BadConfigException {
ListIterator<Path> pathIterator = paths.listIterator();
boolean success = false;
Path path = null;
@@ -389,9 +386,8 @@ public class RoleHistoryWriter {
* @throws IOException if indexing the history directory fails.
*/
public Path loadFromHistoryDir(FileSystem fs, Path dir,
- RoleHistory roleHistory) throws
- IOException,
- BadConfigException {
+ RoleHistory roleHistory)
+ throws IOException, BadConfigException {
assert fs != null: "null filesystem";
List<Path> entries = findAllHistoryEntries(fs, dir, false);
return attemptToReadHistory(roleHistory, fs, entries);
@@ -407,9 +403,8 @@ public class RoleHistoryWriter {
* check to stop the entire dir being purged)
* @throws IOException IO problems
*/
- public int purgeOlderHistoryEntries(FileSystem fileSystem, Path keep) throws
- IOException {
- assert fileSystem != null : "null filesystem";
+ public int purgeOlderHistoryEntries(FileSystem fileSystem, Path keep)
+ throws IOException { assert fileSystem != null : "null filesystem";
if (!fileSystem.exists(keep)) {
throw new FileNotFoundException(keep.toString());
}
@@ -429,44 +424,5 @@ public class RoleHistoryWriter {
}
return deleteCount;
}
-
- /**
- * Compare two filenames by name; the more recent one comes first
- */
- public static class NewerFilesFirst implements Comparator<Path> ,
- Serializable {
- /**
- * Takes the ordering of path names from the normal string comparison
- * and negates it, so that names that come after other names in
- * the string sort come before here
- * @param o1 leftmost
- * @param o2 rightmost
- * @return positive if o1 > o2
- */
- @Override
- public int compare(Path o1, Path o2) {
- return (o2.getName().compareTo(o1.getName()));
- }
- }
- /**
- * Compare two filenames by name; the older ones comes first
- */
- public static class OlderFilesFirst implements Comparator<Path> ,
- Serializable {
-
- /**
- * Takes the ordering of path names from the normal string comparison
- * and negates it, so that names that come after other names in
- * the string sort come before here
- * @param o1 leftmost
- * @param o2 rightmost
- * @return positive if o1 > o2
- */
- @Override
- public int compare(Path o1, Path o2) {
- return (o1.getName().compareTo(o2.getName()));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
index dbb70fa..c7a38f5 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
@@ -18,8 +18,6 @@
package org.apache.slider.server.appmaster.model.history
-import java.util.List;
-
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.yarn.api.records.Container
@@ -164,7 +162,7 @@ class TestRoleHistoryContainerEvents extends BaseMockAppStateTest {
assert roleEntry.active == 0
// release completed
- roleHistory.onReleaseCompleted(container, true)
+ roleHistory.onReleaseCompleted(container)
assert roleEntry.releasing == 0
assert roleEntry.live == 0
assert roleEntry.active == 0
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy
index 612cce8..74dfd42 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy
@@ -21,6 +21,7 @@ package org.apache.slider.server.appmaster.model.history
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.state.NodeInstance
+import org.junit.Before
import org.junit.Test
/**
@@ -33,9 +34,17 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest {
NodeInstance age3Active0 = nodeInstance(1002, 0, 0, 0)
NodeInstance age4Active1 = nodeInstance(1005, 0, 0, 0)
NodeInstance empty = new NodeInstance("empty", MockFactory.ROLE_COUNT)
+ NodeInstance age6failing = nodeInstance(1006, 0, 0, 0)
+ NodeInstance age1failing = nodeInstance(1000, 0, 0, 0)
List<NodeInstance> nodes = [age2Active2, age4Active1, age1Active4, age3Active0]
+ @Before
+ public void setup() {
+ age6failing.get(0).failedRecently = 2;
+ age1failing.get(0).failedRecently = 1;
+ }
+
@Override
String getTestName() {
return "TestNIComparators"
@@ -44,16 +53,24 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest {
@Test
public void testNewerThan() throws Throwable {
- Collections.sort(nodes, new NodeInstance.newerThan(0))
+ Collections.sort(nodes, new NodeInstance.Preferred(0))
assertListEquals(nodes,
[age4Active1, age3Active0, age2Active2, age1Active4])
}
@Test
+ public void testFailureCountFirst() throws Throwable {
+ def preferred = new NodeInstance.Preferred(0)
+ assert preferred.compare(age6failing, age1failing) == -1
+ assert preferred.compare(age1failing, age6failing) == 1
+ assert preferred.compare(age1failing, age1failing) == 0
+ }
+
+ @Test
public void testNewerThanNoRole() throws Throwable {
nodes << empty
- Collections.sort(nodes, new NodeInstance.newerThan(0))
+ Collections.sort(nodes, new NodeInstance.Preferred(0))
assertListEquals(nodes,
[age4Active1, age3Active0, age2Active2, age1Active4, empty])
}
@@ -61,7 +78,7 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest {
@Test
public void testMoreActiveThan() throws Throwable {
- Collections.sort(nodes, new NodeInstance.moreActiveThan(0))
+ Collections.sort(nodes, new NodeInstance.MoreActiveThan(0))
assertListEquals(nodes,
[age1Active4, age2Active2, age4Active1, age3Active0],)
}
@@ -69,7 +86,7 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest {
@Test
public void testMoreActiveThanEmpty() throws Throwable {
nodes << empty
- Collections.sort(nodes, new NodeInstance.moreActiveThan(0))
+ Collections.sort(nodes, new NodeInstance.MoreActiveThan(0))
assertListEquals(nodes,
[age1Active4, age2Active2, age4Active1, age3Active0, empty])
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
index 7085678..79a9bd6 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
@@ -38,7 +38,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
@Test
public void testAddRetrieveEntry() throws Throwable {
- OutstandingRequest request = tracker.addRequest(host1, 0)
+ OutstandingRequest request = tracker.newRequest(host1, 0)
assert tracker.lookup(0, "host1").equals(request)
assert tracker.remove(request).equals(request)
assert !tracker.lookup(0, "host1")
@@ -46,9 +46,9 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
@Test
public void testAddCompleteEntry() throws Throwable {
- tracker.addRequest(host1, 0)
- tracker.addRequest(host2, 0)
- tracker.addRequest(host1, 1)
+ tracker.newRequest(host1, 0)
+ tracker.newRequest(host2, 0)
+ tracker.newRequest(host1, 1)
assert tracker.onContainerAllocated(1, "host1")
assert !tracker.lookup(1, "host1")
assert tracker.lookup(0, "host1")
@@ -56,9 +56,9 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
@Test
public void testCancelEntries() throws Throwable {
- OutstandingRequest r1 = tracker.addRequest(host1, 0)
- OutstandingRequest r2 = tracker.addRequest(host2, 0)
- OutstandingRequest r3 = tracker.addRequest(host1, 1)
+ OutstandingRequest r1 = tracker.newRequest(host1, 0)
+ OutstandingRequest r2 = tracker.newRequest(host2, 0)
+ OutstandingRequest r3 = tracker.newRequest(host1, 1)
List<NodeInstance> canceled = tracker.cancelOutstandingRequests(0)
assert canceled.size() == 2
assert canceled.contains(host1)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy
index a0663e8..aef22fb 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy
@@ -26,6 +26,7 @@ import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.state.NodeEntry
import org.apache.slider.server.appmaster.state.NodeInstance
import org.apache.slider.server.appmaster.state.RoleHistory
+import org.apache.slider.server.avro.NewerFilesFirst
import org.apache.slider.server.avro.RoleHistoryWriter
import org.junit.Test
@@ -119,7 +120,7 @@ class TestRoleHistoryRWOrdering extends BaseMockAppStateTest {
@Test
public void testPathnameComparator() throws Throwable {
- def newerName = new RoleHistoryWriter.NewerFilesFirst()
+ def newerName = new NewerFilesFirst()
log.info("$h_5fffa name is ${h_5fffa.getName()}")
log.info("$h_0406c name is ${h_0406c.getName()}")