You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/10/18 23:35:58 UTC
git commit: SLIDER-490: cancel outstanding requests on flex down,
as needed
Repository: incubator-slider
Updated Branches:
refs/heads/develop c9b47f11c -> a2c266126
SLIDER-490: cancel outstanding requests on flex down, as needed
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/a2c26612
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/a2c26612
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/a2c26612
Branch: refs/heads/develop
Commit: a2c26612660c8b3a4571e91ecb195253a7e169fa
Parents: c9b47f1
Author: Steve Loughran <st...@apache.org>
Authored: Sat Oct 18 22:29:42 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Sat Oct 18 22:35:43 2014 +0100
----------------------------------------------------------------------
.../providers/AbstractProviderService.java | 10 +-
.../server/appmaster/SliderAppMaster.java | 8 +-
.../operations/AsyncRMOperationHandler.java | 53 ++++-
.../operations/CancelRequestOperation.java | 58 ++++++
.../operations/ContainerRequestOperation.java | 3 +-
.../ProviderNotifyingOperationHandler.java | 9 +-
.../operations/RMOperationHandler.java | 4 +-
.../operations/RMOperationHandlerActions.java | 9 +
.../slider/server/appmaster/state/AppState.java | 117 +++++++-----
.../appmaster/state/ContainerPriority.java | 37 +++-
.../server/appmaster/state/RoleStatus.java | 83 ++++----
.../TestMockAppStateRMOperations.groovy | 191 +++++++++++++++++--
.../model/mock/MockProviderService.groovy | 9 +
.../model/mock/MockRMOperationHandler.groovy | 22 ++-
.../apache/slider/test/SliderTestUtils.groovy | 37 +++-
15 files changed, 528 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index 36ee910..c628d8a 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
@@ -332,7 +333,7 @@ public abstract class AbstractProviderService
public Map<String, String> buildMonitorDetails(ClusterDescription clusterDesc) {
Map<String, String> details = new LinkedHashMap<String, String>();
- // add in all the
+ // add in all the endpoints
buildEndpointDetails(details);
return details;
@@ -398,6 +399,13 @@ public abstract class AbstractProviderService
// no-op
}
+ /**
+ * No-op implementation of request cancellation: nothing is cancelled
+ * and the method always returns 0.
+ * @param priority1 first priority (ignored)
+ * @param priority2 second priority (ignored)
+ * @param count number of requests to cancel (ignored)
+ * @return always 0
+ */
+ @Override
+ public int cancelContainerRequests(Priority priority1,
+ Priority priority2,
+ int count) {
+ return 0;
+ }
+
/**
* No-op implementation of this method.
*/
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 06d3597..d583a90 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -629,8 +629,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval,
this);
addService(asyncRMClient);
- //wrap it for the app state model
- rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient);
//now bring it up
deployChildService(asyncRMClient);
@@ -707,6 +705,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
containerMaxCores = maxResources.getVirtualCores();
appState.setContainerLimits(maxResources.getMemory(),
maxResources.getVirtualCores());
+
+ // build the handler for RM request/release operations; this uses
+ // the max value as part of its lookup
+ rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient,
+ maxResources);
+
// set the RM-defined maximum cluster values
appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(containerMaxCores));
appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(containerMaxMemory));
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
index f7a95a7..1cbb960 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
@@ -19,21 +19,70 @@
package org.apache.slider.server.appmaster.operations;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.List;
+
/**
- * Hands off RM operations to the Resource Manager
+ * Hands off RM operations to the Resource Manager.
*/
public class AsyncRMOperationHandler extends RMOperationHandler {
protected static final Logger log =
LoggerFactory.getLogger(AsyncRMOperationHandler.class);
private final AMRMClientAsync client;
+ private final Resource maxResources;
- public AsyncRMOperationHandler(AMRMClientAsync client) {
+ public AsyncRMOperationHandler(AMRMClientAsync client, Resource maxResources) {
this.client = client;
+ this.maxResources = maxResources;
+ }
+
+  /**
+   * Cancel up to {@code count} outstanding container requests, working
+   * through {@code priority1} first and then spending whatever is left
+   * of the budget on {@code priority2}.
+   * @param priority1 first priority to cancel at
+   * @param priority2 second priority to cancel at
+   * @param count number of requests to cancel
+   * @return the value handed back by the second
+   * {@link #cancelSinglePriorityRequests(Priority, int)} call, i.e. the
+   * number of requests that were left uncancelled
+   */
+  @Override
+  public int cancelContainerRequests(Priority priority1,
+      Priority priority2,
+      int count) {
+    // previously issued requests must be revoked: enumerate the matching
+    // sets at each priority in turn and remove entries from them
+    int leftAfterFirst = cancelSinglePriorityRequests(priority1, count);
+    return cancelSinglePriorityRequests(priority2, leftAfterFirst);
+  }
+
+  /**
+   * Cancel outstanding requests at a single priority level.
+   * @param priority priority to cancel
+   * @param count maximum number of requests to cancel
+   * @return the number of requests still left to cancel: {@code count}
+   * minus the number actually removed, or 0 if {@code count} is not positive
+   */
+  protected int cancelSinglePriorityRequests(Priority priority,
+      int count) {
+    if (count <= 0) {
+      // nothing to cancel, so skip the lookup entirely
+      return 0;
+    }
+    List<Collection<AMRMClient.ContainerRequest>> requestSets =
+        client.getMatchingRequests(priority, "", maxResources);
+    int remaining = count;
+    for (Collection<AMRMClient.ContainerRequest> requestSet : requestSets) {
+      if (remaining == 0) {
+        break;
+      }
+      // iterate over a snapshot: removeContainerRequest() may modify the
+      // very collection handed back by the client, which would make a
+      // direct iteration fail with a ConcurrentModificationException
+      AMRMClient.ContainerRequest[] snapshot = requestSet.toArray(
+          new AMRMClient.ContainerRequest[requestSet.size()]);
+      for (AMRMClient.ContainerRequest request : snapshot) {
+        if (remaining == 0) {
+          break;
+        }
+        // a single cancellation
+        client.removeContainerRequest(request);
+        remaining--;
+      }
+    }
+    return remaining;
   }
@Override
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
new file mode 100644
index 0000000..be5dbab
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.operations;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.slider.server.appmaster.state.ContainerPriority;
+
+/**
+ * Cancel outstanding container requests.
+ * The operation carries two priorities and a count, and hands all three
+ * to the handler's {@code cancelContainerRequests()} method when executed.
+ */
+public class CancelRequestOperation extends AbstractRMOperation {
+
+  private final Priority priority1;
+  private final Priority priority2;
+  private final int count;
+
+  /**
+   * Build the operation.
+   * @param priority1 first priority to cancel requests at
+   * @param priority2 second priority to cancel requests at
+   * @param count number of requests to cancel
+   */
+  public CancelRequestOperation(Priority priority1, Priority priority2, int count) {
+    this.priority1 = priority1;
+    this.priority2 = priority2;
+    this.count = count;
+  }
+
+  /**
+   * Ask the handler to cancel the requests; the handler's return value
+   * is not used.
+   * @param handler handler to invoke
+   */
+  @Override
+  public void execute(RMOperationHandler handler) {
+    handler.cancelContainerRequests(priority1, priority2, count);
+  }
+
+  @Override
+  public String toString() {
+    // say "cancel" rather than "release": this operation withdraws
+    // requests, it does not release containers
+    return "cancel " + count
+        + " requests for " + ContainerPriority.toString(priority1)
+        + " and " + ContainerPriority.toString(priority2);
+  }
+
+  /**
+   * Get the number of requests to cancel
+   * @return the number of container requests to cancel
+   */
+  public int getCount() {
+    return count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
index 711bb98..203f898 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
@@ -19,6 +19,7 @@
package org.apache.slider.server.appmaster.operations;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.slider.server.appmaster.state.ContainerPriority;
public class ContainerRequestOperation extends AbstractRMOperation {
@@ -39,6 +40,6 @@ public class ContainerRequestOperation extends AbstractRMOperation {
@Override
public String toString() {
- return "request container ";
+ return "request container for " + ContainerPriority.toString(request.getPriority());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
index a24d9e5..66df566 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
@@ -19,12 +19,13 @@
package org.apache.slider.server.appmaster.operations;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.slider.providers.ProviderService;
public class ProviderNotifyingOperationHandler extends RMOperationHandler {
- final ProviderService providerService;
+ private final ProviderService providerService;
public ProviderNotifyingOperationHandler(ProviderService providerService) {
this.providerService = providerService;
@@ -38,6 +39,12 @@ public class ProviderNotifyingOperationHandler extends RMOperationHandler {
@Override
public void addContainerRequest(AMRMClient.ContainerRequest req) {
providerService.addContainerRequest(req);
+ }
+ /**
+ * Forward the cancellation to the provider service.
+ * @param priority1 first priority, passed through unchanged
+ * @param priority2 second priority, passed through unchanged
+ * @param count number of requests to cancel, passed through unchanged
+ * @return whatever the provider service returns
+ */
+ @Override
+ public int cancelContainerRequests(Priority priority1,
+ Priority priority2,
+ int count) {
+ return providerService.cancelContainerRequests(priority1, priority2, count);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java
index 2b6e9e2..3ab9d89 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandler.java
@@ -18,11 +18,12 @@
package org.apache.slider.server.appmaster.operations;
+import org.apache.hadoop.yarn.api.records.Priority;
+
import java.util.List;
public abstract class RMOperationHandler implements RMOperationHandlerActions {
-
/**
* Execute an entire list of operations
* @param operations ops
@@ -32,4 +33,5 @@ public abstract class RMOperationHandler implements RMOperationHandlerActions {
operation.execute(this);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
index 6659cc9..e6d6c9d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
@@ -19,10 +19,19 @@
package org.apache.slider.server.appmaster.operations;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.AMRMClient;
public interface RMOperationHandlerActions {
void releaseAssignedContainer(ContainerId containerId);
void addContainerRequest(AMRMClient.ContainerRequest req);
+
+ /**
+ * Remove outstanding container requests
+ * @param priority1 priority to remove at
+ * @param priority2 second priority to target
+ * @param count number to remove
+ * @return a count defined by the implementation.
+ * NOTE(review): AsyncRMOperationHandler returns the number of requests
+ * left uncancelled, while the no-op in AbstractProviderService returns 0;
+ * confirm the intended contract.
+ */
+ int cancelContainerRequests(Priority priority1, Priority priority2, int count);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 706b0d2..834eaf2 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -58,6 +59,7 @@ import org.apache.slider.core.exceptions.SliderInternalStateException;
import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.operations.CancelRequestOperation;
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
import org.slf4j.Logger;
@@ -948,7 +950,7 @@ public class AppState {
boolean active) {
List<RoleInstance> nodes = new ArrayList<RoleInstance>();
Collection<RoleInstance> allRoleInstances;
- allRoleInstances = active? ownedContainers.values() : liveNodes.values();
+ allRoleInstances = active ? ownedContainers.values() : liveNodes.values();
for (RoleInstance node : allRoleInstances) {
if (node.roleId == roleId) {
nodes.add(node);
@@ -1574,7 +1576,7 @@ public class AppState {
/**
* Get the failure threshold for a specific role, falling back to
* the global one if not
- * @param roleStatus
+ * @param roleStatus role
* @return the threshold for failures
*/
private int getFailureThresholdForRole(RoleStatus roleStatus) {
@@ -1615,11 +1617,10 @@ public class AppState {
String name = role.getName();
synchronized (role) {
delta = role.getDelta();
- details = role.toString();
expected = role.getDesired();
}
- log.info(details);
+ log.info("Reviewing {}", role);
checkFailureThreshold(role);
if (delta > 0) {
@@ -1630,13 +1631,11 @@ public class AppState {
Resource capability = recordFactory.newResource();
AMRMClient.ContainerRequest containerAsk =
buildContainerResourceAndRequest(role, capability);
- log.info("Container ask is {} and label = {}", containerAsk, containerAsk.getNodeLabelExpression());
- if (containerAsk.getCapability().getMemory() >
- this.containerMaxMemory) {
- log.warn(
- "Memory requested: " + containerAsk.getCapability().getMemory() +
- " > " +
- this.containerMaxMemory);
+ log.info("Container ask is {} and label = {}", containerAsk,
+ containerAsk.getNodeLabelExpression());
+ int askMemory = containerAsk.getCapability().getMemory();
+ if (askMemory > this.containerMaxMemory) {
+ log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
}
operations.add(new ContainerRequestOperation(containerAsk));
}
@@ -1649,47 +1648,73 @@ public class AppState {
//then pick some containers to kill
int excess = -delta;
- // get the nodes to release
- int roleId = role.getKey();
-
- // enum all active nodes that aren't being released
- List<RoleInstance> containersToRelease = enumNodesWithRoleId(roleId, true);
-
- // cut all release-in-progress nodes
- ListIterator<RoleInstance> li = containersToRelease.listIterator();
- while (li.hasNext()) {
- RoleInstance next = li.next();
- if (next.released) {
- li.remove();
+ // how many requests are outstanding
+ int outstandingRequests = role.getRequested();
+ if (outstandingRequests > 0) {
+ // outstanding requests.
+ int toCancel = Math.min(outstandingRequests, excess);
+ Priority p1 =
+ ContainerPriority.createPriority(role.getPriority(), true);
+ Priority p2 =
+ ContainerPriority.createPriority(role.getPriority(), false);
+ operations.add(new CancelRequestOperation(p1, p2, toCancel));
+ role.cancel(toCancel);
+ excess -= toCancel;
+ assert excess >= 0 : "Attempted to cancel too many requests";
+ log.info("Submitted {} cancellations, leaving {} to release",
+ toCancel, excess);
+ if (excess == 0) {
+ log.info("After cancelling requests, application is at desired size");
}
}
- // warn if the desired state can't be reaced
- if (containersToRelease.size() < excess) {
- log.warn("Not enough nodes to release...short of {} nodes",
- containersToRelease.size() - excess);
- }
-
- // ask the release selector to sort the targets
- containersToRelease = containerReleaseSelector.sortCandidates(
- roleId,
- containersToRelease,
- excess);
-
- //crop to the excess
- List<RoleInstance> finalCandidates = (excess < containersToRelease.size())
- ? containersToRelease.subList(0, excess)
- : containersToRelease;
-
+ // after the cancellation there may be no excess
+ if (excess > 0) {
+ // get the nodes to release
+ int roleId = role.getKey();
+
+ // enum all active nodes that aren't being released
+ List<RoleInstance> containersToRelease = enumNodesWithRoleId(roleId, true);
+
+ // cut all release-in-progress nodes
+ ListIterator<RoleInstance> li = containersToRelease.listIterator();
+ while (li.hasNext()) {
+ RoleInstance next = li.next();
+ if (next.released) {
+ li.remove();
+ }
+ }
- // then build up a release operation, logging each container as released
- for (RoleInstance possible : finalCandidates) {
- log.debug("Targeting for release: {}", possible);
- containerReleaseSubmitted(possible.container);
- operations.add(new ContainerReleaseOperation(possible.getId()));
+ // warn if the desired state can't be reached
+ int numberAvailableForRelease = containersToRelease.size();
+ if (numberAvailableForRelease < excess) {
+ log.warn("Not enough nodes to release, have {} and need {} more",
+ numberAvailableForRelease,
+ excess - numberAvailableForRelease);
+ }
+
+ // ask the release selector to sort the targets
+ containersToRelease = containerReleaseSelector.sortCandidates(
+ roleId,
+ containersToRelease,
+ excess);
+
+ //crop to the excess
+
+ List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease)
+ ? containersToRelease.subList(0, excess)
+ : containersToRelease;
+
+
+ // then build up a release operation, logging each container as released
+ for (RoleInstance possible : finalCandidates) {
+ log.debug("Targeting for release: {}", possible);
+ containerReleaseSubmitted(possible.container);
+ operations.add(new ContainerReleaseOperation(possible.getId()));
+ }
}
-
+
}
return operations;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
index 369a932..3cc2106 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java
@@ -18,10 +18,13 @@
package org.apache.slider.server.appmaster.state;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records;
+import java.util.Locale;
+
/**
* Class containing the logic to build/split container priorities into the
* different fields used by Slider
@@ -61,23 +64,47 @@ public final class ContainerPriority {
}
/**
+ * Does the priority have location
+ * @param priority priority index
+ * @return true if the priority has the location marker
+ */
+ public static boolean hasLocation(int priority) {
+ return (priority ^ NOLOCATION ) != 0;
+ }
+
+ /**
* Map from a container to a role key by way of its priority
* @param container container
* @return role key
*/
public static int extractRole(Container container) {
Priority priority = container.getPriority();
- assert priority != null;
- return extractRole(priority.getPriority());
+ return extractRole(priority);
}
/**
- * Map from a container to a role key by way of its priority
- * @param container container
- * @return role key
+ * Priority record to role mapper
+ * @param priorityRecord priority record
+ * @return the role #
*/
public static int extractRole(Priority priorityRecord) {
+ Preconditions.checkNotNull(priorityRecord);
return extractRole(priorityRecord.getPriority());
}
+  /**
+   * Describe a priority record as text, showing its role and the result
+   * of the locality check.
+   * @param priorityRecord priority record. May be null
+   * @return a string value; "(null)" when no record is supplied
+   */
+  public static String toString(Priority priorityRecord) {
+    if (priorityRecord == null) {
+      return "(null)";
+    }
+    int role = extractRole(priorityRecord);
+    boolean located = hasLocation(priorityRecord.getPriority());
+    return String.format(Locale.ENGLISH,
+        "role %d (locality=%b)", role, located);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
index a112799..63b5931 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
@@ -32,10 +32,8 @@ import java.util.Map;
*/
public final class RoleStatus implements Cloneable {
-
private final String name;
-
/**
* Role key in the container details stored in the AM,
* currently mapped to priority
@@ -45,7 +43,7 @@ public final class RoleStatus implements Cloneable {
private final ProviderRole providerRole;
private int desired, actual, requested, releasing;
- private volatile int failed, started, startFailed, completed, totalRequested;
+ private int failed, started, startFailed, completed, totalRequested;
/**
* value to use when specifiying "no limit" for instances: {@value}
@@ -71,7 +69,7 @@ public final class RoleStatus implements Cloneable {
this.name = providerRole.name;
this.key = providerRole.id;
}
-
+
public String getName() {
return name;
}
@@ -101,26 +99,24 @@ public final class RoleStatus implements Cloneable {
return 0 != (getPlacementPolicy() & PlacementPolicy.NO_DATA_LOCALITY);
}
- public int getDesired() {
+ public synchronized int getDesired() {
return desired;
}
- public void setDesired(int desired) {
+ public synchronized void setDesired(int desired) {
this.desired = desired;
}
- public int getActual() {
+ public synchronized int getActual() {
return actual;
}
- public int incActual() {
+ public synchronized int incActual() {
return ++actual;
}
- public int decActual() {
- if (0 > --actual) {
- actual = 0;
- }
+ public synchronized int decActual() {
+ actual = Math.max(0, actual - 1);
return actual;
}
@@ -133,29 +129,29 @@ public final class RoleStatus implements Cloneable {
return ++requested;
}
- public synchronized int decRequested() {
- if (0 > --requested) {
- requested = 0;
- }
+ public synchronized int cancel(int count) {
+ requested = Math.max(0, requested - count);
return requested;
}
+
+ public synchronized int decRequested() {
+ return cancel(1);
+ }
- public int getReleasing() {
+ public synchronized int getReleasing() {
return releasing;
}
- public int incReleasing() {
+ public synchronized int incReleasing() {
return ++releasing;
}
- public int decReleasing() {
- if (0 > --releasing) {
- releasing = 0;
- }
+ public synchronized int decReleasing() {
+ releasing = Math.max(0, releasing - 1);
return releasing;
}
- public int getFailed() {
+ public synchronized int getFailed() {
return failed;
}
@@ -163,7 +159,7 @@ public final class RoleStatus implements Cloneable {
* Reset the failure counts
* @return the total number of failures up to this point
*/
- public int resetFailed() {
+ public synchronized int resetFailed() {
int total = failed + startFailed;
failed = 0;
startFailed = 0;
@@ -178,7 +174,7 @@ public final class RoleStatus implements Cloneable {
* @return the number of failures
* @param text text about the failure
*/
- public int noteFailed(boolean startupFailure, String text) {
+ public synchronized int noteFailed(boolean startupFailure, String text) {
int current = ++failed;
if (text != null) {
failureMessage = text;
@@ -190,38 +186,38 @@ public final class RoleStatus implements Cloneable {
return current;
}
- public int getStartFailed() {
+ public synchronized int getStartFailed() {
return startFailed;
}
- public void incStartFailed() {
+ public synchronized void incStartFailed() {
startFailed++;
}
- public String getFailureMessage() {
+ public synchronized String getFailureMessage() {
return failureMessage;
}
- public int getCompleted() {
+ public synchronized int getCompleted() {
return completed;
}
- public void setCompleted(int completed) {
+ public synchronized void setCompleted(int completed) {
this.completed = completed;
}
- public int incCompleted() {
+ public synchronized int incCompleted() {
return completed ++;
}
- public int getStarted() {
+ public synchronized int getStarted() {
return started;
}
- public void incStarted() {
+ public synchronized void incStarted() {
started++;
}
- public int getTotalRequested() {
+ public synchronized int getTotalRequested() {
return totalRequested;
}
@@ -233,9 +229,8 @@ public final class RoleStatus implements Cloneable {
* 0 means "do nothing".
*/
public synchronized int getDelta() {
- int inuse = actual + requested;
+ int inuse = getActualAndRequested();
//don't know how to view these. Are they in-use or not?
- //inuse += releasing;
int delta = desired - inuse;
if (delta < 0) {
//if we are releasing, remove the number that are already released.
@@ -246,8 +241,16 @@ public final class RoleStatus implements Cloneable {
return delta;
}
+ /**
+ * Get count of actual and requested containers.
+ * The method is synchronized, so both counters are read while holding
+ * this instance's lock.
+ * @return the size of the application when outstanding requests are included
+ */
+ public synchronized int getActualAndRequested() {
+ return actual + requested;
+ }
+
@Override
- public String toString() {
+ public synchronized String toString() {
return "RoleStatus{" +
"name='" + name + '\'' +
", key=" + key +
@@ -267,13 +270,13 @@ public final class RoleStatus implements Cloneable {
}
@Override
- public Object clone() throws CloneNotSupportedException {
+ public synchronized Object clone() throws CloneNotSupportedException {
return super.clone();
}
/**
* Get the provider role
- * @return
+ * @return the provider role
*/
public ProviderRole getProviderRole() {
return providerRole;
@@ -283,7 +286,7 @@ public final class RoleStatus implements Cloneable {
* Build the statistics map from the current data
* @return a map for use in statistics reports
*/
- public Map<String, Integer> buildStatistics() {
+ public synchronized Map<String, Integer> buildStatistics() {
Map<String, Integer> stats = new HashMap<String, Integer>();
stats.put(StatusKeys.STATISTICS_CONTAINERS_ACTIVE_REQUESTS, getRequested());
stats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED, getCompleted());
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
index f8e852e..e5ad4ae 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
@@ -25,7 +25,9 @@ import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.model.mock.MockRMOperationHandler
import org.apache.slider.server.appmaster.model.mock.MockRoles
+import org.apache.slider.server.appmaster.model.mock.MockYarnEngine
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
+import org.apache.slider.server.appmaster.operations.CancelRequestOperation
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation
import org.apache.slider.server.appmaster.operations.RMOperationHandler
@@ -63,32 +65,181 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
public void testMockAddOp() throws Throwable {
role0Status.desired = 1
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
- assert ops.size() == 1
+ assertListLength(ops, 1)
ContainerRequestOperation operation = (ContainerRequestOperation) ops[0]
int priority = operation.request.priority.priority
assert extractRole(priority) == MockFactory.PROVIDER_ROLE0.id
RMOperationHandler handler = new MockRMOperationHandler()
handler.execute(ops)
- //tell the container its been allocated
AbstractRMOperation op = handler.operations[0]
assert op instanceof ContainerRequestOperation
}
+ /**
+ * Test of a flex up and down op which verifies that outstanding
+ * requests are cancelled first.
+ * <ol>
+ * <li>request 5 nodes, assert 5 request made</li>
+ * <li>allocate 1 of them</li>
+ * <li>flex cluster size to 3</li>
+ * <li>assert this generates one cancel operation covering 2 requests</li>
+ * </ol>
+ */
+ @Test
+ public void testRequestThenCancelOps() throws Throwable {
+ def role0 = role0Status
+ role0.desired = 5
+ List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
+ assertListLength(ops, 5)
+ // now 5 outstanding requests.
+ assert role0.requested == 5
+
+ // allocate one
+ role0.incActual()
+ role0.decRequested()
+ assert role0.requested == 4
+
+
+ // flex cluster to 3
+ role0.desired = 3
+ ops = appState.reviewRequestAndReleaseNodes()
+
+ // expect a cancel operation from review
+ assertListLength(ops, 1)
+ assert ops[0] instanceof CancelRequestOperation
+ RMOperationHandler handler = new MockRMOperationHandler()
+ handler.availableToCancel = 4;
+ handler.execute(ops)
+ assert handler.availableToCancel == 2
+ assert role0.requested == 2
+
+ // flex down one more
+ role0.desired = 2
+ ops = appState.reviewRequestAndReleaseNodes()
+ assertListLength(ops, 1)
+ assert ops[0] instanceof CancelRequestOperation
+ handler.execute(ops)
+ assert handler.availableToCancel == 1
+ assert role0.requested == 1
+
+ }
+
+ @Test
+ public void testCancelNoActualContainers() throws Throwable {
+ def role0 = role0Status
+ role0.desired = 5
+ List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
+ assertListLength(ops, 5)
+ // now 5 outstanding requests.
+ assert role0.requested == 5
+ role0.desired = 0
+ ops = appState.reviewRequestAndReleaseNodes()
+ assertListLength(ops, 1)
+ CancelRequestOperation cancel = ops[0] as CancelRequestOperation
+ assert cancel.count == 5
+ }
+
+
+ @Test
+ public void testFlexDownOutstandingRequests() throws Throwable {
+ // engine only has two nodes, so > 2 will be outstanding
+ engine = new MockYarnEngine(1, 2)
+ List<AbstractRMOperation> ops
+ // role: desired = 4; expect requested = 2, actual = 2 once nodes are submitted
+ def role0 = role0Status
+ role0.desired = 4
+ createAndSubmitNodes()
+
+ assert role0.requested == 2
+ assert role0.actual == 2
+ // there are now two outstanding, two actual
+ // Release 3 and verify that the two
+ // cancellations were combined with a release
+ role0.desired = 1;
+ assert role0.delta == -3
+ ops = appState.reviewRequestAndReleaseNodes()
+ assertListLength(ops, 2)
+ assert role0.requested == 0
+ assert role0.releasing == 1
+ }
+
+ @Test
+ public void testCancelAllOutstandingRequests() throws Throwable {
+
+ // role: desired = 2, requested = 2, no containers allocated
+ def role0 = role0Status
+ role0.desired = 2
+ role0.incRequested()
+ role0.incRequested()
+ List<AbstractRMOperation> ops
+
+ // there are now two outstanding requests and no allocation made by this test.
+ // Flex down to 0 and verify that a single cancel operation
+ // covering both outstanding requests is generated
+ role0.desired = 0;
+ ops = appState.reviewRequestAndReleaseNodes()
+ assertListLength(ops, 1)
+ CancelRequestOperation cancel = ops[0] as CancelRequestOperation
+ assert cancel.getCount() == 2
+ }
+
+
+ @Test
+ public void testFlexUpOutstandingRequests() throws Throwable {
+
+ // role: desired = 2, requested = 1, actual=1
+ def role0 = role0Status
+ role0.desired = 2
+ role0.incActual();
+ role0.incRequested()
+
+ List<AbstractRMOperation> ops
+
+ // flex up 2 nodes, yet expect only one node to be requested,
+ // as the outstanding request is taken into account
+ role0.desired = 4;
+ role0.incRequested()
+
+ assert role0.actual == 1;
+ assert role0.requested == 2;
+ assert role0.actualAndRequested == 3;
+ assert role0.delta == 1
+ ops = appState.reviewRequestAndReleaseNodes()
+ assertListLength(ops, 1)
+ assert ops[0] instanceof ContainerRequestOperation
+ assert role0.requested == 3
+ }
+
+ @Test
+ public void testFlexUpNoSpace() throws Throwable {
+ // engine only has two nodes, so > 2 will be outstanding
+ engine = new MockYarnEngine(1, 2)
+ List<AbstractRMOperation> ops
+ // role: desired = 4; expect requested = 2, actual = 2 once nodes are submitted
+ def role0 = role0Status
+ role0.desired = 4
+ createAndSubmitNodes()
+
+ assert role0.requested == 2
+ assert role0.actual == 2
+ role0.desired = 8;
+ assert role0.delta == 4
+ createAndSubmitNodes()
+ assert role0.requested == 6
+ }
+
+
@Test
public void testAllocateReleaseOp() throws Throwable {
role0Status.desired = 1
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
ContainerRequestOperation operation = (ContainerRequestOperation) ops[0]
- AMRMClient.ContainerRequest request = operation.request
- Container cont = engine.allocateContainer(request)
- List<Container> allocated = [cont]
- List<ContainerAssignment> assignments = [];
- List<AbstractRMOperation> operations = []
- appState.onContainersAllocated(allocated, assignments, operations)
- assert operations.size() == 0
- assert assignments.size() == 1
+ def (ArrayList<ContainerAssignment> assignments, Container cont, AMRMClient.ContainerRequest request) = satisfyContainerRequest(
+ operation)
+ assertListLength(ops, 1)
+ assertListLength(assignments, 1)
ContainerAssignment assigned = assignments[0]
Container target = assigned.container
assert target.id == cont.id
@@ -104,13 +255,23 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
//now release it by changing the role status
role0Status.desired = 0
ops = appState.reviewRequestAndReleaseNodes()
- assert ops.size() == 1
+ assertListLength(ops, 1)
assert ops[0] instanceof ContainerReleaseOperation
ContainerReleaseOperation release = (ContainerReleaseOperation) ops[0]
assert release.containerId == cont.id
}
+ public List satisfyContainerRequest(ContainerRequestOperation operation) {
+ AMRMClient.ContainerRequest request = operation.request
+ Container cont = engine.allocateContainer(request)
+ List<Container> allocated = [cont]
+ List<ContainerAssignment> assignments = [];
+ List<AbstractRMOperation> operations = []
+ appState.onContainersAllocated(allocated, assignments, operations)
+ return [assignments, cont, request]
+ }
+
@Test
public void testComplexAllocation() throws Throwable {
role0Status.desired = 1
@@ -121,8 +282,8 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
List<ContainerAssignment> assignments = [];
List<AbstractRMOperation> releases = []
appState.onContainersAllocated(allocations, assignments, releases)
- assert releases.size() == 0
- assert assignments.size() == 4
+ assertListLength(releases, 0)
+ assertListLength(assignments, 4)
assignments.each { ContainerAssignment assigned ->
Container target = assigned.container
RoleInstance ri = roleInstance(assigned)
@@ -136,7 +297,7 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
assert engine.containerCount() == 4;
role1Status.desired = 0
ops = appState.reviewRequestAndReleaseNodes()
- assert ops.size() == 3
+ assertListLength(ops, 3)
allocations = engine.execute(ops)
assert engine.containerCount() == 1;
@@ -154,7 +315,7 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
List<ContainerAssignment> assignments = [];
List<AbstractRMOperation> releases = []
appState.onContainersAllocated(allocations, assignments, releases)
- assert assignments.size() == 1
+ assertListLength(assignments, 1)
ContainerAssignment assigned = assignments[0]
Container target = assigned.container
RoleInstance ri = roleInstance(assigned)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
index 675aec5..fc23d54 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
@@ -24,6 +24,7 @@ import org.apache.hadoop.service.LifecycleEvent
import org.apache.hadoop.service.ServiceStateChangeListener
import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.api.records.ContainerId
+import org.apache.hadoop.yarn.api.records.Priority
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.registry.client.types.ServiceRecord
import org.apache.slider.api.ClusterDescription
@@ -269,6 +270,14 @@ class MockProviderService implements ProviderService {
}
@Override
+ int cancelContainerRequests(
+ Priority priority1,
+ Priority priority2,
+ int count) {
+ return 0
+ }
+
+ @Override
void rebuildContainerDetails(List<Container> liveContainers, String applicationId,
Map<Integer, ProviderRole> roleProviderMap) {
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
index 0fdba6b..297c597 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
@@ -20,6 +20,7 @@ package org.apache.slider.server.appmaster.model.mock
import groovy.util.logging.Slf4j
import org.apache.hadoop.yarn.api.records.ContainerId
+import org.apache.hadoop.yarn.api.records.Priority
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation
@@ -30,6 +31,10 @@ import org.apache.slider.server.appmaster.operations.RMOperationHandler
class MockRMOperationHandler extends RMOperationHandler {
public List<AbstractRMOperation> operations = [];
int requests, releases;
+ // number available to cancel
+ int availableToCancel = 0;
+ // count of cancelled values. This must be explicitly set
+ int cancelled
@Override
public void releaseAssignedContainer(ContainerId containerId) {
@@ -45,9 +50,20 @@ class MockRMOperationHandler extends RMOperationHandler {
requests++;
}
- /**
- * clear the history
- */
+ @Override
+ int cancelContainerRequests(
+ Priority priority1,
+ Priority priority2,
+ int count) {
+ int releaseable = Math.min(count, availableToCancel)
+ availableToCancel -= releaseable;
+ cancelled += releaseable;
+ return releaseable;
+ }
+
+/**
+ * clear the history
+ */
public void clear() {
operations.clear()
releases = 0;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/a2c26612/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
index 61dfeb5..42ec9de 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
@@ -85,19 +85,46 @@ class SliderTestUtils extends Assert {
public static void assume(boolean condition, String message) {
if (!condition) {
- log.warn("Skipping test: {}", message)
- Assume.assumeTrue(message, false);
+ skip(message)
}
}
-
+ /**
+ * Assert that two lists have the same size and equal elements at each index
+ * @param left first list
+ * @param right second list
+ */
public static void assertListEquals(List left, List right) {
- assert left.size() == right.size();
+ String lval = collectionToString(left)
+ String rval = collectionToString(right)
+ String text = "comparing $lval to $rval"
+ assertEquals(text, left.size(), right.size())
for (int i = 0; i < left.size(); i++) {
- assert left[0] == right[0]
+ assertEquals(text, left[i], right[i])
}
}
+
+ /**
+ * Assert a list has a given length
+ * @param list list
+ * @param size size to have
+ */
+ public static void assertListLength(List list, int size) {
+ String lval = collectionToString(list)
+ assertEquals(lval, size, list.size())
+ }
+
+
+ /**
+ * Stringify a collection with [ ] at either end
+ * @param collection collection
+ * @return string value
+ */
+ public static String collectionToString(List collection) {
+ return "[" + SliderUtils.join(collection,", ", false) +"]"
+ }
+
/**
* Assume that a string option is set and not equal to ""
* @param conf configuration file