You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@slider.apache.org by st...@apache.org on 2015/09/14 20:08:47 UTC

[2/2] incubator-slider git commit: SLIDER-82 support anti-affinity: this is the original submission with some minor tweaks and reformatting

SLIDER-82 support anti-affinity: this is the original submission with some minor tweaks and reformatting


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/12893b96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/12893b96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/12893b96

Branch: refs/heads/feature/SLIDER-82_ANTI_AFFINITY_REQUIRED
Commit: 12893b96bc2e95e1b1ef1aba2ecf39b1330b6a32
Parents: ad0be55
Author: Steve Loughran <st...@apache.org>
Authored: Mon Sep 14 19:08:06 2015 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Sep 14 19:08:06 2015 +0100

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 .../providers/agent/AgentProviderService.java   |  5 ++
 .../slideram/SliderAMProviderService.java       |  5 ++
 .../server/appmaster/SliderAppMaster.java       |  7 +-
 .../operations/AsyncRMOperationHandler.java     | 11 +++
 .../operations/ContainerRequestOperation.java   | 72 +++++++++++++++++++-
 .../ProviderNotifyingOperationHandler.java      |  6 ++
 .../operations/RMOperationHandlerActions.java   |  8 +++
 .../slider/server/appmaster/state/AppState.java | 27 ++++++--
 .../server/appmaster/state/RoleHistory.java     | 40 ++++++++++-
 .../model/mock/MockProviderService.groovy       |  5 ++
 .../model/mock/MockRMOperationHandler.groovy    |  5 ++
 12 files changed, 181 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7f8ff5e..98390e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,7 @@
     <!--
     core artifacts
     -->
-    <hadoop.version>2.6.0</hadoop.version>
+    <hadoop.version>2.7.1</hadoop.version>
 
     <hbase.version>0.99.0</hbase.version>
     <accumulo.version>1.7.0</accumulo.version>

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index e3dc791..71421bd 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -2823,4 +2823,9 @@ public class AgentProviderService extends AbstractProviderService implements
                   "");
     }
   }
+
+  @Override
+  public void updateBlacklist(List<String> blacklistAdditions,
+                        List<String> blacklistRemovals) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index cee7a97..e1dd920 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -187,4 +187,9 @@ public class SliderAMProviderService extends AbstractProviderService implements
       throw new IOException(e);
     }
   }
+
+  @Override
+  public void updateBlacklist(List<String> blacklistAdditions,
+                       List<String> blacklistRemovals) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 019ec71..aeb9753 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -2283,7 +2283,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       return false;
     }
   }
-  
+
+  @Override
+  public void updateBlacklist(List<String> blacklistAdditions,
+      List<String> blacklistRemovals) {
+  }
+
   /**
    * This is the main entry point for the service launcher.
    * @param args command line arguments.

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
index 11afc0e..0329696 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
@@ -63,6 +63,7 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
    * @param count count to cancel
    * @return number of requests cancelled
    */
+  @SuppressWarnings("unchecked")
   protected int cancelSinglePriorityRequests(Priority priority,
       int count) {
     List<Collection<AMRMClient.ContainerRequest>> requestSets =
@@ -88,6 +89,7 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
     // a single release
     client.removeContainerRequest(request);
@@ -103,6 +105,15 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
   @Override
   @SuppressWarnings("unchecked")
   public void addContainerRequest(AMRMClient.ContainerRequest req) {
+    log.debug("addContainerRequest():  Request = {}, getCapability() = {}, getNodes() = {}",
+        req, req.getCapability(), req.getNodes());
     client.addContainerRequest(req);
   }
+
+  @Override
+  public void updateBlacklist(List<String> blacklistAdditions,
+      List<String> blacklistRemovals) {
+    client.updateBlacklist(blacklistAdditions, blacklistRemovals);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
index b8120ca..d847360 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
@@ -18,23 +18,93 @@
 
 package org.apache.slider.server.appmaster.operations;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.slider.server.appmaster.state.ContainerPriority;
+import org.apache.slider.server.appmaster.state.NodeInstance;
+import org.apache.slider.server.appmaster.state.RoleHistory;
+import org.apache.slider.server.appmaster.state.RoleStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A container request operation
+ */
 public class ContainerRequestOperation extends AbstractRMOperation {
+  protected static final Logger log =
+  LoggerFactory.getLogger(ContainerRequestOperation.class);
 
   private final AMRMClient.ContainerRequest request;
+  private List<String> blacklistAdditions;
+  private List<String> blacklistRemovals;
 
-  public ContainerRequestOperation(AMRMClient.ContainerRequest request) {
+  /**
+   * Build a request
+   * @param request request to use
+   * @param role role (or null)
+   * @param activeNodesForRole list of active nodes for this role. Must be set if role is not null.
+   * @param failedNodesForRole list of failed nodes for this role. Must be set if role is not null.
+   */
+  public ContainerRequestOperation(AMRMClient.ContainerRequest request,
+      RoleStatus role, List<NodeInstance> activeNodesForRole,
+      List<String> failedNodesForRole) {
+    blacklistAdditions = new ArrayList<>();
+    blacklistRemovals = new ArrayList<>();
+    if (role != null) {
+      Preconditions.checkArgument(activeNodesForRole != null, "Null activeNodesForRole");
+      Preconditions.checkArgument(failedNodesForRole != null, "Null failedNodesForRole");
+      log.info("ContainerRequestOperation(): Role: {} , Request {}", role.getName(), request);
+      if (role.isAntiAffinePlacement()) {
+        for (NodeInstance nit1 : activeNodesForRole) {
+          log.info("ContainerRequestOperation(): add to blacklist nodes - Role: {},  Node: {}",
+              role.getName(), nit1.hostname);
+          blacklistAdditions.add(nit1.hostname);
+        }
+      }
+      blacklistAdditions.addAll(failedNodesForRole);
+    }
     this.request = request;
   }
 
+  /**
+   * Create a request with no blacklisting/affinity information
+   *
+   * @param request request to issue
+   */
+  public ContainerRequestOperation(AMRMClient.ContainerRequest request) {
+    this(request, null, null, null);
+  }
+
+  /**
+   * Get the underlying request
+   * @return the container request this operation will submit
+   */
   public AMRMClient.ContainerRequest getRequest() {
     return request;
   }
 
+  /**
+   * Get the current blacklist additions
+   * @return the list of additions
+   */
+  public List<String> getBlacklistAdditions() {
+    return blacklistAdditions;
+  }
+
+  /**
+   * Get the current blacklist removals
+   * @return the list of removals
+   */
+  public List<String> getBlacklistRemovals() {
+    return blacklistRemovals;
+  }
+
   @Override
   public void execute(RMOperationHandlerActions handler) {
+    handler.updateBlacklist(blacklistAdditions, blacklistRemovals);
     handler.addContainerRequest(request);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
index 184a36a..232a797 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.slider.providers.ProviderService;
+import java.util.List;
 
 public class ProviderNotifyingOperationHandler extends RMOperationHandler {
   
@@ -52,4 +53,9 @@ public class ProviderNotifyingOperationHandler extends RMOperationHandler {
   public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
     providerService.cancelSingleRequest(request);
   }
+
+  @Override
+  public void updateBlacklist(List<String> blacklistAdditions,
+                List<String> blacklistRemovals) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
index 594ee47..7915ab4 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
@@ -52,4 +52,12 @@ public interface RMOperationHandlerActions {
    * @param operations ops
    */
   void execute(List<AbstractRMOperation> operations);
+
+  /**
+   * Update the node blacklist with the given additions and removals
+   * @param blacklistAdditions possibly null list of additions to the blacklist
+   * @param blacklistRemovals possibly null list of nodes to remove from the blacklist
+   */
+  void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 18eb578..b632626 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -92,8 +92,8 @@ import static org.apache.slider.api.ResourceKeys.YARN_CORES;
 import static org.apache.slider.api.ResourceKeys.YARN_LABEL_EXPRESSION;
 import static org.apache.slider.api.ResourceKeys.YARN_MEMORY;
 import static org.apache.slider.api.RoleKeys.ROLE_FAILED_INSTANCES;
-import static org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES;
 import static org.apache.slider.api.RoleKeys.ROLE_FAILED_RECENTLY_INSTANCES;
+import static org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES;
 import static org.apache.slider.api.RoleKeys.ROLE_NODE_FAILED_INSTANCES;
 import static org.apache.slider.api.RoleKeys.ROLE_PREEMPTED_INSTANCES;
 import static org.apache.slider.api.RoleKeys.ROLE_RELEASING_INSTANCES;
@@ -1931,7 +1931,7 @@ public class AppState {
     if (delta > 0) {
       log.info("{}: Asking for {} more nodes(s) for a total of {} ", name,
                delta, expected);
-      //more workers needed than we have -ask for more
+      // more workers needed than we have -ask for more
       for (int i = 0; i < delta; i++) {
         Resource capability = recordFactory.newResource();
         AMRMClient.ContainerRequest containerAsk =
@@ -1942,15 +1942,20 @@ public class AppState {
         if (askMemory > this.containerMaxMemory) {
           log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
         }
-        operations.add(new ContainerRequestOperation(containerAsk));
+        // build a container request including placement and blacklisting data
+        operations.add(new ContainerRequestOperation(containerAsk,
+            role,
+            roleHistory.listActiveNodes(role.getKey()),
+            roleHistory.cloneFailedNodes()
+        ));
       }
     } else if (delta < 0) {
       log.info("{}: Asking for {} fewer node(s) for a total of {}", name,
                -delta,
                expected);
-      //reduce the number expected (i.e. subtract the delta)
+      // reduce the number expected (i.e. subtract the delta)
 
-      //then pick some containers to kill
+      // then pick some containers to kill
       int excess = -delta;
 
       // how many requests are outstanding
@@ -2125,6 +2130,8 @@ public class AppState {
     assignments.clear();
     releaseOperations.clear();
     List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers);
+    roleHistory.resetRequestedNodes();
+    log.info("onContainersAllocated(): Total containers allocated = "+ordered.size());
     for (Container container : ordered) {
       String containerHostInfo = container.getNodeId().getHost()
                                  + ":" +
@@ -2132,8 +2139,13 @@ public class AppState {
       //get the role
       final ContainerId cid = container.getId();
       final RoleStatus role = lookupRoleStatus(container);
-      
-
+      log.info("onContainersAllocated(): "+containerHostInfo+", cid= "+cid);
+      log.debug("onContainersAllocated(): "+role);
+      if((role.isAntiAffinePlacement())&& (roleHistory.nodeAlreadyRequested(role.getKey(),container.getNodeId().getHost()))){
+          releaseOperations.add(new ContainerReleaseOperation(cid));
+          log.info("onContainersAllocated() "+cid +" is on already requested node "+container.getNodeId().getHost()+", releasing...");
+          continue;
+      }
       //dec requested count
       decrementRequestCount(role);
 
@@ -2186,6 +2198,7 @@ public class AppState {
         assignments.add(new ContainerAssignment(container, role, outcome));
         //add to the history
         roleHistory.onContainerAssigned(container);
+        roleHistory.addRequestedNodeForRoleId(role.getKey(), container.getNodeId().getHost());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index 926d440..fe69cb0 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -93,6 +93,12 @@ public class RoleHistory {
   private Map<Integer, LinkedList<NodeInstance>> availableNodes;
 
   /**
+   * Track nodes where requests have been submitted, keyed by role ID
+   *
+   */
+  private Map<Integer, LinkedList<String>> requestedNodes;
+
+  /**
    * Track the failed nodes. Currently used to make wiser decision of container
    * ask with/without locality. Has other potential uses as well.
    */
@@ -186,6 +192,37 @@ public class RoleHistory {
    */
   private synchronized void resetAvailableNodeLists() {
     availableNodes = new HashMap<>(roleSize);
+    resetRequestedNodes();
+  }
+
+  /**
+   * Clear the lists of nodes where requests have been made, for all roles
+   */
+  public void resetRequestedNodes() {
+    requestedNodes = new HashMap<>(roleSize);
+  }
+
+  /**
+   * Record a host in the requested-node list of the role with the given ID
+   */
+  public void addRequestedNodeForRoleId(int id, String hostname) {
+    LinkedList<String> instances = requestedNodes.get(id);
+    if (instances == null) {
+      instances = new LinkedList<>();
+    }
+    instances.add(hostname);
+    requestedNodes.put(id, instances);
+  }
+
+  /**
+   * Check if node is in the requested list
+   * @param id role ID
+   * @param hostname host
+   * @return true if the host has been recorded for that role ID since the last reset
+   */
+  public boolean nodeAlreadyRequested(int id, String hostname) {
+    LinkedList<String> instances = requestedNodes.get(id);
+    return instances != null && instances.contains(hostname);
   }
 
   /**
@@ -717,8 +754,7 @@ public class RoleHistory {
     if (desiredCount <= actualCount) {
       // all outstanding requests have been satisfied
       // clear all the lists, so returning nodes to the available set
-      List<NodeInstance>
-          hosts = outstandingRequests.resetOutstandingRequests(role);
+      List<NodeInstance> hosts = outstandingRequests.resetOutstandingRequests(role);
       if (!hosts.isEmpty()) {
         //add the list
         log.info("Adding {} hosts for role {}", hosts.size(), role);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
index 44415f4..35983c9 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
@@ -297,4 +297,9 @@ class MockProviderService implements ProviderService {
   void rebuildContainerDetails(List<Container> liveContainers, String applicationId,
       Map<Integer, ProviderRole> roleProviderMap) {
   }
+
+  @Override
+  void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
index a68ce02..bb42cc1 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
@@ -78,4 +78,9 @@ class MockRMOperationHandler extends RMOperationHandler {
     releases = 0;
     requests = 0;
   }
+
+  @Override
+  void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+  }
+
 }