You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/11/23 18:20:28 UTC
[16/50] incubator-slider git commit: SLIDER-967 AA placement with nodemap updates working
SLIDER-967 AA placement with nodemap updates working
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5a61b4cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5a61b4cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5a61b4cd
Branch: refs/heads/develop
Commit: 5a61b4cd8189ae02eb9eaeb8ffdb25604dcc4376
Parents: 6b13042
Author: Steve Loughran <st...@apache.org>
Authored: Thu Nov 12 18:15:07 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Nov 12 18:15:07 2015 +0000
----------------------------------------------------------------------
.../server/appmaster/SliderAppMaster.java | 13 ++-
.../slider/server/appmaster/state/AppState.java | 51 ++++++++++--
.../state/OutstandingRequestTracker.java | 8 +-
.../server/appmaster/state/RoleHistory.java | 5 --
.../server/appmaster/state/RoleStatus.java | 55 ++++++++-----
.../appstate/TestMockAppStateAAPlacement.groovy | 85 +++++++++++++++++---
.../model/history/TestRoleHistoryAA.groovy | 4 -
.../model/mock/BaseMockAppStateTest.groovy | 21 ++++-
8 files changed, 183 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index b54ea6c..eb7b26a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1851,10 +1851,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
LOG_YARN.info("onNodesUpdated({})", updatedNodes.size());
log.info("Updated nodes {}", updatedNodes);
// Check if any nodes are lost or revived and update state accordingly
- List<AbstractRMOperation> operations = appState.onNodesUpdated(updatedNodes);
- execute(operations);
- // if there were any operations, trigger a review
- reviewRequestAndReleaseNodes("nodes updated");
+
+ AppState.NodeUpdatedOutcome outcome = appState.onNodesUpdated(updatedNodes);
+ if (!outcome.operations.isEmpty()) {
+ execute(outcome.operations);
+ }
+ // rigger a review if the cluster changed
+ if (outcome.clusterChanged) {
+ reviewRequestAndReleaseNodes("nodes updated");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 0c66e25..6f38eb5 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -1222,11 +1222,11 @@ public class AppState {
* @return the container request to submit or null if there is none
*/
private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) {
- incrementRequestCount(role);
OutstandingRequest request = roleHistory.requestContainerForRole(role);
if (request == null) {
return null;
}
+ incrementRequestCount(role);
if (role.isAntiAffinePlacement()) {
role.setOutstandingAArequest(request);
}
@@ -1428,16 +1428,31 @@ public class AppState {
* Handle node update from the RM. This syncs up the node map with the RM's view
* @param updatedNodes updated nodes
*/
- public synchronized List<AbstractRMOperation> onNodesUpdated(List<NodeReport> updatedNodes) {
+ public synchronized NodeUpdatedOutcome onNodesUpdated(List<NodeReport> updatedNodes) {
boolean changed = roleHistory.onNodesUpdated(updatedNodes);
if (changed) {
- log.error("TODO: cancel AA requests and re-review");
- return cancelOutstandingAARequests();
+ log.info("YARN cluster changed —cancelling current AA requests");
+ List<AbstractRMOperation> operations = cancelOutstandingAARequests();
+ log.debug("Created {} cancel requests", operations.size());
+ return new NodeUpdatedOutcome(true, operations);
}
- return new ArrayList<>(0);
+ return new NodeUpdatedOutcome(false, new ArrayList<AbstractRMOperation>(0));
}
/**
+ * Return value of the {@link #onNodesUpdated(List)} call.
+ */
+ public static class NodeUpdatedOutcome {
+ public final boolean clusterChanged;
+ public final List<AbstractRMOperation> operations;
+
+ public NodeUpdatedOutcome(boolean clusterChanged,
+ List<AbstractRMOperation> operations) {
+ this.clusterChanged = clusterChanged;
+ this.operations = operations;
+ }
+ }
+ /**
* Is a role short lived by the threshold set for this application
* @param instance instance
* @return true if the instance is considered short lived
@@ -1885,13 +1900,17 @@ public class AppState {
}
/**
- * Escalate operation as triggered by external timer.
+ * Cancel any outstanding AA Requests, building up the list of ops to
+ * cancel, removing them from RoleHistory structures and the RoleStatus
+ * entries.
* @return a (usually empty) list of cancel/request operations.
*/
public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+ // get the list of cancel operations
List<AbstractRMOperation> operations = roleHistory.cancelOutstandingAARequests();
for (RoleStatus roleStatus : roleStatusMap.values()) {
- if (roleStatus.isAntiAffinePlacement()) {
+ if (roleStatus.isAARequestOutstanding()) {
+ log.info("Cancelling outstanding AA request for {}", roleStatus);
roleStatus.cancelOutstandingAARequest();
}
}
@@ -2225,6 +2244,9 @@ public class AppState {
log.info("Asking for next container for AA role {}", roleName);
role.decPendingAntiAffineRequests();
addContainerRequest(operations, createContainerRequest(role));
+ log.debug("Current AA role status {}", role);
+ } else {
+ log.info("AA request sequence completed for role {}", role);
}
}
@@ -2310,4 +2332,19 @@ public class AppState {
// now pretend it has just started
innerOnNodeManagerContainerStarted(cid);
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("AppState{");
+ sb.append("applicationLive=").append(applicationLive);
+ sb.append(", live nodes=").append(liveNodes.size());
+ sb.append(", startedContainers=").append(startedContainers);
+ sb.append(", startFailedContainerCount=").append(startFailedContainerCount);
+ sb.append(", surplusContainers=").append(surplusContainers);
+ sb.append(", failedContainerCount=").append(failedContainerCount);
+ sb.append(", outstandingContainerRequests=")
+ .append(outstandingContainerRequests);
+ sb.append('}');
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
index 4209449..66d201f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -390,6 +390,7 @@ public class OutstandingRequestTracker {
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() {
+ log.debug("Looking for AA request to cancel");
List<AbstractRMOperation> operations = new ArrayList<>();
// first, all placed requests
@@ -404,15 +405,18 @@ public class OutstandingRequestTracker {
}
}
// second, all open requests
- for (OutstandingRequest outstandingRequest : openRequests) {
+ ListIterator<OutstandingRequest> orit = openRequests.listIterator();
+ while (orit.hasNext()) {
+ OutstandingRequest outstandingRequest = orit.next();
synchronized (outstandingRequest) {
if (outstandingRequest.isAntiAffine()) {
// time to escalate
operations.add(outstandingRequest.createCancelOperation());
- openRequests.remove(outstandingRequest);
+ orit.remove();
}
}
}
+ log.info("Cancelling {} outstanding AA requests", operations.size());
return operations;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index d7e6050..00b5226 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -1030,11 +1030,6 @@ public class RoleHistory {
List<OutstandingRequest> requests =
outstandingRequests.extractOpenRequestsForRole(roleId, toCancel);
- if (role.isAntiAffinePlacement()) {
- // TODO: AA
- // AA placement, so clear the role info
- role.cancelOutstandingAARequest();
- }
// are there any left?
int remaining = toCancel - requests.size();
// ask for some placed nodes
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
index a14a84b..b530d18 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
@@ -171,7 +171,7 @@ public final class RoleStatus implements Cloneable {
public void cancel(long count) {
requested.decToFloor(count);
}
-
+
public void decRequested() {
cancel(1);
}
@@ -334,8 +334,11 @@ public final class RoleStatus implements Cloneable {
* if there are no outstanding requests.
*/
public void cancelOutstandingAARequest() {
- setOutstandingAArequest(null);
- setPendingAntiAffineRequests(0);
+ if (outstandingAArequest != null) {
+ setOutstandingAArequest(null);
+ setPendingAntiAffineRequests(0);
+ decRequested();
+ }
}
/**
@@ -366,25 +369,33 @@ public final class RoleStatus implements Cloneable {
}
@Override
- public synchronized String toString() {
- return "RoleStatus{" +
- "name='" + name + '\'' +
- ", key=" + key +
- ", desired=" + desired +
- ", actual=" + actual +
- ", requested=" + requested +
- ", releasing=" + releasing +
- ", pendingAntiAffineRequests=" + pendingAntiAffineRequests +
- ", failed=" + failed +
- ", failed recently=" + failedRecently.get() +
- ", node failed=" + nodeFailed.get() +
- ", pre-empted=" + preempted.get() +
- ", started=" + started +
- ", startFailed=" + startFailed +
- ", completed=" + completed +
- ", failureMessage='" + failureMessage + '\'' +
- ", providerRole=" + providerRole +
- '}';
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("RoleStatus{");
+ sb.append("name='").append(name).append('\'');
+ sb.append(", key=").append(key);
+ sb.append(", desired=").append(desired);
+ sb.append(", actual=").append(actual);
+ sb.append(", requested=").append(requested);
+ sb.append(", releasing=").append(releasing);
+ sb.append(", failed=").append(failed);
+ sb.append(", startFailed=").append(startFailed);
+ sb.append(", started=").append(started);
+ sb.append(", completed=").append(completed);
+ sb.append(", totalRequested=").append(totalRequested);
+ sb.append(", preempted=").append(preempted);
+ sb.append(", nodeFailed=").append(nodeFailed);
+ sb.append(", failedRecently=").append(failedRecently);
+ sb.append(", limitsExceeded=").append(limitsExceeded);
+ sb.append(", resourceRequirements=").append(resourceRequirements);
+ sb.append(", isAntiAffinePlacement=").append(isAntiAffinePlacement());
+ if (isAntiAffinePlacement()) {
+ sb.append(", pendingAntiAffineRequests=").append(pendingAntiAffineRequests);
+ sb.append(", outstandingAArequest=").append(outstandingAArequest);
+ }
+ sb.append(", failureMessage='").append(failureMessage).append('\'');
+ sb.append(", providerRole=").append(providerRole);
+ sb.append('}');
+ return sb.toString();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
index c98f3bf..9a325d7 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
@@ -21,13 +21,18 @@ package org.apache.slider.server.appmaster.model.appstate
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.yarn.api.records.Container
+import org.apache.hadoop.yarn.api.records.NodeReport
+import org.apache.hadoop.yarn.api.records.NodeState
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.providers.PlacementPolicy
import org.apache.slider.providers.ProviderRole
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockFactory
+import org.apache.slider.server.appmaster.model.mock.MockNodeReport
import org.apache.slider.server.appmaster.model.mock.MockRoles
+import org.apache.slider.server.appmaster.model.mock.MockYarnEngine
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
+import org.apache.slider.server.appmaster.state.AppState
import org.apache.slider.server.appmaster.state.AppStateBindingInfo
import org.apache.slider.server.appmaster.state.ContainerAssignment
import org.apache.slider.server.appmaster.state.NodeMap
@@ -55,6 +60,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
null)
RoleStatus aaRole
+ private int NODES = 3
@Override
AppStateBindingInfo buildBindingInfo() {
@@ -73,6 +79,11 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
aaRole = lookupRole(AAROLE.name)
}
+ @Override
+ MockYarnEngine createYarnEngine() {
+ new MockYarnEngine(NODES, 8)
+ }
+
/**
* Get the single request of a list of operations; includes the check for the size
* @param ops operations list of size 1
@@ -87,7 +98,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
public void testAllocateAANoLabel() throws Throwable {
assert cloneNodemap().size() > 0
-
// want multiple instances, so there will be iterations
aaRole.desired = 2
@@ -111,7 +121,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
assert !hostInstance.canHost(aaRole.key, "")
assert !hostInstance.canHost(aaRole.key, null)
-
// assignment
assert assignments.size() == 1
@@ -205,7 +214,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
submitOperations(ops, [], ops2).size()
assert 1 == ops2.size()
assertAllContainersAA()
-
}
/**
@@ -241,17 +249,70 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest
}
/**
- * Scan through all containers and assert that the assignment is AA
- * @param index role index
+ *
+ * @throws Throwable
*/
- void assertAllContainersAA(String index) {
- def nodemap = stateAccess.nodeInformationSnapshot
- nodemap.each { name, info ->
- def nodeEntry = info.entries[index]
- assert nodeEntry == null ||
- (nodeEntry.live -nodeEntry.releasing + nodeEntry.starting) <= 1 ,
- "too many instances on node $name"
+ @Test
+ public void testAskForTooMany() throws Throwable {
+
+ describe("Ask for 1 more than the no of available nodes;" +
+ " expect the final request to be unsatisfied until the cluster changes size")
+ //more than expected
+ aaRole.desired = NODES + 1
+ List<AbstractRMOperation > operations = appState.reviewRequestAndReleaseNodes()
+ assert aaRole.AARequestOutstanding
+ assert NODES == aaRole.pendingAntiAffineRequests
+ for (int i = 0; i < NODES; i++) {
+ def iter = "Iteration $i role = $aaRole"
+ log.info(iter)
+ List<AbstractRMOperation > operationsOut = []
+ assert 1 == submitOperations(operations, [], operationsOut).size(), iter
+ operations = operationsOut
+ if (i + 1 < NODES) {
+ assert operations.size() == 2
+ } else {
+ assert operations.size() == 1
+ }
+ assertAllContainersAA()
}
+ // expect an outstanding AA request to be unsatisfied
+ assert aaRole.actual < aaRole.desired
+ assert !aaRole.requested
+ assert !aaRole.AARequestOutstanding
+ List<Container> allocatedContainers = engine.execute(operations, [])
+ assert 0 == allocatedContainers.size()
+ // in a review now, no more requests can be generated, as there is no space for AA placements,
+ // even though there is cluster capacity
+ assert 0 == appState.reviewRequestAndReleaseNodes().size()
+
+ // now do a node update (this doesn't touch the YARN engine; the node isn't really there)
+ def outcome = addNewNode()
+ assert cloneNodemap().size() == NODES + 1
+ assert outcome.clusterChanged
+ // no active calls to empty
+ assert outcome.operations.empty
+ assert 1 == appState.reviewRequestAndReleaseNodes().size()
+ }
+
+ protected AppState.NodeUpdatedOutcome addNewNode() {
+ NodeReport report = new MockNodeReport("four", NodeState.RUNNING) as NodeReport
+ appState.onNodesUpdated([report])
}
+ @Test
+ public void testClusterSizeChangesDuringRequestSequence() throws Throwable {
+ describe("Change the cluster size where the cluster size changes during a test sequence.")
+ aaRole.desired = NODES + 1
+ List<AbstractRMOperation> operations = appState.reviewRequestAndReleaseNodes()
+ assert aaRole.AARequestOutstanding
+ assert NODES == aaRole.pendingAntiAffineRequests
+ def outcome = addNewNode()
+ assert outcome.clusterChanged
+ // one call to cancel
+ assert 1 == outcome.operations.size()
+ // and on a review, one more to rebuild
+ assert 1 == appState.reviewRequestAndReleaseNodes().size()
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
index 9d0efa2..de85bba 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy
@@ -44,9 +44,6 @@ class TestRoleHistoryAA extends SliderTestBase {
NodeMap nodeMap, gpuNodeMap
RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES)
- AMRMClient.ContainerRequest requestContainer(RoleStatus roleStatus) {
- roleHistory.requestContainerForRole(roleStatus).issuedRequest
- }
@Override
void setup() {
@@ -159,7 +156,6 @@ class TestRoleHistoryAA extends SliderTestBase {
assert node1.canHost(2,"")
}
-
public List<NodeInstance> assertNoAvailableNodes(int role = 1, String label = "") {
return verifyResultSize(0, nodeMap.findAllNodesForRole(role, label))
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
index 3d472f1..4cb441d 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
@@ -279,7 +279,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles
* @return a list of roles
*/
public List<RoleInstance> createAndSubmitNodes() {
- return createAndSubmitNodes([])
+ return createAndSubmitNodes([], [])
}
/**
@@ -288,9 +288,10 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles
* @return a list of roles allocated
*/
public List<RoleInstance> createAndSubmitNodes(
- List<ContainerId> containerIds) {
+ List<ContainerId> containerIds,
+ List<AbstractRMOperation> operationsOut = []) {
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
- return submitOperations(ops, containerIds)
+ return submitOperations(ops, containerIds, operationsOut)
}
/**
@@ -398,4 +399,18 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles
assert 1 == ops.size()
getRequest(ops, 0)
}
+
+ /**
+ * Scan through all containers and assert that the assignment is AA
+ * @param index role index
+ */
+ void assertAllContainersAA(String index) {
+ def nodemap = stateAccess.nodeInformationSnapshot
+ nodemap.each { name, info ->
+ def nodeEntry = info.entries[index]
+ assert nodeEntry == null ||
+ (nodeEntry.live - nodeEntry.releasing + nodeEntry.starting) <= 1,
+ "too many instances on node $name"
+ }
+ }
}