You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/09/14 20:08:47 UTC
[2/2] incubator-slider git commit: SLIDER-82 support anti-affinity:
this is the original submission with some minor tweaks and reformatting
SLIDER-82 support anti-affinity: this is the original submission with some minor tweaks and reformatting
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/12893b96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/12893b96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/12893b96
Branch: refs/heads/feature/SLIDER-82_ANTI_AFFINITY_REQUIRED
Commit: 12893b96bc2e95e1b1ef1aba2ecf39b1330b6a32
Parents: ad0be55
Author: Steve Loughran <st...@apache.org>
Authored: Mon Sep 14 19:08:06 2015 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Sep 14 19:08:06 2015 +0100
----------------------------------------------------------------------
pom.xml | 2 +-
.../providers/agent/AgentProviderService.java | 5 ++
.../slideram/SliderAMProviderService.java | 5 ++
.../server/appmaster/SliderAppMaster.java | 7 +-
.../operations/AsyncRMOperationHandler.java | 11 +++
.../operations/ContainerRequestOperation.java | 72 +++++++++++++++++++-
.../ProviderNotifyingOperationHandler.java | 6 ++
.../operations/RMOperationHandlerActions.java | 8 +++
.../slider/server/appmaster/state/AppState.java | 27 ++++++--
.../server/appmaster/state/RoleHistory.java | 40 ++++++++++-
.../model/mock/MockProviderService.groovy | 5 ++
.../model/mock/MockRMOperationHandler.groovy | 5 ++
12 files changed, 181 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7f8ff5e..98390e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,7 @@
<!--
core artifacts
-->
- <hadoop.version>2.6.0</hadoop.version>
+ <hadoop.version>2.7.1</hadoop.version>
<hbase.version>0.99.0</hbase.version>
<accumulo.version>1.7.0</accumulo.version>
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index e3dc791..71421bd 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -2823,4 +2823,9 @@ public class AgentProviderService extends AbstractProviderService implements
"");
}
}
+
+ @Override
+ public void updateBlacklist(List<String> blacklistAdditions,
+ List<String> blacklistRemovals) {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index cee7a97..e1dd920 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -187,4 +187,9 @@ public class SliderAMProviderService extends AbstractProviderService implements
throw new IOException(e);
}
}
+
+ @Override
+ public void updateBlacklist(List<String> blacklistAdditions,
+ List<String> blacklistRemovals) {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 019ec71..aeb9753 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -2283,7 +2283,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
return false;
}
}
-
+
+ @Override
+ public void updateBlacklist(List<String> blacklistAdditions,
+ List<String> blacklistRemovals) {
+ }
+
/**
* This is the main entry point for the service launcher.
* @param args command line arguments.
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
index 11afc0e..0329696 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
@@ -63,6 +63,7 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
* @param count count to cancel
* @return number of requests cancelled
*/
+ @SuppressWarnings("unchecked")
protected int cancelSinglePriorityRequests(Priority priority,
int count) {
List<Collection<AMRMClient.ContainerRequest>> requestSets =
@@ -88,6 +89,7 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
}
@Override
+ @SuppressWarnings("unchecked")
public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
// a single release
client.removeContainerRequest(request);
@@ -103,6 +105,15 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
@Override
@SuppressWarnings("unchecked")
public void addContainerRequest(AMRMClient.ContainerRequest req) {
+ log.debug("addContainerRequest(): Request = {}, getCapability() = {}, getNodes() = {}",
+ req, req.getCapability(), req.getNodes());
client.addContainerRequest(req);
}
+
+ @Override
+ public void updateBlacklist(List<String> blacklistAdditions,
+ List<String> blacklistRemovals) {
+ client.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
index b8120ca..d847360 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java
@@ -18,23 +18,93 @@
package org.apache.slider.server.appmaster.operations;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.slider.server.appmaster.state.ContainerPriority;
+import org.apache.slider.server.appmaster.state.NodeInstance;
+import org.apache.slider.server.appmaster.state.RoleHistory;
+import org.apache.slider.server.appmaster.state.RoleStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A container request operation
+ */
public class ContainerRequestOperation extends AbstractRMOperation {
+ protected static final Logger log =
+ LoggerFactory.getLogger(ContainerRequestOperation.class);
private final AMRMClient.ContainerRequest request;
+ private List<String> blacklistAdditions;
+ private List<String> blacklistRemovals;
- public ContainerRequestOperation(AMRMClient.ContainerRequest request) {
+ /**
+ * Build a request
+ * @param request request to use
+ * @param role role (or null)
+ * @param activeNodesForRole list of active nodes for this role. Must be set if role is not null.
+ * @param failedNodesForRole list of failed nodes for this role. Must be set if role is not null.
+ */
+ public ContainerRequestOperation(AMRMClient.ContainerRequest request,
+ RoleStatus role, List<NodeInstance> activeNodesForRole,
+ List<String> failedNodesForRole) {
+ blacklistAdditions = new ArrayList<>();
+ blacklistRemovals = new ArrayList<>();
+ if (role != null) {
+ Preconditions.checkArgument(activeNodesForRole != null, "Null activeNodesForRole");
+ Preconditions.checkArgument(failedNodesForRole != null, "Null failedNodesForRole");
+ log.info("ContainerRequestOperation(): Role: {} , Request {}", role.getName(), request);
+ if (role.isAntiAffinePlacement()) {
+ for (NodeInstance nit1 : activeNodesForRole) {
+ log.info("ContainerRequestOperation(): add to blacklist nodes - Role: {}, Node: {}",
+ role.getName(), nit1.hostname);
+ blacklistAdditions.add(nit1.hostname);
+ }
+ }
+ blacklistAdditions.addAll(failedNodesForRole);
+ }
this.request = request;
}
+ /**
+ * Create a request with no blacklisting/affinity information
+ *
+ * @param request request to issue
+ */
+ public ContainerRequestOperation(AMRMClient.ContainerRequest request) {
+ this(request, null, null, null);
+ }
+
+ /**
+ * Get the underlying request
+ * @return the container request passed to the constructor
+ */
public AMRMClient.ContainerRequest getRequest() {
return request;
}
+ /**
+ * Get the current blacklist additions
+ * @return the list of additions
+ */
+ public List<String> getBlacklistAdditions() {
+ return blacklistAdditions;
+ }
+
+ /**
+ * Get the current blacklist removals
+ * @return the list of removals
+ */
+ public List<String> getBlacklistRemovals() {
+ return blacklistRemovals;
+ }
+
@Override
public void execute(RMOperationHandlerActions handler) {
+ handler.updateBlacklist(blacklistAdditions, blacklistRemovals);
handler.addContainerRequest(request);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
index 184a36a..232a797 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.slider.providers.ProviderService;
+import java.util.List;
public class ProviderNotifyingOperationHandler extends RMOperationHandler {
@@ -52,4 +53,9 @@ public class ProviderNotifyingOperationHandler extends RMOperationHandler {
public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
providerService.cancelSingleRequest(request);
}
+
+ @Override
+ public void updateBlacklist(List<String> blacklistAdditions,
+ List<String> blacklistRemovals) {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
index 594ee47..7915ab4 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java
@@ -52,4 +52,12 @@ public interface RMOperationHandlerActions {
* @param operations ops
*/
void execute(List<AbstractRMOperation> operations);
+
+ /**
+ * Update Blacklist operation
+ * @param blacklistAdditions possibly null list of additions to the blacklist
+ * @param blacklistRemovals possibly null list of nodes to remove from the blacklist
+ */
+ void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 18eb578..b632626 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -92,8 +92,8 @@ import static org.apache.slider.api.ResourceKeys.YARN_CORES;
import static org.apache.slider.api.ResourceKeys.YARN_LABEL_EXPRESSION;
import static org.apache.slider.api.ResourceKeys.YARN_MEMORY;
import static org.apache.slider.api.RoleKeys.ROLE_FAILED_INSTANCES;
-import static org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES;
import static org.apache.slider.api.RoleKeys.ROLE_FAILED_RECENTLY_INSTANCES;
+import static org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES;
import static org.apache.slider.api.RoleKeys.ROLE_NODE_FAILED_INSTANCES;
import static org.apache.slider.api.RoleKeys.ROLE_PREEMPTED_INSTANCES;
import static org.apache.slider.api.RoleKeys.ROLE_RELEASING_INSTANCES;
@@ -1931,7 +1931,7 @@ public class AppState {
if (delta > 0) {
log.info("{}: Asking for {} more nodes(s) for a total of {} ", name,
delta, expected);
- //more workers needed than we have -ask for more
+ // more workers needed than we have -ask for more
for (int i = 0; i < delta; i++) {
Resource capability = recordFactory.newResource();
AMRMClient.ContainerRequest containerAsk =
@@ -1942,15 +1942,20 @@ public class AppState {
if (askMemory > this.containerMaxMemory) {
log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory);
}
- operations.add(new ContainerRequestOperation(containerAsk));
+ // build a container request including placement and blacklisting data
+ operations.add(new ContainerRequestOperation(containerAsk,
+ role,
+ roleHistory.listActiveNodes(role.getKey()),
+ roleHistory.cloneFailedNodes()
+ ));
}
} else if (delta < 0) {
log.info("{}: Asking for {} fewer node(s) for a total of {}", name,
-delta,
expected);
- //reduce the number expected (i.e. subtract the delta)
+ // reduce the number expected (i.e. subtract the delta)
- //then pick some containers to kill
+ // then pick some containers to kill
int excess = -delta;
// how many requests are outstanding
@@ -2125,6 +2130,8 @@ public class AppState {
assignments.clear();
releaseOperations.clear();
List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers);
+ roleHistory.resetRequestedNodes();
+ log.info("onContainersAllocated(): Total containers allocated = "+ordered.size());
for (Container container : ordered) {
String containerHostInfo = container.getNodeId().getHost()
+ ":" +
@@ -2132,8 +2139,13 @@ public class AppState {
//get the role
final ContainerId cid = container.getId();
final RoleStatus role = lookupRoleStatus(container);
-
-
+ log.info("onContainersAllocated(): "+containerHostInfo+", cid= "+cid);
+ log.debug("onContainersAllocated(): "+role);
+ if((role.isAntiAffinePlacement())&& (roleHistory.nodeAlreadyRequested(role.getKey(),container.getNodeId().getHost()))){
+ releaseOperations.add(new ContainerReleaseOperation(cid));
+ log.info("onContainersAllocated() "+cid +" is on already requested node "+container.getNodeId().getHost()+", releasing...");
+ continue;
+ }
//dec requested count
decrementRequestCount(role);
@@ -2186,6 +2198,7 @@ public class AppState {
assignments.add(new ContainerAssignment(container, role, outcome));
//add to the history
roleHistory.onContainerAssigned(container);
+ roleHistory.addRequestedNodeForRoleId(role.getKey(), container.getNodeId().getHost());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index 926d440..fe69cb0 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -93,6 +93,12 @@ public class RoleHistory {
private Map<Integer, LinkedList<NodeInstance>> availableNodes;
/**
+ * Track nodes where requests have been submitted
+ *
+ */
+ private Map<Integer, LinkedList<String>> requestedNodes;
+
+ /**
* Track the failed nodes. Currently used to make wiser decision of container
* ask with/without locality. Has other potential uses as well.
*/
@@ -186,6 +192,37 @@ public class RoleHistory {
*/
private synchronized void resetAvailableNodeLists() {
availableNodes = new HashMap<>(roleSize);
+ resetRequestedNodes();
+ }
+
+ /**
+ * Clear the lists of nodes where requests have been made
+ */
+ public void resetRequestedNodes() {
+ requestedNodes = new HashMap<>(roleSize);
+ }
+
+ /**
+ * Record a hostname in the requested-node list of the given role ID
+ */
+ public void addRequestedNodeForRoleId(int id, String hostname) {
+ LinkedList<String> instances = requestedNodes.get(id);
+ if (instances == null) {
+ instances = new LinkedList<>();
+ }
+ instances.add(hostname);
+ requestedNodes.put(id, instances);
+ }
+
+ /**
+ * Check if node is in the requested list
+ * @param id role ID
+ * @param hostname host
+ * @return true if the host has been recorded as requested for the given role
+ */
+ public boolean nodeAlreadyRequested(int id, String hostname) {
+ LinkedList<String> instances = requestedNodes.get(id);
+ return instances != null && instances.contains(hostname);
}
/**
@@ -717,8 +754,7 @@ public class RoleHistory {
if (desiredCount <= actualCount) {
// all outstanding requests have been satisfied
// clear all the lists, so returning nodes to the available set
- List<NodeInstance>
- hosts = outstandingRequests.resetOutstandingRequests(role);
+ List<NodeInstance> hosts = outstandingRequests.resetOutstandingRequests(role);
if (!hosts.isEmpty()) {
//add the list
log.info("Adding {} hosts for role {}", hosts.size(), role);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
index 44415f4..35983c9 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
@@ -297,4 +297,9 @@ class MockProviderService implements ProviderService {
void rebuildContainerDetails(List<Container> liveContainers, String applicationId,
Map<Integer, ProviderRole> roleProviderMap) {
}
+
+ @Override
+ void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
index a68ce02..bb42cc1 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy
@@ -78,4 +78,9 @@ class MockRMOperationHandler extends RMOperationHandler {
releases = 0;
requests = 0;
}
+
+ @Override
+ void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ }
+
}