You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/03/25 21:28:47 UTC
[18/25] incubator-slider git commit: SLIDER-799 SLIDER-828 when
containers are allocated, explicitly cancel the request
SLIDER-799 SLIDER-828 when containers are allocated, explicitly cancel the request
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1938323c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1938323c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1938323c
Branch: refs/heads/develop
Commit: 1938323c7fef2666a429a06a55208a528e93f64b
Parents: 7d9a9e9
Author: Steve Loughran <st...@apache.org>
Authored: Tue Mar 24 13:44:09 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Mar 24 13:49:14 2015 +0000
----------------------------------------------------------------------
.../server/appmaster/SliderAppMaster.java | 2 +-
.../appmaster/management/MetricsConstants.java | 33 ++++-
.../operations/AsyncRMOperationHandler.java | 6 +-
.../operations/CancelRequestOperation.java | 14 +-
.../slider/server/appmaster/state/AppState.java | 147 +++++++++----------
.../server/appmaster/state/RoleHistory.java | 2 +-
.../TestMockAppStateRebuildOnAMRestart.groovy | 4 +-
7 files changed, 124 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index ab6b55c..b8584f7 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1840,7 +1840,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
rmOperationHandler.cancelSingleRequest(request);
}
- /* =================================================================== */
+/* =================================================================== */
/* END */
/* =================================================================== */
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
index e55cf60..31a82a3 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
@@ -22,6 +22,35 @@ package org.apache.slider.server.appmaster.management;
* Constants used in slider for metrics registration and lookup
*/
public class MetricsConstants {
- public static final String CONTAINERS_OUTSTANDING_REQUESTS =
- "containers.outstanding-requests";
+
+ /**
+ * {@value}
+ */
+ public static final String CONTAINERS_OUTSTANDING_REQUESTS = "containers.outstanding-requests";
+
+ /**
+ * {@value}
+ */
+ public static final String CONTAINERS_STARTED = "containers.started";
+
+ /**
+ * {@value}
+ */
+ public static final String CONTAINERS_SURPLUS = "containers.surplus";
+
+ /**
+ * {@value}
+ */
+ public static final String CONTAINERS_COMPLETED = "containers.completed";
+
+ /**
+ * {@value}
+ */
+ public static final String CONTAINERS_FAILED = "containers.failed";
+
+ /**
+ * {@value}
+ */
+ public static final String CONTAINERS_START_FAILED = "containers.start-failed";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
index 7c98551..11afc0e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java
@@ -50,8 +50,10 @@ public class AsyncRMOperationHandler extends RMOperationHandler {
// need to revoke a previously issued container request
// so enum the sets and pick some
int remaining = cancelSinglePriorityRequests(priority1, count);
- remaining = cancelSinglePriorityRequests(priority2, remaining);
-
+ if (priority2 != null) {
+ remaining = cancelSinglePriorityRequests(priority2, remaining);
+ }
+
return remaining;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
index 9e9f277..754bf28 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java
@@ -18,11 +18,12 @@
package org.apache.slider.server.appmaster.operations;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.slider.server.appmaster.state.ContainerPriority;
/**
- * Cancel a container request
+ * Cancel a container request at the given priority/priorities.
*/
public class CancelRequestOperation extends AbstractRMOperation {
@@ -30,7 +31,15 @@ public class CancelRequestOperation extends AbstractRMOperation {
private final Priority priority2;
private final int count;
+ /**
+ * Create an instance
+ * @param priority1 first priority, the one that is released first
+ * @param priority2 optional second priority
+ * @param count number of requests to cancel
+ */
public CancelRequestOperation(Priority priority1, Priority priority2, int count) {
+ Preconditions.checkArgument(priority1 != null, "null priority");
+ Preconditions.checkArgument(count >= 0, "negative count");
this.priority1 = priority1;
this.priority2 = priority2;
this.count = count;
@@ -45,7 +54,8 @@ public class CancelRequestOperation extends AbstractRMOperation {
public String toString() {
return "release " + count
+ " requests for " + ContainerPriority.toString(priority1)
- + " and " + ContainerPriority.toString(priority2);
+ + (priority2 != null ?
+ (" and " + ContainerPriority.toString(priority2)) : "");
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 20e2fc0..34b0492 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -170,7 +170,7 @@ public class AppState {
* Client properties created via the provider -static for the life
* of the application
*/
- private Map<String, String> clientProperties = new HashMap<String, String>();
+ private Map<String, String> clientProperties = new HashMap<>();
/**
* This is a template of the cluster status
@@ -196,7 +196,7 @@ public class AppState {
* been allocated but are not live; it is a superset of the live list
*/
private final ConcurrentMap<ContainerId, RoleInstance> ownedContainers =
- new ConcurrentHashMap<ContainerId, RoleInstance>();
+ new ConcurrentHashMap<>();
/**
* Hash map of the containers we have released, but we
@@ -204,33 +204,33 @@ public class AppState {
* containers is treated as a successful outcome
*/
private final ConcurrentMap<ContainerId, Container> containersBeingReleased =
- new ConcurrentHashMap<ContainerId, Container>();
+ new ConcurrentHashMap<>();
/**
* Counter for completed containers ( complete denotes successful or failed )
*/
- private final AtomicInteger completedContainerCount = new AtomicInteger();
+ private final Counter completedContainerCount = new Counter();
/**
* Count of failed containers
*/
- private final AtomicInteger failedContainerCount = new AtomicInteger();
+ private final Counter failedContainerCount = new Counter();
/**
* # of started containers
*/
- private final AtomicInteger startedContainers = new AtomicInteger();
+ private final Counter startedContainers = new Counter();
/**
* # of containers that failed to start
*/
- private final AtomicInteger startFailedContainers = new AtomicInteger();
+ private final Counter startFailedContainerCount = new Counter();
/**
* Track the number of surplus containers received and discarded
*/
- private final AtomicInteger surplusContainers = new AtomicInteger();
+ private final Counter surplusContainers = new Counter();
/**
@@ -244,21 +244,21 @@ public class AppState {
* the node is promoted from here to the containerMap
*/
private final Map<ContainerId, RoleInstance> startingNodes =
- new ConcurrentHashMap<ContainerId, RoleInstance>();
+ new ConcurrentHashMap<>();
/**
* List of completed nodes. This isn't kept in the CD as it gets too
* big for the RPC responses. Indeed, we should think about how deep to get this
*/
private final Map<ContainerId, RoleInstance> completedNodes
- = new ConcurrentHashMap<ContainerId, RoleInstance>();
+ = new ConcurrentHashMap<>();
/**
* Nodes that failed to start.
* Again, kept out of the CD
*/
private final Map<ContainerId, RoleInstance> failedNodes =
- new ConcurrentHashMap<ContainerId, RoleInstance>();
+ new ConcurrentHashMap<>();
/**
* Nodes that came assigned to a role above that
@@ -267,11 +267,11 @@ public class AppState {
private final Set<ContainerId> surplusNodes = new HashSet<ContainerId>();
/**
- * Map of containerID -> cluster nodes, for status reports.
+ * Map of containerID to cluster nodes, for status reports.
* Access to this should be synchronized on the clusterDescription
*/
private final Map<ContainerId, RoleInstance> liveNodes =
- new ConcurrentHashMap<ContainerId, RoleInstance>();
+ new ConcurrentHashMap<>();
private final AtomicInteger completionOfNodeNotInLiveListEvent =
new AtomicInteger();
private final AtomicInteger completionOfUnknownContainerEvent =
@@ -311,54 +311,53 @@ public class AppState {
MetricsAndMonitoring metricsAndMonitoring) {
this.recordFactory = recordFactory;
this.metricsAndMonitoring = metricsAndMonitoring;
-
+
// register any metrics
- MetricRegistry metrics = metricsAndMonitoring.getMetrics();
- metrics.register(
+ register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests);
+ register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers);
+ register(MetricsConstants.CONTAINERS_STARTED, startedContainers);
+ register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount);
+ register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount);
+ register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount);
+ }
+
+ private void register(String name, Counter counter) {
+ this.metricsAndMonitoring.getMetrics().register(
MetricRegistry.name(AppState.class,
- MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS),
- outstandingContainerRequests);
+ name), counter);
}
- public int getFailedCountainerCount() {
- return failedContainerCount.get();
+ public long getFailedCountainerCount() {
+ return failedContainerCount.getCount();
}
/**
- * Increment the count and return the new value
- * @return the latest failed container count
+ * Increment the count
*/
- public int incFailedCountainerCount() {
- return failedContainerCount.incrementAndGet();
+ public void incFailedCountainerCount() {
+ failedContainerCount.inc();
}
- public int getStartFailedCountainerCount() {
- return startFailedContainers.get();
+ public long getStartFailedCountainerCount() {
+ return startFailedContainerCount.getCount();
}
/**
* Increment the count of started containers
- * @return the latest failed container count
*/
- public int incStartedCountainerCount() {
- return startedContainers.incrementAndGet();
+ public void incStartedCountainerCount() {
+ startedContainers.inc();
}
- public int getStartedCountainerCount() {
- return startedContainers.get();
+ public long getStartedCountainerCount() {
+ return startedContainers.getCount();
}
/**
* Increment the count of containers that failed to start
- * @return the latest failed container count
*/
- public int incStartFailedCountainerCount() {
- return startFailedContainers.incrementAndGet();
- }
-
-
- public AtomicInteger getStartFailedContainers() {
- return startFailedContainers;
+ public void incStartFailedCountainerCount() {
+ startFailedContainerCount.inc();
}
public AtomicInteger getCompletionOfNodeNotInLiveListEvent() {
@@ -535,7 +534,7 @@ public class AppState {
this.applicationInfo = applicationInfo != null ? applicationInfo
: new HashMap<String, String>();
- clientProperties = new HashMap<String, String>();
+ clientProperties = new HashMap<>();
containerReleaseSelector = releaseSelector;
@@ -1744,19 +1743,19 @@ public class AppState {
* keylist.
*/
protected Map<String, Integer> getLiveStatistics() {
- Map<String, Integer> sliderstats = new HashMap<String, Integer>();
+ Map<String, Integer> sliderstats = new HashMap<>();
+ sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE,
+ liveNodes.size());
sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED,
- completedContainerCount.get());
+ (int)completedContainerCount.getCount());
sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED,
- failedContainerCount.get());
- sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE,
- liveNodes.size());
+ (int)failedContainerCount.getCount());
sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED,
- startedContainers.get());
+ (int)startedContainers.getCount());
sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED,
- startFailedContainers.get());
+ (int) startFailedContainerCount.getCount());
sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS,
- surplusContainers.get());
+ (int)surplusContainers.getCount());
sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED,
completionOfUnknownContainerEvent.get());
return sliderstats;
@@ -1893,7 +1892,6 @@ public class AppState {
throws SliderInternalStateException, TriggerClusterTeardownException {
List<AbstractRMOperation> operations = new ArrayList<>();
int delta;
- String details;
int expected;
String name = role.getName();
synchronized (role) {
@@ -1903,7 +1901,7 @@ public class AppState {
log.info("Reviewing {} : expected {}", role, expected);
checkFailureThreshold(role);
-
+
if (expected < 0 ) {
// negative value: fail
throw new TriggerClusterTeardownException(
@@ -1912,7 +1910,7 @@ public class AppState {
"Negative component count of %d desired for component %s",
expected, role);
}
-
+
if (delta > 0) {
log.info("{}: Asking for {} more nodes(s) for a total of {} ", name,
delta, expected);
@@ -2004,7 +2002,7 @@ public class AppState {
for (RoleInstance possible : finalCandidates) {
log.debug("Targeting for release: {}", possible);
containerReleaseSubmitted(possible.container);
- operations.add(new ContainerReleaseOperation(possible.getId()));
+ operations.add(new ContainerReleaseOperation(possible.getId()));
}
}
@@ -2033,7 +2031,6 @@ public class AppState {
return operations;
}
-
/**
* Find a container running on a specific host -looking
* into the node ID to determine this.
@@ -2055,7 +2052,7 @@ public class AppState {
}
return null;
}
-
+
/**
* Release all containers.
* @return a list of operations to execute
@@ -2105,40 +2102,42 @@ public class AppState {
String containerHostInfo = container.getNodeId().getHost()
+ ":" +
container.getNodeId().getPort();
- int allocated;
- int desired;
//get the role
- ContainerId cid = container.getId();
- RoleStatus role = lookupRoleStatus(container);
+ final ContainerId cid = container.getId();
+ final RoleStatus role = lookupRoleStatus(container);
//dec requested count
decrementRequestCount(role);
+
+ // cancel an allocation request which granted this, so as to avoid repeated
+ // requests
+ releaseOperations.add(new CancelRequestOperation(container.getPriority(), null, 1));
+
//inc allocated count -this may need to be dropped in a moment,
// but is needed to update the logic below
- allocated = role.incActual();
+ final int allocated = role.incActual();
+ final int desired = role.getDesired();
- //look for (race condition) where we get more back than we asked
- desired = role.getDesired();
-
- ContainerAllocationOutcome outcome = roleHistory.onContainerAllocated(container,
- desired,
- allocated);
+ final String roleName = role.getName();
+ final ContainerAllocationOutcome outcome =
+ roleHistory.onContainerAllocated(container, desired, allocated);
+ //look for condition where we get more back than we asked
if (allocated > desired) {
- log.info("Discarding surplus container {} on {}", cid,
- containerHostInfo);
+ log.info("Discarding surplus {} container {} on {}", roleName, cid,
+ containerHostInfo);
releaseOperations.add(new ContainerReleaseOperation(cid));
//register as a surplus node
surplusNodes.add(cid);
- surplusContainers.incrementAndGet();
+ surplusContainers.inc();
//and, as we aren't binding it to role, dec that role's actual count
role.decActual();
} else {
- // this is valid, so decrement the number of outstanding requests
+ // Allocation being accepted -so decrement the number of outstanding requests
decOutstandingContainerRequests();
- String roleName = role.getName();
+
log.info("Assigning role {} to container" +
" {}," +
" on {}:{},",
@@ -2174,8 +2173,8 @@ public class AppState {
* @return true if a rebuild took place (even if size 0)
* @throws RuntimeException on problems
*/
- private boolean rebuildModelFromRestart(List<Container> liveContainers) throws
- BadClusterStateException {
+ private boolean rebuildModelFromRestart(List<Container> liveContainers)
+ throws BadClusterStateException {
if (liveContainers == null) {
return false;
}
@@ -2193,8 +2192,8 @@ public class AppState {
* @param container container that was running before the AM restarted
* @throws RuntimeException on problems
*/
- private void addRestartedContainer(Container container) throws
- BadClusterStateException {
+ private void addRestartedContainer(Container container)
+ throws BadClusterStateException {
String containerHostInfo = container.getNodeId().getHost()
+ ":" +
container.getNodeId().getPort();
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
index 99108fe..64f9184 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java
@@ -625,7 +625,7 @@ public class RoleHistory {
List<Container> unrequested =
new ArrayList<>(allocatedContainers.size());
outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested);
-
+
//give the unrequested ones lower priority
requested.addAll(unrequested);
return requested;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
index e0fdf1b..c310583 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy
@@ -85,9 +85,9 @@ class TestMockAppStateRebuildOnAMRestart extends BaseMockAppStateTest
null,
new MostRecentContainerReleaseSelector())
- assert appState.getStartedCountainerCount() == clusterSize
+ assert appState.startedCountainerCount == clusterSize
- appState.getRoleHistory().dump();
+ appState.roleHistory.dump();
//check that the app state direct structures match
List<RoleInstance> r0live = appState.enumLiveNodesInRole(ROLE0)