You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/03/25 21:28:47 UTC

[18/25] incubator-slider git commit: SLIDER-799 SLIDER-828 when containers are allocated, explicitly cancel the request

SLIDER-799 SLIDER-828 when containers are allocated, explicitly cancel the request


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1938323c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1938323c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1938323c

Branch: refs/heads/develop
Commit: 1938323c7fef2666a429a06a55208a528e93f64b
Parents: 7d9a9e9
Author: Steve Loughran <st...@apache.org>
Authored: Tue Mar 24 13:44:09 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Mar 24 13:49:14 2015 +0000

----------------------------------------------------------------------
 .../server/appmaster/SliderAppMaster.java       |   2 +-
 .../appmaster/management/MetricsConstants.java  |  33 ++++-
 .../operations/AsyncRMOperationHandler.java     |   6 +-
 .../operations/CancelRequestOperation.java      |  14 +-
 .../slider/server/appmaster/state/AppState.java | 147 +++++++++----------
 .../server/appmaster/state/RoleHistory.java     |   2 +-
 .../TestMockAppStateRebuildOnAMRestart.groovy   |   4 +-
 7 files changed, 124 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index ab6b55c..b8584f7 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1840,7 +1840,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     rmOperationHandler.cancelSingleRequest(request);
   }
 
-  /* =================================================================== */
+/* =================================================================== */
 /* END */
 /* =================================================================== */
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
index e55cf60..31a82a3 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
@@ -22,6 +22,35 @@ package org.apache.slider.server.appmaster.management;
  * Constants used in slider for metrics registration and lookup
  */
 public class MetricsConstants {
-  public static final String CONTAINERS_OUTSTANDING_REQUESTS =
-      "containers.outstanding-requests";
+
+  /**
+   * {@value}
+   */
+  public static final String CONTAINERS_OUTSTANDING_REQUESTS = "containers.outstanding-requests";
+
+  /**
+   * {@value}
+   */
+  public static final String CONTAINERS_STARTED = "containers.started";
+
+  /**
+   * {@value}
+   */
+  public static final String CONTAINERS_SURPLUS = "containers.surplus";
+
+  /**
+   * {@value}
+   */
+  public static final String CONTAINERS_COMPLETED = "containers.completed";
+
+  /**
+   * {@value}
+   */
+  public static final String CONTAINERS_FAILED = "containers.failed";
+
+  /**
+   * {@value}
+   */
+  public static final String CONTAINERS_START_FAILED = "containers.start-failed";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
index 7c98551..11afc0e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
@@ -50,8 +50,10 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
     // need to revoke a previously issued container request
     // so enum the sets and pick some
     int remaining = cancelSinglePriorityRequests(priority1, count);
-    remaining = cancelSinglePriorityRequests(priority2, remaining);
-    
+    if (priority2 != null) {
+      remaining = cancelSinglePriorityRequests(priority2, remaining);
+    }
+
     return remaining;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
index 9e9f277..754bf28 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
@@ -18,11 +18,12 @@
 
 package org.apache.slider.server.appmaster.operations;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.slider.server.appmaster.state.ContainerPriority;
 
 /**
- * Cancel a container request
+ * Cancel a container request at the given priority/priorities.
  */
 public class CancelRequestOperation extends AbstractRMOperation {
 
@@ -30,7 +31,15 @@ public class CancelRequestOperation extends AbstractRMOperation {
   private final Priority priority2;
   private final int count;
 
+  /**
+   * Create an instance
+   * @param priority1 first priority, the one that is released first
+   * @param priority2 optional second priority
+   * @param count number of requests to cancel
+   */
   public CancelRequestOperation(Priority priority1, Priority priority2, int count) {
+    Preconditions.checkArgument(priority1 != null, "null priority");
+    Preconditions.checkArgument(count >= 0, "negative count");
     this.priority1 = priority1;
     this.priority2 = priority2;
     this.count = count;
@@ -45,7 +54,8 @@ public class CancelRequestOperation extends AbstractRMOperation {
   public String toString() {
     return "release " + count
            + " requests for " + ContainerPriority.toString(priority1)
-           + " and " + ContainerPriority.toString(priority2);
+           + (priority2 != null ?
+        (" and " + ContainerPriority.toString(priority2)) : "");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 20e2fc0..34b0492 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -170,7 +170,7 @@ public class AppState {
    * Client properties created via the provider -static for the life
    * of the application
    */
-  private Map<String, String> clientProperties = new HashMap<String, String>();
+  private Map<String, String> clientProperties = new HashMap<>();
 
   /**
    * This is a template of the cluster status
@@ -196,7 +196,7 @@ public class AppState {
    * been allocated but are not live; it is a superset of the live list
    */
   private final ConcurrentMap<ContainerId, RoleInstance> ownedContainers =
-    new ConcurrentHashMap<ContainerId, RoleInstance>();
+    new ConcurrentHashMap<>();
 
   /**
    * Hash map of the containers we have released, but we
@@ -204,33 +204,33 @@ public class AppState {
    * containers is treated as a successful outcome
    */
   private final ConcurrentMap<ContainerId, Container> containersBeingReleased =
-    new ConcurrentHashMap<ContainerId, Container>();
+    new ConcurrentHashMap<>();
   
   /**
    * Counter for completed containers ( complete denotes successful or failed )
    */
-  private final AtomicInteger completedContainerCount = new AtomicInteger();
+  private final Counter completedContainerCount = new Counter();
 
   /**
    *   Count of failed containers
 
    */
-  private final AtomicInteger failedContainerCount = new AtomicInteger();
+  private final Counter failedContainerCount = new Counter();
 
   /**
    * # of started containers
    */
-  private final AtomicInteger startedContainers = new AtomicInteger();
+  private final Counter startedContainers = new Counter();
 
   /**
    * # of containers that failed to start 
    */
-  private final AtomicInteger startFailedContainers = new AtomicInteger();
+  private final Counter startFailedContainerCount = new Counter();
 
   /**
    * Track the number of surplus containers received and discarded
    */
-  private final AtomicInteger surplusContainers = new AtomicInteger();
+  private final Counter surplusContainers = new Counter();
 
 
   /**
@@ -244,21 +244,21 @@ public class AppState {
    * the node is promoted from here to the containerMap
    */
   private final Map<ContainerId, RoleInstance> startingNodes =
-    new ConcurrentHashMap<ContainerId, RoleInstance>();
+    new ConcurrentHashMap<>();
 
   /**
    * List of completed nodes. This isn't kept in the CD as it gets too
    * big for the RPC responses. Indeed, we should think about how deep to get this
    */
   private final Map<ContainerId, RoleInstance> completedNodes
-    = new ConcurrentHashMap<ContainerId, RoleInstance>();
+    = new ConcurrentHashMap<>();
 
   /**
    * Nodes that failed to start.
    * Again, kept out of the CD
    */
   private final Map<ContainerId, RoleInstance> failedNodes =
-    new ConcurrentHashMap<ContainerId, RoleInstance>();
+    new ConcurrentHashMap<>();
 
   /**
    * Nodes that came assigned to a role above that
@@ -267,11 +267,11 @@ public class AppState {
   private final Set<ContainerId> surplusNodes = new HashSet<ContainerId>();
 
   /**
-   * Map of containerID -> cluster nodes, for status reports.
+   * Map of containerID to cluster nodes, for status reports.
    * Access to this should be synchronized on the clusterDescription
    */
   private final Map<ContainerId, RoleInstance> liveNodes =
-    new ConcurrentHashMap<ContainerId, RoleInstance>();
+    new ConcurrentHashMap<>();
   private final AtomicInteger completionOfNodeNotInLiveListEvent =
     new AtomicInteger();
   private final AtomicInteger completionOfUnknownContainerEvent =
@@ -311,54 +311,53 @@ public class AppState {
       MetricsAndMonitoring metricsAndMonitoring) {
     this.recordFactory = recordFactory;
     this.metricsAndMonitoring = metricsAndMonitoring;
-    
+
     // register any metrics
-    MetricRegistry metrics = metricsAndMonitoring.getMetrics();
-    metrics.register(
+    register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests);
+    register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers);
+    register(MetricsConstants.CONTAINERS_STARTED, startedContainers);
+    register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount);
+    register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount);
+    register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount);
+  }
+
+  private void register(String name, Counter counter) {
+    this.metricsAndMonitoring.getMetrics().register(
         MetricRegistry.name(AppState.class,
-            MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS),
-        outstandingContainerRequests);
+            name), counter);
   }
 
-  public int getFailedCountainerCount() {
-    return failedContainerCount.get();
+  public long getFailedCountainerCount() {
+    return failedContainerCount.getCount();
   }
 
   /**
-   * Increment the count and return the new value
-   * @return the latest failed container count
+   * Increment the count
    */
-  public int incFailedCountainerCount() {
-    return failedContainerCount.incrementAndGet();
+  public void incFailedCountainerCount() {
+    failedContainerCount.inc();
   }
 
-  public int getStartFailedCountainerCount() {
-    return startFailedContainers.get();
+  public long getStartFailedCountainerCount() {
+    return startFailedContainerCount.getCount();
   }
 
   /**
    * Increment the count and return the new value
-   * @return the latest failed container count
    */
-  public int incStartedCountainerCount() {
-    return startedContainers.incrementAndGet();
+  public void incStartedCountainerCount() {
+    startedContainers.inc();
   }
 
-  public int getStartedCountainerCount() {
-    return startedContainers.get();
+  public long getStartedCountainerCount() {
+    return startedContainers.getCount();
   }
 
   /**
    * Increment the count and return the new value
-   * @return the latest failed container count
    */
-  public int incStartFailedCountainerCount() {
-    return startFailedContainers.incrementAndGet();
-  }
-
-  
-  public AtomicInteger getStartFailedContainers() {
-    return startFailedContainers;
+  public void incStartFailedCountainerCount() {
+    startFailedContainerCount.inc();
   }
 
   public AtomicInteger getCompletionOfNodeNotInLiveListEvent() {
@@ -535,7 +534,7 @@ public class AppState {
     this.applicationInfo = applicationInfo != null ? applicationInfo
                                                    : new HashMap<String, String>();
 
-    clientProperties = new HashMap<String, String>();
+    clientProperties = new HashMap<>();
     containerReleaseSelector = releaseSelector;
 
 
@@ -1744,19 +1743,19 @@ public class AppState {
    * keylist.
    */
   protected Map<String, Integer> getLiveStatistics() {
-    Map<String, Integer> sliderstats = new HashMap<String, Integer>();
+    Map<String, Integer> sliderstats = new HashMap<>();
+    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE,
+        liveNodes.size());
     sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED,
-        completedContainerCount.get());
+        (int)completedContainerCount.getCount());
     sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED,
-        failedContainerCount.get());
-    sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE, 
-        liveNodes.size());
+        (int)failedContainerCount.getCount());
     sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED,
-        startedContainers.get());
+        (int)startedContainers.getCount());
     sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED,
-        startFailedContainers.get());
+        (int) startFailedContainerCount.getCount());
     sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS,
-        surplusContainers.get());
+        (int)surplusContainers.getCount());
     sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED,
         completionOfUnknownContainerEvent.get());
     return sliderstats;
@@ -1893,7 +1892,6 @@ public class AppState {
       throws SliderInternalStateException, TriggerClusterTeardownException {
     List<AbstractRMOperation> operations = new ArrayList<>();
     int delta;
-    String details;
     int expected;
     String name = role.getName();
     synchronized (role) {
@@ -1903,7 +1901,7 @@ public class AppState {
 
     log.info("Reviewing {} : expected {}", role, expected);
     checkFailureThreshold(role);
-    
+
     if (expected < 0 ) {
       // negative value: fail
       throw new TriggerClusterTeardownException(
@@ -1912,7 +1910,7 @@ public class AppState {
           "Negative component count of %d desired for component %s",
           expected, role);
     }
-    
+
     if (delta > 0) {
       log.info("{}: Asking for {} more nodes(s) for a total of {} ", name,
                delta, expected);
@@ -2004,7 +2002,7 @@ public class AppState {
         for (RoleInstance possible : finalCandidates) {
           log.debug("Targeting for release: {}", possible);
           containerReleaseSubmitted(possible.container);
-          operations.add(new ContainerReleaseOperation(possible.getId()));       
+          operations.add(new ContainerReleaseOperation(possible.getId()));
         }
       }
 
@@ -2033,7 +2031,6 @@ public class AppState {
     return operations;
   }
 
-
   /**
    * Find a container running on a specific host -looking
    * into the node ID to determine this.
@@ -2055,7 +2052,7 @@ public class AppState {
     }
     return null;
   }
-  
+
   /**
    * Release all containers.
    * @return a list of operations to execute
@@ -2105,40 +2102,42 @@ public class AppState {
       String containerHostInfo = container.getNodeId().getHost()
                                  + ":" +
                                  container.getNodeId().getPort();
-      int allocated;
-      int desired;
       //get the role
-      ContainerId cid = container.getId();
-      RoleStatus role = lookupRoleStatus(container);
+      final ContainerId cid = container.getId();
+      final RoleStatus role = lookupRoleStatus(container);
       
 
       //dec requested count
       decrementRequestCount(role);
+
+      // cancel an allocation request which granted this, so as to avoid repeated
+      // requests
+      releaseOperations.add(new CancelRequestOperation(container.getPriority(), null, 1));
+
       //inc allocated count -this may need to be dropped in a moment,
       // but us needed to update the logic below
-      allocated = role.incActual();
+      final int allocated = role.incActual();
+      final int desired = role.getDesired();
 
-      //look for (race condition) where we get more back than we asked
-      desired = role.getDesired();
-
-      ContainerAllocationOutcome outcome = roleHistory.onContainerAllocated(container,
-        desired,
-        allocated);
+      final String roleName = role.getName();
+      final ContainerAllocationOutcome outcome =
+          roleHistory.onContainerAllocated(container, desired, allocated);
 
+      //look for condition where we get more back than we asked
       if (allocated > desired) {
-        log.info("Discarding surplus container {} on {}", cid,
-                 containerHostInfo);
+        log.info("Discarding surplus {} container {} on {}", roleName,  cid,
+            containerHostInfo);
         releaseOperations.add(new ContainerReleaseOperation(cid));
         //register as a surplus node
         surplusNodes.add(cid);
-        surplusContainers.incrementAndGet();
+        surplusContainers.inc();
         //and, as we aren't binding it to role, dec that role's actual count
         role.decActual();
       } else {
 
-        // this is valid, so decrement the number of outstanding requests
+        // Allocation being accepted -so decrement the number of outstanding requests
         decOutstandingContainerRequests();
-        String roleName = role.getName();
+
         log.info("Assigning role {} to container" +
                  " {}," +
                  " on {}:{},",
@@ -2174,8 +2173,8 @@ public class AppState {
    * @return true if a rebuild took place (even if size 0)
    * @throws RuntimeException on problems
    */
-  private boolean rebuildModelFromRestart(List<Container> liveContainers) throws
-                                                                          BadClusterStateException {
+  private boolean rebuildModelFromRestart(List<Container> liveContainers)
+      throws BadClusterStateException {
     if (liveContainers == null) {
       return false;
     }
@@ -2193,8 +2192,8 @@ public class AppState {
    * @param container container that was running before the AM restarted
    * @throws RuntimeException on problems
    */
-  private void addRestartedContainer(Container container) throws
-                                                          BadClusterStateException {
+  private void addRestartedContainer(Container container)
+      throws BadClusterStateException {
     String containerHostInfo = container.getNodeId().getHost()
                                + ":" +
                                container.getNodeId().getPort();

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index 99108fe..64f9184 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -625,7 +625,7 @@ public class RoleHistory {
     List<Container> unrequested =
       new ArrayList<>(allocatedContainers.size());
     outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
-    
+
     //give the unrequested ones lower priority
     requested.addAll(unrequested);
     return requested;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
index e0fdf1b..c310583 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
@@ -85,9 +85,9 @@ class TestMockAppStateRebuildOnAMRestart extends BaseMockAppStateTest
         null,
         new MostRecentContainerReleaseSelector())
 
-    assert appState.getStartedCountainerCount() == clusterSize
+    assert appState.startedCountainerCount == clusterSize
 
-    appState.getRoleHistory().dump();
+    appState.roleHistory.dump();
 
     //check that the app state direct structures match
     List<RoleInstance> r0live = appState.enumLiveNodesInRole(ROLE0)