You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/03/25 21:28:49 UTC
[20/25] incubator-slider git commit: SLIDER-799 SLIDER-817 request
tracker builds cancel operation from the resource used in the request...tests
updated to handle the changes
SLIDER-799 SLIDER-817 request tracker builds cancel operation from the resource used in the request...tests updated to handle the changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/b952b640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/b952b640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/b952b640
Branch: refs/heads/develop
Commit: b952b6401a22c65053c85a6a1238f1928d3eb243
Parents: 43c61fb
Author: Steve Loughran <st...@apache.org>
Authored: Tue Mar 24 19:33:36 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Mar 24 19:33:36 2015 +0000
----------------------------------------------------------------------
.../operations/CancelRequestOperation.java | 68 ---------------
.../slider/server/appmaster/state/AppState.java | 48 +++++++----
.../state/ContainerAllocationOutcome.java | 15 ++++
.../appmaster/state/OutstandingRequest.java | 12 ++-
.../state/OutstandingRequestTracker.java | 87 ++++++++++++++++----
.../server/appmaster/state/RoleHistory.java | 30 ++++++-
.../appstate/TestMockAppStateFlexing.groovy | 20 ++++-
.../TestMockAppStateRMOperations.groovy | 62 ++++++++------
.../TestMockAppStateRolePlacement.groovy | 14 +++-
...tRoleHistoryOutstandingRequestTracker.groovy | 12 +--
.../TestRoleHistoryRequestTracking.groovy | 8 +-
.../appmaster/model/mock/Allocator.groovy | 1 +
.../appmaster/model/mock/MockYarnEngine.groovy | 2 +-
13 files changed, 236 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
deleted file mode 100644
index 754bf28..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.appmaster.operations;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.slider.server.appmaster.state.ContainerPriority;
-
-/**
- * Cancel a container request at the given priority/proirities.
- */
-public class CancelRequestOperation extends AbstractRMOperation {
-
- private final Priority priority1;
- private final Priority priority2;
- private final int count;
-
- /**
- * Create an instance
- * @param priority1 first priority, the one that is released first
- * @param priority2 optional second priority
- * @param count number of requests to cancel
- */
- public CancelRequestOperation(Priority priority1, Priority priority2, int count) {
- Preconditions.checkArgument(priority1 != null, "null priority");
- Preconditions.checkArgument(count >= 0, "negative count");
- this.priority1 = priority1;
- this.priority2 = priority2;
- this.count = count;
- }
-
- @Override
- public void execute(RMOperationHandlerActions handler) {
- handler.cancelContainerRequests(priority1, priority2, count);
- }
-
- @Override
- public String toString() {
- return "release " + count
- + " requests for " + ContainerPriority.toString(priority1)
- + (priority2 != null ?
- (" and " + ContainerPriority.toString(priority2)) : "");
- }
-
- /**
- * Get the number to release
- * @return the number of containers to release
- */
- public int getCount() {
- return count;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 20c1792..c68b2a9 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -68,7 +67,6 @@ import org.apache.slider.providers.ProviderRole;
import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
import org.apache.slider.server.appmaster.management.MetricsConstants;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
-import org.apache.slider.server.appmaster.operations.CancelRequestOperation;
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
import org.slf4j.Logger;
@@ -1941,25 +1939,33 @@ public class AppState {
if (outstandingRequests > 0) {
// outstanding requests.
int toCancel = Math.min(outstandingRequests, excess);
- Priority p1 =
- ContainerPriority.createPriority(role.getPriority(), true);
- Priority p2 =
- ContainerPriority.createPriority(role.getPriority(), false);
- // TODO Delegate to Role History
- operations.add(new CancelRequestOperation(p1, p2, toCancel));
+
+ // Delegate to Role History
+
+ List<AbstractRMOperation> cancellations = roleHistory.cancelRequestsForRole(role, toCancel);
+ log.info("Found {} outstanding requests to cancel", cancellations.size());
+ operations.addAll(cancellations);
+ if (toCancel != cancellations.size()) {
+ log.error("Tracking of outstanding requests is not in sync with the summary statistics:" +
+ " expected to be able to cancel {} requests, but got {}",
+ toCancel, cancellations.size());
+ }
+
role.cancel(toCancel);
excess -= toCancel;
assert excess >= 0 : "Attempted to cancel too many requests";
log.info("Submitted {} cancellations, leaving {} to release",
toCancel, excess);
if (excess == 0) {
- log.info("After cancelling requests, application is at desired size");
+ log.info("After cancelling requests, application is now at desired size");
}
}
// after the cancellation there may be no excess
if (excess > 0) {
+
+ // there's still an excess after cancelling requests, so containers must be released
// get the nodes to release
int roleId = role.getKey();
@@ -1978,7 +1984,7 @@ public class AppState {
}
}
- // warn if the desired state can't be reaced
+ // warn if the desired state can't be reached
int numberAvailableForRelease = containersToRelease.size();
if (numberAvailableForRelease < excess) {
log.warn("Not enough containers to release, have {} and need {} more",
@@ -2001,7 +2007,7 @@ public class AppState {
// then build up a release operation, logging each container as released
for (RoleInstance possible : finalCandidates) {
- log.debug("Targeting for release: {}", possible);
+ log.info("Targeting for release: {}", possible);
containerReleaseSubmitted(possible.container);
operations.add(new ContainerReleaseOperation(possible.getId()));
}
@@ -2009,6 +2015,7 @@ public class AppState {
}
+ // list of operations to execute
return operations;
}
@@ -2111,18 +2118,27 @@ public class AppState {
//dec requested count
decrementRequestCount(role);
- // cancel an allocation request which granted this, so as to avoid repeated
- // requests
- releaseOperations.add(new CancelRequestOperation(container.getPriority(), null, 1));
-
//inc allocated count -this may need to be dropped in a moment,
// but us needed to update the logic below
final int allocated = role.incActual();
final int desired = role.getDesired();
final String roleName = role.getName();
- final ContainerAllocationOutcome outcome =
+ final ContainerAllocation allocation =
roleHistory.onContainerAllocated(container, desired, allocated);
+ final ContainerAllocationOutcome outcome = allocation.outcome;
+
+ // cancel an allocation request which granted this, so as to avoid repeated
+ // requests
+ if (allocation.origin != null && allocation.origin.getIssuedRequest() != null) {
+ releaseOperations.add(allocation.origin.createCancelOperation());
+ } else {
+ // there's a request, but no idea what to cancel.
+ // rather than try to recover from it inelegantly, (and cause more confusion),
+ // log the event, but otherwise continue
+ log.warn("Unexpected allocation of container "
+ + SliderUtils.containerToString(container));
+ }
//look for condition where we get more back than we asked
if (allocated > desired) {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
index 6639300..5b3a93c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java
@@ -22,8 +22,23 @@ package org.apache.slider.server.appmaster.state;
* Outcome of the assignment
*/
public enum ContainerAllocationOutcome {
+ /**
+ * There wasn't a request for this
+ */
Unallocated,
+
+ /**
+ * Open placement
+ */
Open,
+
+ /**
+ * Allocated explicitly where requested
+ */
Placed,
+
+ /**
+ * This was an escalated placement
+ */
Escalated
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
index 24946af..12b4b53 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -244,7 +245,7 @@ public final class OutstandingRequest {
* so as to place it into the relaxed list.
*/
public synchronized AMRMClient.ContainerRequest escalate() {
- Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued "+ this);
+ Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued " + this);
escalated = true;
// this is now the priority
@@ -352,4 +353,13 @@ public final class OutstandingRequest {
sb.append('}');
return sb.toString();
}
+
+ /**
+ * Create a cancel operation
+ * @return an operation that can be used to cancel the request
+ */
+ public CancelSingleRequest createCancelOperation() {
+ Preconditions.checkState(issuedRequest!=null, "No issued request to cancel");
+ return new CancelSingleRequest(issuedRequest);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
index 05a8052..97d321c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -119,38 +119,44 @@ public class OutstandingRequestTracker {
* from the {@link #placedRequests} structure.
* @param role role index
* @param hostname hostname
- * @param resource
* @return the allocation outcome
*/
- public synchronized ContainerAllocationOutcome onContainerAllocated(int role,
+ public synchronized ContainerAllocation onContainerAllocated(int role,
String hostname,
Container container) {
+ final String containerDetails = SliderUtils.containerToString(container);
+ log.debug("Processing allocation for role {} on {}", role,
+ containerDetails);
+ ContainerAllocation allocation = new ContainerAllocation();
ContainerAllocationOutcome outcome;
OutstandingRequest request =
- placedRequests.remove(new OutstandingRequest(role, hostname));
+ placedRequests.remove(new OutstandingRequest(role, hostname));
if (request != null) {
//satisfied request
- log.info("Found placed request for container: {}", request);
+ log.debug("Found placed request for container: {}", request);
request.completed();
// derive outcome from status of tracked request
outcome = request.isEscalated()
- ? ContainerAllocationOutcome.Escalated
- : ContainerAllocationOutcome.Placed;
+ ? ContainerAllocationOutcome.Escalated
+ : ContainerAllocationOutcome.Placed;
} else {
// not in the list; this is an open placement
// scan through all containers in the open request list
request = removeOpenRequest(container);
if (request != null) {
- log.info("Found open request for container: {}", request);
+ log.debug("Found open request for container: {}", request);
request.completed();
outcome = ContainerAllocationOutcome.Open;
} else {
- log.warn("Container allocation was not expected :"
- + SliderUtils.containerToString(container));
+ log.warn("No open request found for container {}, outstanding queue has {} entries ",
+ containerDetails,
+ openRequests.size());
outcome = ContainerAllocationOutcome.Unallocated;
}
}
- return outcome;
+ allocation.origin = request;
+ allocation.outcome = outcome;
+ return allocation;
}
/**
@@ -167,11 +173,15 @@ public class OutstandingRequestTracker {
ListIterator<OutstandingRequest> openlist = openRequests.listIterator();
while (openlist.hasNext() && request == null) {
OutstandingRequest r = openlist.next();
- if (r.getPriority() == pri
- && r.resourceRequirementsMatch(resource)) {
- // match of priority and resources
- request = r;
- openlist.remove();
+ if (r.getPriority() == pri) {
+ // matching resource
+ if (r.resourceRequirementsMatch(resource)) {
+ // match of priority and resources
+ request = r;
+ openlist.remove();
+ } else {
+ log.debug("Matched priorities but resources different");
+ }
}
}
return request;
@@ -314,6 +324,7 @@ public class OutstandingRequestTracker {
* Escalate operation as triggered by external timer.
* @return a (usually empty) list of cancel/request operations.
*/
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public synchronized List<AbstractRMOperation> escalateOutstandingRequests(long now) {
if (placedRequests.isEmpty()) {
return NO_REQUESTS;
@@ -327,7 +338,7 @@ public class OutstandingRequestTracker {
if (outstandingRequest.shouldEscalate(now)) {
// time to escalate
- CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.getIssuedRequest());
+ CancelSingleRequest cancel = outstandingRequest.createCancelOperation();
operations.add(cancel);
AMRMClient.ContainerRequest escalated =
outstandingRequest.escalate();
@@ -338,4 +349,48 @@ public class OutstandingRequestTracker {
}
return operations;
}
+
+ /**
+ * Extract a specific number of open requests for a role
+ * @param roleId role Id
+ * @param count count to extract
+ * @return a list of requests which are no longer in the open request list
+ */
+ public synchronized List<OutstandingRequest> extractOpenRequestsForRole(int roleId, int count) {
+ List<OutstandingRequest> results = new ArrayList<>();
+ ListIterator<OutstandingRequest> openlist = openRequests.listIterator();
+ while (openlist.hasNext() && count > 0) {
+ OutstandingRequest openRequest = openlist.next();
+ if (openRequest.roleId == roleId) {
+ results.add(openRequest);
+ openlist.remove();
+ count--;
+ }
+ }
+ return results;
+ }
+ /**
+ * Extract a specific number of placed requests for a role
+ * @param roleId role Id
+ * @param count count to extract
+ * @return a list of requests which are no longer in the placed request data structure
+ */
+ public synchronized List<OutstandingRequest> extractPlacedRequestsForRole(int roleId, int count) {
+ List<OutstandingRequest> results = new ArrayList<>();
+ Iterator<OutstandingRequest> iterator = placedRequests.keySet().iterator();
+ while (iterator.hasNext() && count > 0) {
+ OutstandingRequest request = iterator.next();
+ if (request.roleId == roleId) {
+ results.add(request);
+ count--;
+ }
+ }
+ // now cull them from the map
+ for (OutstandingRequest result : results) {
+ placedRequests.remove(result);
+ }
+
+ return results;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index 0b981b8..9ab40bd 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -638,13 +638,13 @@ public class RoleHistory {
* @param actualCount current count of instances
* @return The allocation outcome
*/
- public synchronized ContainerAllocationOutcome onContainerAllocated(Container container,
+ public synchronized ContainerAllocation onContainerAllocated(Container container,
int desiredCount,
int actualCount) {
int role = ContainerPriority.extractRole(container);
String hostname = RoleHistoryUtils.hostnameOf(container);
List<NodeInstance> nodeInstances = getOrCreateNodesForRoleId(role);
- ContainerAllocationOutcome outcome =
+ ContainerAllocation outcome =
outstandingRequests.onContainerAllocated(role, hostname, container);
if (desiredCount <= actualCount) {
// all outstanding requests have been satisfied
@@ -874,4 +874,30 @@ public class RoleHistory {
long now = now();
return outstandingRequests.escalateOutstandingRequests(now);
}
+
+ /**
+ * Build the list of requests to cancel from the outstanding list.
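+ * @param role role whose outstanding requests are to be cancelled
+ * @param toCancel maximum number of outstanding requests to cancel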
+ * @return a list of cancellable operations.
+ */
+ public synchronized List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) {
+ List<AbstractRMOperation> results = new ArrayList<>(toCancel);
+ // first scan through the unplaced request list to find all of a role
+ int roleId = role.getKey();
+ List<OutstandingRequest> requests =
+ outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
+
+ // are there any left?
+ int remaining = toCancel - requests.size();
+ // ask for some placed nodes
+ requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, remaining));
+
+ // build cancellations
+ for (OutstandingRequest request : requests) {
+ results.add(request.createCancelOperation());
+ }
+ return results;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy
index d962438..257092a 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy
@@ -25,6 +25,7 @@ import org.apache.slider.core.exceptions.TriggerClusterTeardownException
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockRoles
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest
import org.apache.slider.server.appmaster.state.AppState
import org.apache.slider.server.appmaster.state.ContainerAssignment
import org.apache.slider.server.appmaster.state.RoleInstance
@@ -163,6 +164,23 @@ class TestMockAppStateFlexing extends BaseMockAppStateTest implements MockRoles
}
}
-
+
+ @Test
+ public void testCancelWithRequestsOutstanding() throws Throwable {
+ // flex cluster size before the original set were allocated
+
+
+ role0Status.desired = 6
+ // build the ops
+ List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
+ // here the data structures exist
+
+ // go down
+ role0Status.desired = 3
+ List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes()
+ assert ops2.size() == 3
+ ops2.each { assert it instanceof CancelSingleRequest}
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
index ee5eead..9ac6fcf 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy
@@ -20,6 +20,7 @@ package org.apache.slider.server.appmaster.model.appstate
import groovy.util.logging.Slf4j
import org.apache.hadoop.yarn.api.records.Container
+import org.apache.hadoop.yarn.api.records.ContainerId
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockFactory
@@ -27,10 +28,11 @@ import org.apache.slider.server.appmaster.model.mock.MockRMOperationHandler
import org.apache.slider.server.appmaster.model.mock.MockRoles
import org.apache.slider.server.appmaster.model.mock.MockYarnEngine
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
-import org.apache.slider.server.appmaster.operations.CancelRequestOperation
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation
import org.apache.slider.server.appmaster.operations.RMOperationHandler
+import org.apache.slider.server.appmaster.state.AppState
import org.apache.slider.server.appmaster.state.ContainerAssignment
import org.apache.slider.server.appmaster.state.RoleInstance
import org.junit.Test
@@ -95,10 +97,17 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
assertListLength(ops, 5)
// now 5 outstanding requests.
assert role0.requested == 5
-
+
// allocate one
- role0.incActual()
- role0.decRequested()
+ List<AbstractRMOperation> processed = [ops[0]]
+ List<ContainerId> released = []
+ List<AppState.NodeCompletionResult> completionResults = []
+ submitOperations(processed, released)
+ List<RoleInstance> instances = createAndSubmitNodes(released)
+ processSubmissionOperations(instances, completionResults, released)
+
+
+ // four outstanding
assert role0.requested == 4
@@ -106,9 +115,10 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
role0.desired = 3
ops = appState.reviewRequestAndReleaseNodes()
- // expect a cancel operation from review
- assertListLength(ops, 1)
- assert ops[0] instanceof CancelRequestOperation
+ // expect two cancel operations from review
+ assertListLength(ops, 2)
+ ops.each { assert it instanceof CancelSingleRequest }
+
RMOperationHandler handler = new MockRMOperationHandler()
handler.availableToCancel = 4;
handler.execute(ops)
@@ -119,11 +129,10 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
role0.desired = 2
ops = appState.reviewRequestAndReleaseNodes()
assertListLength(ops, 1)
- assert ops[0] instanceof CancelRequestOperation
+ ops.each { assert it instanceof CancelSingleRequest }
handler.execute(ops)
assert handler.availableToCancel == 1
assert role0.requested == 1
-
}
@Test
@@ -136,9 +145,8 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
assert role0.requested == 5
role0.desired = 0
ops = appState.reviewRequestAndReleaseNodes()
- assertListLength(ops, 1)
- CancelRequestOperation cancel = ops[0] as CancelRequestOperation
- assert cancel.count == 5
+ assertListLength(ops, 5)
+
}
@@ -160,7 +168,9 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
role0.desired = 1;
assert role0.delta == -3
ops = appState.reviewRequestAndReleaseNodes()
- assertListLength(ops, 2)
+ assertListLength(ops, 3)
+ assert 2 == (ops.findAll {it instanceof CancelSingleRequest}).size()
+ assert 1 == (ops.findAll {it instanceof ContainerReleaseOperation}).size()
assert role0.requested == 0
assert role0.releasing == 1
}
@@ -171,31 +181,31 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
// role: desired = 2, requested = 1, actual=1
def role0 = role0Status
role0.desired = 2
- role0.incRequested()
- role0.incRequested()
List<AbstractRMOperation> ops
-
- // there are now two outstanding, two actual
+ ops = appState.reviewRequestAndReleaseNodes()
+ assert 2 == (ops.findAll { it instanceof ContainerRequestOperation }).size()
+
+ // there are now two outstanding, two actual
// Release 3 and verify that the two
// cancellations were combined with a release
role0.desired = 0;
ops = appState.reviewRequestAndReleaseNodes()
- assertListLength(ops, 1)
- CancelRequestOperation cancel = ops[0] as CancelRequestOperation
- assert cancel.getCount() == 2
+ assert ops.size() == 2
+ assert 2 == (ops.findAll { it instanceof CancelSingleRequest }).size()
}
@Test
public void testFlexUpOutstandingRequests() throws Throwable {
-
- // role: desired = 2, requested = 1, actual=1
+
+ List<AbstractRMOperation> ops
+ // role: desired = 2, requested = 1, actual=1
def role0 = role0Status
role0.desired = 2
role0.incActual();
role0.incRequested()
-
- List<AbstractRMOperation> ops
+
+
// flex up 2 nodes, yet expect only one node to be requested,
// as the outstanding request is taken into account
@@ -283,7 +293,9 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR
List<ContainerAssignment> assignments = [];
List<AbstractRMOperation> releases = []
appState.onContainersAllocated(allocations, assignments, releases)
- assertListLength(releases, 0)
+ // we expect four cancel operations here, one for each of the allocated containers
+ assertListLength(releases, 4)
+ releases.each { assert it instanceof CancelSingleRequest }
assertListLength(assignments, 4)
assignments.each { ContainerAssignment assigned ->
Container target = assigned.container
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy
index 8fd9858..4726e71 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockRoles
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
+import org.apache.slider.server.appmaster.operations.CancelSingleRequest
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation
import org.apache.slider.server.appmaster.state.ContainerAssignment
@@ -59,12 +60,19 @@ class TestMockAppStateRolePlacement extends BaseMockAppStateTest
assert request.relaxLocality
assert request.nodes == null
assert request.racks == null
+ assert request.capability
Container allocated = engine.allocateContainer(request)
List<ContainerAssignment> assignments = [];
- List<AbstractRMOperation> operations = []
- appState.onContainersAllocated([(Container)allocated], assignments, operations)
- assert operations.size() == 0
+ List<AbstractRMOperation> releaseOperations = []
+ appState.onContainersAllocated([(Container)allocated], assignments, releaseOperations)
+ // verify the release matches the allocation
+ assert releaseOperations.size() == 1
+ CancelSingleRequest cancelOp = releaseOperations[0] as CancelSingleRequest;
+ assert cancelOp.request
+ assert cancelOp.request.capability
+ assert cancelOp.request.capability.equals(allocated.resource)
+ // now the assignment
assert assignments.size() == 1
ContainerAssignment assigned = assignments[0]
Container container = assigned.container
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
index 3f41dfd..3d396f8 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
@@ -23,8 +23,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.providers.PlacementPolicy
import org.apache.slider.providers.ProviderRole
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
-import org.apache.slider.server.appmaster.model.mock.MockContainer
-import org.apache.slider.server.appmaster.model.mock.MockNodeId
import org.apache.slider.server.appmaster.model.mock.MockPriority
import org.apache.slider.server.appmaster.model.mock.MockResource
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
@@ -63,7 +61,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
tracker.newRequest(host1, 0)
tracker.newRequest(host2, 0)
tracker.newRequest(host1, 1)
- assert tracker.onContainerAllocated(1, "host1", null) == ContainerAllocationOutcome.Placed
+ assert tracker.onContainerAllocated(1, "host1", null).outcome == ContainerAllocationOutcome.Placed
assert !tracker.lookupPlacedRequest(1, "host1")
assert tracker.lookupPlacedRequest(0, "host1")
}
@@ -91,7 +89,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
resource.virtualCores=1
resource.memory = 48;
c1.setResource(resource)
- ContainerAllocationOutcome outcome = tracker.onContainerAllocated(0, "host1", c1)
+ ContainerAllocationOutcome outcome = tracker.onContainerAllocated(0, "host1", c1).outcome
assert outcome == ContainerAllocationOutcome.Unallocated
assert tracker.listOpenRequests().size() == 1
}
@@ -120,9 +118,11 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
assert issued.capability == resource
assert issued.priority.priority == c1.getPriority().getPriority()
assert req1.resourceRequirementsMatch(resource)
- ContainerAllocationOutcome outcome = tracker.onContainerAllocated(0, nodeId.host, c1)
+
+ def allocation = tracker.onContainerAllocated(0, nodeId.host, c1)
assert tracker.listOpenRequests().size() == 0
- assert outcome == ContainerAllocationOutcome.Open
+ assert allocation.outcome == ContainerAllocationOutcome.Open
+ assert allocation.origin.is(req1)
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
index 34a0a4d..82750a3 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy
@@ -143,11 +143,11 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest {
}
public void assertOnContainerAllocated(Container c1, int p1, int p2) {
- assert ContainerAllocationOutcome.Open != roleHistory.onContainerAllocated(c1, p1, p2)
+ assert ContainerAllocationOutcome.Open != roleHistory.onContainerAllocated(c1, p1, p2).outcome
}
public void assertOnContainerAllocationOpen(Container c1, int p1, int p2) {
- assert ContainerAllocationOutcome.Open == roleHistory.onContainerAllocated(c1, p1, p2)
+ assert ContainerAllocationOutcome.Open == roleHistory.onContainerAllocated(c1, p1, p2).outcome
}
def assertNoOutstandingPlacedRequests() {
@@ -190,7 +190,8 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest {
// the final allocation will trigger a cleanup
container = factory.newContainer(req2, "four")
// no node dropped
- assert ContainerAllocationOutcome.Unallocated == roleHistory.onContainerAllocated(container, 3, 3)
+ assert ContainerAllocationOutcome.Unallocated ==
+ roleHistory.onContainerAllocated(container, 3, 3).outcome
// yet the list is now empty
assertNoOutstandingPlacedRequests()
roleHistory.listOpenRequests().empty
@@ -198,7 +199,6 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest {
// and the remainder goes onto the available list
List<NodeInstance> a2 = roleHistory.cloneAvailableList(0)
assertListEquals([age2Active0], a2)
-
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy
index a027098..ca5d805 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy
@@ -114,6 +114,7 @@ class Allocator {
container.nodeId = node.nodeId
container.nodeHttpAddress = node.httpAddress()
container.priority = request.priority
+ container.resource = request.capability
return container;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy
index 04466c6..e3d509a 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy
@@ -114,8 +114,8 @@ class MockYarnEngine {
} else {
ContainerRequestOperation req = (ContainerRequestOperation) op
Container container = allocateContainer(req.request)
- log.info("allocated container $container for $req")
if (container != null) {
+ log.info("allocated container $container for $req")
allocation.add(container)
} else {
log.debug("Unsatisfied allocation $req")