You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/03/24 21:01:47 UTC
[15/20] incubator-slider git commit: SLIDER-799 code & Test to ensure
that there's no race condition during the escalation scan + execute cycle
with entries being created & issued
SLIDER-799 code & Test to ensure that there's no race condition during the escalation scan + execute cycle with entries being created & issued
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/3dd2f724
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/3dd2f724
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/3dd2f724
Branch: refs/heads/feature/SLIDER-799-AM-managed-relax
Commit: 3dd2f7244026cedf62b651ee8f97f5d0dd885859
Parents: 92e7af6
Author: Steve Loughran <st...@apache.org>
Authored: Wed Mar 18 14:34:18 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Mar 18 14:34:18 2015 +0000
----------------------------------------------------------------------
.../appmaster/state/OutstandingRequest.java | 14 +++--
.../state/OutstandingRequestTracker.java | 18 ++++---
...tRoleHistoryOutstandingRequestTracker.groovy | 56 +++++++++++++-------
3 files changed, 53 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
index 799d08e..4fd2933 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
/**
@@ -134,7 +132,7 @@ public final class OutstandingRequest {
* @param labelExpression label to satisfy
* @return the request to raise
*/
- public AMRMClient.ContainerRequest buildContainerRequest(
+ public synchronized AMRMClient.ContainerRequest buildContainerRequest(
Resource resource, RoleStatus role, long time, String labelExpression) {
String[] hosts;
boolean relaxLocality;
@@ -187,9 +185,9 @@ public final class OutstandingRequest {
* as the original one, and the same host, but: relaxed placement, and a changed priority
* so as to place it into the relaxed list.
*/
- public AMRMClient.ContainerRequest escalate() {
+ public synchronized AMRMClient.ContainerRequest escalate() {
+ Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued "+ this);
escalated = true;
- Preconditions.checkNotNull(issuedRequest, "issued request");
Priority pri = ContainerPriority.createPriority(roleId, true);
String[] nodes;
List<String> issuedRequestNodes = issuedRequest.getNodes();
@@ -224,8 +222,8 @@ public final class OutstandingRequest {
* @param time time to check against
* @return true if escalation should begin
*/
- public boolean shouldEscalate(long time) {
- return !escalated && escalationTimeoutMillis < time;
+ public synchronized boolean shouldEscalate(long time) {
+ return issuedRequest != null && !escalated && escalationTimeoutMillis < time;
}
/**
@@ -267,7 +265,7 @@ public final class OutstandingRequest {
}
@Override
- public String toString() {
+ public synchronized String toString() {
final StringBuilder sb = new StringBuilder("OutstandingRequest{");
sb.append("roleId=").append(roleId);
sb.append(", node=").append(node);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
index 93db3fa..48f6e57 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -245,14 +245,18 @@ public class OutstandingRequestTracker {
List<AbstractRMOperation> operations = new ArrayList<>();
for (OutstandingRequest outstandingRequest : placedRequests.values()) {
- if (outstandingRequest.shouldEscalate(now)) {
+ synchronized (outstandingRequest) {
+ // sync escalation check with operation so that nothing can happen to state
+ // of the request during the escalation
+ if (outstandingRequest.shouldEscalate(now)) {
- // time to escalate
- CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.issuedRequest);
- operations.add(cancel);
- AMRMClient.ContainerRequest escalated =
- outstandingRequest.escalate();
- operations.add(new ContainerRequestOperation(escalated));
+ // time to escalate
+ CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.issuedRequest);
+ operations.add(cancel);
+ AMRMClient.ContainerRequest escalated =
+ outstandingRequest.escalate();
+ operations.add(new ContainerRequestOperation(escalated));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
index 8c79bbf..2fe6763 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
@@ -29,6 +29,7 @@ import org.apache.slider.server.appmaster.operations.ContainerRequestOperation
import org.apache.slider.server.appmaster.state.NodeInstance
import org.apache.slider.server.appmaster.state.OutstandingRequest
import org.apache.slider.server.appmaster.state.OutstandingRequestTracker
+import org.apache.slider.server.appmaster.state.RoleStatus
import org.junit.Test
class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
@@ -88,30 +89,32 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
*/
@Test
- public void testEscalationOfStrictPlacement() throws Throwable {
- final def roleStatus = role1Status
-
-
- ProviderRole role = roleStatus.providerRole
- assert role.placementPolicy == PlacementPolicy.STRICT;
- Resource resource = new MockResource()
-
- appState.buildResourceRequirements(roleStatus, resource)
-
- // first requst
- OutstandingRequest r1 = tracker.newRequest(host1, roleStatus.key)
- final def initialRequest = r1.buildContainerRequest(resource, roleStatus, 0, null)
- assert r1.issuedRequest != null;
- assert r1.located
- assert !r1.escalated
-
+ public void testEscalation() throws Throwable {
+ ProviderRole providerRole1 = role1Status.providerRole
+ assert providerRole1.placementPolicy == PlacementPolicy.STRICT;
+ // first request
+ final def (res1, outstanding1) = newRequest(role1Status)
+ final def initialRequest = outstanding1.buildContainerRequest(res1, role1Status, 0, null)
+ assert outstanding1.issuedRequest != null;
+ assert outstanding1.located
+ assert !outstanding1.escalated
assert !initialRequest.relaxLocality
assert tracker.listOutstandingRequests().size() == 1
- // simulate a few minutes; escalation MUST now be triggered
- List<AbstractRMOperation> escalations = tracker.escalateOutstandingRequests(180 * 1000)
+ // second request. This one is never issued. This is to verify that the escalation
+ // process skips entries which are in the list but have not been issued,
+ // ...which can arise from a race condition between request issuance & escalation.
+ // (not one observed outside test authoring, but retained for completeness)
+ assert role2Status.placementPolicy == PlacementPolicy.ANTI_AFFINITY_REQUIRED
+ def (res2, outstanding2) = newRequest(role2Status)
+
+ // simulate the passage of some time; escalation of role 1 MUST now be triggered
+ List<AbstractRMOperation> escalations =
+ tracker.escalateOutstandingRequests(providerRole1.placementTimeoutSeconds * 1000 + 500 )
+
+ assert outstanding1.escalated
+ assert !outstanding2.escalated
- assert r1.escalated
// two entries
assert escalations.size() == 2;
final def e1 = escalations[0]
@@ -124,5 +127,18 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
def req2 = escRequest.request
assert req2.relaxLocality
+ def (res3, outstanding3) = newRequest(role2Status)
+ outstanding3.buildContainerRequest(res3, role2Status, 0, null)
+
+ List<AbstractRMOperation> escalations2 =
+ tracker.escalateOutstandingRequests(providerRole1.placementTimeoutSeconds * 1000 + 500)
+ assert escalations2.size() == 0
+ }
+
+ public def newRequest(RoleStatus r) {
+ final Resource res2 = new MockResource()
+ appState.buildResourceRequirements(r, res2)
+ final OutstandingRequest outstanding2 = tracker.newRequest(host1, r.key)
+ return [res2, outstanding2]
}
}