You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/03/25 21:28:44 UTC

[15/25] incubator-slider git commit: SLIDER-799 Code and test to ensure there is no race condition in the escalation scan-and-execute cycle while entries are being created and issued

SLIDER-799 Code and test to ensure there is no race condition in the escalation scan-and-execute cycle while entries are being created and issued


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/3dd2f724
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/3dd2f724
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/3dd2f724

Branch: refs/heads/develop
Commit: 3dd2f7244026cedf62b651ee8f97f5d0dd885859
Parents: 92e7af6
Author: Steve Loughran <st...@apache.org>
Authored: Wed Mar 18 14:34:18 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Mar 18 14:34:18 2015 +0000

----------------------------------------------------------------------
 .../appmaster/state/OutstandingRequest.java     | 14 +++--
 .../state/OutstandingRequestTracker.java        | 18 ++++---
 ...tRoleHistoryOutstandingRequestTracker.groovy | 56 +++++++++++++-------
 3 files changed, 53 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
index 799d08e..4fd2933 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -134,7 +132,7 @@ public final class OutstandingRequest {
    * @param labelExpression label to satisfy
    * @return the request to raise
    */
-  public AMRMClient.ContainerRequest buildContainerRequest(
+  public synchronized AMRMClient.ContainerRequest buildContainerRequest(
       Resource resource, RoleStatus role, long time, String labelExpression) {
     String[] hosts;
     boolean relaxLocality;
@@ -187,9 +185,9 @@ public final class OutstandingRequest {
    * as the original one, and the same host, but: relaxed placement, and a changed priority
    * so as to place it into the relaxed list.
    */
-  public AMRMClient.ContainerRequest escalate() {
+  public synchronized AMRMClient.ContainerRequest escalate() {
+    Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued "+ this);
     escalated = true;
-    Preconditions.checkNotNull(issuedRequest, "issued request");
     Priority pri = ContainerPriority.createPriority(roleId, true);
     String[] nodes;
     List<String> issuedRequestNodes = issuedRequest.getNodes();
@@ -224,8 +222,8 @@ public final class OutstandingRequest {
    * @param time time to check against
    * @return true if escalation should begin
    */
-  public boolean shouldEscalate(long time) {
-    return !escalated && escalationTimeoutMillis < time;
+  public synchronized boolean shouldEscalate(long time) {
+    return issuedRequest != null && !escalated && escalationTimeoutMillis < time;
   }
   
   /**
@@ -267,7 +265,7 @@ public final class OutstandingRequest {
   }
 
   @Override
-  public String toString() {
+  public synchronized String toString() {
     final StringBuilder sb = new StringBuilder("OutstandingRequest{");
     sb.append("roleId=").append(roleId);
     sb.append(", node=").append(node);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
index 93db3fa..48f6e57 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -245,14 +245,18 @@ public class OutstandingRequestTracker {
 
     List<AbstractRMOperation> operations = new ArrayList<>();
     for (OutstandingRequest outstandingRequest : placedRequests.values()) {
-      if (outstandingRequest.shouldEscalate(now)) {
+      synchronized (outstandingRequest) {
+        // sync escalation check with operation so that nothing can happen to state
+        // of the request during the escalation
+        if (outstandingRequest.shouldEscalate(now)) {
 
-        // time to escalate
-        CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.issuedRequest);
-        operations.add(cancel);
-        AMRMClient.ContainerRequest escalated =
-            outstandingRequest.escalate();
-        operations.add(new ContainerRequestOperation(escalated));
+          // time to escalate
+          CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.issuedRequest);
+          operations.add(cancel);
+          AMRMClient.ContainerRequest escalated =
+              outstandingRequest.escalate();
+          operations.add(new ContainerRequestOperation(escalated));
+        }
       }
       
     }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
index 8c79bbf..2fe6763 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
@@ -29,6 +29,7 @@ import org.apache.slider.server.appmaster.operations.ContainerRequestOperation
 import org.apache.slider.server.appmaster.state.NodeInstance
 import org.apache.slider.server.appmaster.state.OutstandingRequest
 import org.apache.slider.server.appmaster.state.OutstandingRequestTracker
+import org.apache.slider.server.appmaster.state.RoleStatus
 import org.junit.Test
 
 class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
@@ -88,30 +89,32 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
 */
 
   @Test
-  public void testEscalationOfStrictPlacement() throws Throwable {
-    final def roleStatus = role1Status
-
-
-    ProviderRole role = roleStatus.providerRole
-    assert role.placementPolicy == PlacementPolicy.STRICT;
-    Resource resource = new MockResource()
-
-    appState.buildResourceRequirements(roleStatus, resource)
-
-    // first requst
-    OutstandingRequest r1 = tracker.newRequest(host1, roleStatus.key)
-    final def initialRequest = r1.buildContainerRequest(resource, roleStatus, 0, null)
-    assert r1.issuedRequest != null;
-    assert r1.located
-    assert !r1.escalated
-
+  public void testEscalation() throws Throwable {
+    ProviderRole providerRole1 = role1Status.providerRole
+    assert providerRole1.placementPolicy == PlacementPolicy.STRICT;
+    // first request
+    final def (res1, outstanding1) = newRequest(role1Status)
+    final def initialRequest = outstanding1.buildContainerRequest(res1, role1Status, 0, null)
+    assert outstanding1.issuedRequest != null;
+    assert outstanding1.located
+    assert !outstanding1.escalated
     assert !initialRequest.relaxLocality
     assert tracker.listOutstandingRequests().size() == 1
 
-    // simulate a few minutes; escalation MUST now be triggered
-    List<AbstractRMOperation> escalations = tracker.escalateOutstandingRequests(180 * 1000)
+    // second. This one doesn't get launched. This is to verify that the escalation
+    // process skips entries which are in the list but have not been issued.
+    // ...which can be a race condition between request issuance & escalation.
+    // (not one observed outside test authoring, but retained for completeness)
+    assert role2Status.placementPolicy == PlacementPolicy.ANTI_AFFINITY_REQUIRED
+    def (res2, outstanding2) = newRequest(role2Status)
+
+    // simulate some time escalation of role 1 MUST now be triggered
+    List<AbstractRMOperation> escalations =
+        tracker.escalateOutstandingRequests(providerRole1.placementTimeoutSeconds * 1000 + 500 )
+
+    assert outstanding1.escalated
+    assert !outstanding2.escalated
 
-    assert r1.escalated
     // two entries
     assert escalations.size() == 2;
     final def e1 = escalations[0]
@@ -124,5 +127,18 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest {
     def req2 = escRequest.request
     assert req2.relaxLocality
 
+    def (res3, outstanding3) = newRequest(role2Status)
+    outstanding3.buildContainerRequest(res3, role2Status, 0, null)
+
+    List<AbstractRMOperation> escalations2 =
+        tracker.escalateOutstandingRequests(providerRole1.placementTimeoutSeconds * 1000 + 500)
+    assert escalations2.size() == 0
+  }
+
+  public def newRequest(RoleStatus r) {
+    final Resource res2 = new MockResource()
+    appState.buildResourceRequirements(r, res2)
+    final OutstandingRequest outstanding2 = tracker.newRequest(host1, r.key)
+    return [res2, outstanding2]
   }
 }