You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/11/17 22:12:55 UTC
[5/8] incubator-slider git commit: SLIDER-988 add mock test of
failure of AA container and re-request; fix any failures
SLIDER-988 add mock test of failure of AA container and re-request; fix any failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/830864ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/830864ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/830864ff
Branch: refs/heads/feature/SLIDER-82-pass-3.1
Commit: 830864ff701a7d1682c39def20dda909c3b4e7e5
Parents: 591ba99
Author: Steve Loughran <st...@apache.org>
Authored: Tue Nov 17 19:55:59 2015 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Nov 17 19:55:59 2015 +0000
----------------------------------------------------------------------
.../apache/slider/common/tools/SliderUtils.java | 15 ++-
.../management/BoolMetricPredicate.java | 44 ++++++++
.../management/LongMetricFunction.java | 44 ++++++++
.../management/MetricsAndMonitoring.java | 51 ++++++++-
.../management/MetricsBindingService.java | 12 +-
.../appmaster/management/MetricsConstants.java | 2 +
.../management/PrefixedMetricsSet.java | 53 +++++++++
.../slider/server/appmaster/state/AppState.java | 69 +++++++-----
.../appmaster/state/OutstandingRequest.java | 6 +-
.../server/appmaster/state/RoleStatus.java | 82 ++++++++++----
.../TestMockAppStateAAOvercapacity.groovy | 110 +++++++++++++++++++
.../appstate/TestMockAppStateAAPlacement.groovy | 11 +-
.../TestRoleHistoryContainerEvents.groovy | 4 +-
.../model/mock/BaseMockAppStateTest.groovy | 14 ---
14 files changed, 438 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java b/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
index 5bf8622..eb7a9d5 100644
--- a/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
+++ b/slider-core/src/main/java/org/apache/slider/common/tools/SliderUtils.java
@@ -155,6 +155,7 @@ public final class SliderUtils {
* name of docker program
*/
public static final String DOCKER = "docker";
+ public static final int NODE_LIST_LIMIT = 10;
private SliderUtils() {
}
@@ -2468,7 +2469,7 @@ public final class SliderUtils {
* @return +ve if x is less than y, -ve if x is greater than y; 0 for equality
*/
public static int compareTwoLongsReverse(long x, long y) {
- return (x < y) ? +1 : ((x == y) ? 0 : -1);
+ return (x < y) ? 1 : ((x == y) ? 0 : -1);
}
public static String getSystemEnv(String property) {
@@ -2490,9 +2491,15 @@ public final class SliderUtils {
}
List<String> nodes = request.getNodes();
if (nodes != null) {
- buffer.append("Nodes = [")
- .append(join(nodes, ", ", false))
- .append("]; ");
+ buffer.append("Nodes = [ ");
+ int size = nodes.size();
+ for (int i = 0; i < Math.min(NODE_LIST_LIMIT, size); i++) {
+ buffer.append(nodes.get(i)).append(' ');
+ }
+ if (size > NODE_LIST_LIMIT) {
+ buffer.append(String.format("...(total %d entries)", size));
+ }
+ buffer.append("]; ");
}
List<String> racks = request.getRacks();
if (racks != null) {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java
new file mode 100644
index 0000000..82bcd3a
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.management;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+
+/**
+ * A metric which takes a predicate and returns 1 if the predicate evaluates
+ * to true and 0 otherwise. The predicate is evaluated whenever the metric is read.
+ */
+public class BoolMetricPredicate implements Metric, Gauge<Integer> {
+
+ private final Eval predicate;
+
+ public BoolMetricPredicate(Eval predicate) {
+ this.predicate = predicate;
+ }
+
+ @Override
+ public Integer getValue() {
+ return predicate.eval() ? 1: 0;
+ }
+
+ public interface Eval {
+ boolean eval();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java
new file mode 100644
index 0000000..1de7345
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.management;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+
+/**
+ * A metric which takes a function to generate a long value.
+ * The function is evaluated whenever the metric is read.
+ */
+public class LongMetricFunction implements Metric, Gauge<Long> {
+
+ private final Eval function;
+
+ public LongMetricFunction(Eval function) {
+ this.function = function;
+ }
+
+ @Override
+ public Long getValue() {
+ return function.eval();
+ }
+
+ public interface Eval {
+ long eval();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java
index cced42a..37a8935 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsAndMonitoring.java
@@ -20,9 +20,12 @@ package org.apache.slider.server.appmaster.management;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
import com.codahale.metrics.health.HealthCheckRegistry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -33,7 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
* Class for all metrics and monitoring
*/
public class MetricsAndMonitoring extends CompositeService {
-
+ protected static final Logger log =
+ LoggerFactory.getLogger(MetricsAndMonitoring.class);
public MetricsAndMonitoring(String name) {
super(name);
}
@@ -52,13 +56,15 @@ public class MetricsAndMonitoring extends CompositeService {
private final Map<String, MeterAndCounter> meterAndCounterMap
= new ConcurrentHashMap<>();
+ private final List<MetricSet> metricSets = new ArrayList<>();
+
/**
* List of recorded events
*/
private final List<RecordedEvent> eventHistory = new ArrayList<>(100);
public static final int EVENT_LIMIT = 1000;
-
+
public MetricRegistry getMetrics() {
return metrics;
}
@@ -74,6 +80,14 @@ public class MetricsAndMonitoring extends CompositeService {
super.serviceInit(conf);
}
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ for (MetricSet set : metricSets) {
+ unregister(set);
+ }
+ }
+
public MeterAndCounter getMeterAndCounter(String name) {
return meterAndCounterMap.get(name);
}
@@ -144,5 +158,38 @@ public class MetricsAndMonitoring extends CompositeService {
public synchronized List<RecordedEvent> cloneEventHistory() {
return new ArrayList<>(eventHistory);
}
+
+ /**
+ * Register a metric set now and record it for deregistration on service stop
+ * @param metricSet metric set
+ */
+ public void addMetricSet(MetricSet metricSet) {
+ metricSets.add(metricSet);
+ metrics.registerAll(metricSet);
+ }
+
+ /**
+ * add a metric set, giving each entry a prefix
+ * @param prefix prefix (a trailing "." is automatically added)
+ * @param metricSet the metric set to register
+ */
+ public void addMetricSet(String prefix, MetricSet metricSet) {
+ addMetricSet(new PrefixedMetricsSet(prefix, metricSet));
+ }
+
+ /**
+ * Unregister a metric set; a failure to remove one metric is logged and the rest are still removed
+ * @param metricSet metric set to unregister
+ */
+ public void unregister(MetricSet metricSet) {
+ for (String s : metricSet.getMetrics().keySet()) {
+ try {
+ metrics.remove(s);
+ } catch (IllegalArgumentException e) {
+ // log but continue
+ log.info("Exception when trying to unregister {}", s, e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java
index f8646bf..864a1cf 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsBindingService.java
@@ -19,7 +19,9 @@
package org.apache.slider.server.appmaster.management;
import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Slf4jReporter;
import com.google.common.base.Preconditions;
@@ -29,6 +31,9 @@ import org.apache.slider.server.services.workflow.ClosingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -79,7 +84,7 @@ public class MetricsBindingService extends CompositeService
JmxReporter jmxReporter;
jmxReporter = JmxReporter.forRegistry(metrics).build();
jmxReporter.start();
- addService(new ClosingService<JmxReporter>(jmxReporter));
+ addService(new ClosingService<>(jmxReporter));
// Ganglia
@@ -128,7 +133,7 @@ public class MetricsBindingService extends CompositeService
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
reporter.start(interval, TimeUnit.MINUTES);
- addService(new ClosingService<ScheduledReporter>(reporter));
+ addService(new ClosingService<>(reporter));
summary.append(String.format(", SLF4J to log %s interval=%d",
logName, interval));
}
@@ -136,8 +141,11 @@ public class MetricsBindingService extends CompositeService
log.info(reportingDetails);
}
+
@Override
public String toString() {
return super.toString() + " " + reportingDetails;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
index 31a82a3..fa6bfc0 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java
@@ -53,4 +53,6 @@ public class MetricsConstants {
*/
public static final String CONTAINERS_START_FAILED = "containers.start-failed";
+ public static final String PREFIX_SLIDER_ROLES = "slider.roles.";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java
new file mode 100644
index 0000000..e9ad46a
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/PrefixedMetricsSet.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.management;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * From an existing metrics set, generate a new metrics set with the
+ * prefix in front of every key.
+ *
+ * A '.' is always inserted between the prefix and each metric key,
+ * so the prefix itself should not end with one.
+ */
+public class PrefixedMetricsSet implements MetricSet {
+
+ private final String prefix;
+ private final MetricSet source;
+
+ public PrefixedMetricsSet(String prefix, MetricSet source) {
+ this.prefix = prefix;
+ this.source = source;
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ Map<String, Metric> sourceMetrics = source.getMetrics();
+ Map<String, Metric> metrics = new HashMap<>(sourceMetrics.size());
+ for (Map.Entry<String, Metric> entry : sourceMetrics.entrySet()) {
+ metrics.put(prefix + "." + entry.getKey(), entry.getValue());
+ }
+ return metrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 2da5d36..49bc225 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -839,8 +839,11 @@ public class AppState {
}
RoleStatus roleStatus = new RoleStatus(providerRole);
roleStatusMap.put(priority, roleStatus);
- roles.put(providerRole.name, providerRole);
+ String name = providerRole.name;
+ roles.put(name, providerRole);
rolePriorityMap.put(priority, providerRole);
+ // register its entries
+ metricsAndMonitoring.addMetricSet(MetricsConstants.PREFIX_SLIDER_ROLES + name, roleStatus);
return roleStatus;
}
@@ -1511,7 +1514,6 @@ public class AppState {
public int exitStatus = 0;
public boolean unknownNode = false;
-
public String toString() {
final StringBuilder sb =
new StringBuilder("NodeCompletionResult{");
@@ -1969,7 +1971,8 @@ public class AppState {
expected = role.getDesired();
}
- log.info("Reviewing {} : expected {}", role, expected);
+ log.info("Reviewing {} : ", role);
+ log.debug("Expected {}, Delta: {}", expected, delta);
checkFailureThreshold(role);
if (expected < 0 ) {
@@ -1986,29 +1989,28 @@ public class AppState {
log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected);
if (role.isAntiAffinePlacement()) {
- // build one only if there is none outstanding, the role history knows
- // enough about the cluster to ask, and there is somewhere to place
- // the node
- if (role.getPendingAntiAffineRequests() == 0
- && !role.isAARequestOutstanding()
- && roleHistory.canPlaceAANodes()) {
- // log the number outstanding
- AMRMClient.ContainerRequest request = createAAContainerRequest(role);
- if (request != null) {
- log.info("Starting an anti-affine request sequence for {} nodes", delta);
- role.incPendingAntiAffineRequests(delta - 1);
- addContainerRequest(operations, request);
- } else {
- log.info("No location for anti-affine request");
+ long pending = delta;
+ if (roleHistory.canPlaceAANodes()) {
+ // build one only if there is none outstanding, the role history knows
+ // enough about the cluster to ask, and there is somewhere to place
+ // the node
+ if (!role.isAARequestOutstanding()) {
+ // no outstanding AA; try to place things
+ AMRMClient.ContainerRequest request = createAAContainerRequest(role);
+ if (request != null) {
+ pending--;
+ log.info("Starting an anti-affine request sequence for {} nodes; pending={}",
+ delta, pending);
+ addContainerRequest(operations, request);
+ } else {
+ log.info("No location for anti-affine request");
+ }
}
} else {
- if (roleHistory.canPlaceAANodes()) {
- log.info("Adding {} more anti-affine requests", delta);
- } else {
- log.warn("Awaiting node map before generating node requests");
- }
- role.incPendingAntiAffineRequests(delta);
+ log.warn("Awaiting node map before generating anti-affinity requests");
}
+ log.info("Setting pending to {}", pending);
+ role.setPendingAntiAffineRequests(pending);
} else {
for (int i = 0; i < delta; i++) {
@@ -2024,7 +2026,7 @@ public class AppState {
long excess = -delta;
// how many requests are outstanding? for AA roles, this includes pending
- long outstandingRequests = role.getRequested();
+ long outstandingRequests = role.getRequested() + role.getPendingAntiAffineRequests();
if (outstandingRequests > 0) {
// outstanding requests.
int toCancel = (int)Math.min(outstandingRequests, excess);
@@ -2084,13 +2086,11 @@ public class AppState {
roleId,
containersToRelease);
- //crop to the excess
-
- List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease)
+ // crop to the excess
+ List<RoleInstance> finalCandidates = (excess < numberAvailableForRelease)
? containersToRelease.subList(0, (int)excess)
: containersToRelease;
-
// then build up a release operation, logging each container as released
for (RoleInstance possible : finalCandidates) {
log.info("Targeting for release: {}", possible);
@@ -2099,9 +2099,17 @@ public class AppState {
}
}
+ } else {
+ // actual + requested == desired
+ // there's a special case here: clear all pending AA requests
+ if (role.getPendingAntiAffineRequests() > 0) {
+ log.debug("Clearing outstanding pending AA requests");
+ role.setPendingAntiAffineRequests(0);
+ }
}
- // list of operations to execute
+ // there's now a list of operations to execute
+ log.debug("operations scheduled: {}; updated role: {}", operations.size(), role);
return operations;
}
@@ -2274,9 +2282,10 @@ public class AppState {
if (role.getPendingAntiAffineRequests() > 0) {
// still an outstanding AA request: need to issue a new one.
log.info("Asking for next container for AA role {}", roleName);
- role.decPendingAntiAffineRequests();
if (!addContainerRequest(operations, createAAContainerRequest(role))) {
log.info("No capacity in cluster for new requests");
+ } else {
+ role.decPendingAntiAffineRequests();
}
log.debug("Current AA role status {}", role);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
index 129fd4c..3a75f27 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java
@@ -342,10 +342,12 @@ public final class OutstandingRequest extends RoleHostnamePair {
@Override
public String toString() {
- int requestRoleId = ContainerPriority.extractRole(getPriority());
boolean requestHasLocation = ContainerPriority.hasLocation(getPriority());
final StringBuilder sb = new StringBuilder("OutstandingRequest{");
- sb.append(super.toString());
+ sb.append("roleId=").append(roleId);
+ if (hostname != null) {
+ sb.append(", hostname='").append(hostname).append('\'');
+ }
sb.append(", node=").append(node);
sb.append(", hasLocation=").append(requestHasLocation);
sb.append(", requestedTimeMillis=").append(requestedTimeMillis);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
index 0fc3dc2..656f96c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java
@@ -18,16 +18,21 @@
package org.apache.slider.server.appmaster.state;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.RoleStatistics;
import org.apache.slider.providers.PlacementPolicy;
import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.server.appmaster.management.BoolMetric;
+import org.apache.slider.server.appmaster.management.BoolMetricPredicate;
import org.apache.slider.server.appmaster.management.LongGauge;
import java.io.Serializable;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -38,7 +43,7 @@ import java.util.Map;
* requires synchronization. Where synchronized access is good is that it allows for
* the whole instance to be locked, for updating multiple entries.
*/
-public final class RoleStatus implements Cloneable {
+public final class RoleStatus implements Cloneable, MetricSet {
private final String name;
@@ -48,32 +53,30 @@ public final class RoleStatus implements Cloneable {
private final int key;
private final ProviderRole providerRole;
- private final LongGauge desired = new LongGauge();
private final LongGauge actual = new LongGauge();
- private final LongGauge requested = new LongGauge();
- private final LongGauge releasing = new LongGauge();
- private final LongGauge failed = new LongGauge();
- private final LongGauge startFailed = new LongGauge();
- private final LongGauge started= new LongGauge();
private final LongGauge completed = new LongGauge();
- private final LongGauge totalRequested = new LongGauge();
- private final LongGauge preempted = new LongGauge(0);
- private final LongGauge nodeFailed = new LongGauge(0);
+ private final LongGauge desired = new LongGauge();
+ private final LongGauge failed = new LongGauge();
private final LongGauge failedRecently = new LongGauge(0);
private final LongGauge limitsExceeded = new LongGauge(0);
+ private final LongGauge nodeFailed = new LongGauge(0);
+ /** Number of AA requests queued. */
+ private final LongGauge pendingAntiAffineRequests = new LongGauge(0);
+ private final LongGauge preempted = new LongGauge(0);
+ private final LongGauge releasing = new LongGauge();
+ private final LongGauge requested = new LongGauge();
+ private final LongGauge started = new LongGauge();
+ private final LongGauge startFailed = new LongGauge();
+ private final LongGauge totalRequested = new LongGauge();
/** resource requirements */
private Resource resourceRequirements;
- /**
- * Number of AA requests queued. These should be reduced first on a
- * flex down.
- */
- private final LongGauge pendingAntiAffineRequests = new LongGauge(0);
/** any pending AA request */
private volatile OutstandingRequest outstandingAArequest = null;
+
private String failureMessage = "";
public RoleStatus(ProviderRole providerRole) {
@@ -81,7 +84,37 @@ public final class RoleStatus implements Cloneable {
this.name = providerRole.name;
this.key = providerRole.id;
}
-
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ Map<String, Metric> metrics = new HashMap<>(15);
+ metrics.put("actual", actual);
+ metrics.put("completed", completed );
+ metrics.put("desired", desired);
+ metrics.put("failed", failed);
+ metrics.put("limitsExceeded", limitsExceeded);
+ metrics.put("nodeFailed", nodeFailed);
+ metrics.put("preempted", preempted);
+ metrics.put("pendingAntiAffineRequests", pendingAntiAffineRequests);
+ metrics.put("releasing", releasing);
+ metrics.put("requested", requested);
+ metrics.put("preempted", preempted);
+ metrics.put("releasing", releasing );
+ metrics.put("requested", requested);
+ metrics.put("started", started);
+ metrics.put("startFailed", startFailed);
+ metrics.put("totalRequested", totalRequested);
+
+ metrics.put("outstandingAArequest",
+ new BoolMetricPredicate(new BoolMetricPredicate.Eval() {
+ @Override
+ public boolean eval() {
+ return isAARequestOutstanding();
+ }
+ }));
+ return metrics;
+ }
+
public String getName() {
return name;
}
@@ -157,11 +190,11 @@ public final class RoleStatus implements Cloneable {
}
/**
- * Get the request count. For AA roles, this includes pending ones.
+ * Get the request count.
* @return a count of requested containers
*/
public long getRequested() {
- return requested.get() + pendingAntiAffineRequests.get();
+ return requested.get();
}
public long incRequested() {
@@ -222,6 +255,14 @@ public final class RoleStatus implements Cloneable {
}
/**
+ * expose the predicate {@link #isAARequestOutstanding()} as an integer,
+ * which is very convenient in tests
+ * @return 1 if there is an outstanding request; 0 if not
+ */
+ public int getOutstandingAARequestCount() {
+ return isAARequestOutstanding()? 1: 0;
+ }
+ /**
* Note that a role failed, text will
* be used in any diagnostics if an exception
* is later raised.
@@ -350,7 +391,6 @@ public final class RoleStatus implements Cloneable {
*/
public long getDelta() {
long inuse = getActualAndRequested();
- //don't know how to view these. Are they in-use or not?
long delta = desired.get() - inuse;
if (delta < 0) {
//if we are releasing, remove the number that are already released.
@@ -366,7 +406,7 @@ public final class RoleStatus implements Cloneable {
* @return the size of the application when outstanding requests are included.
*/
public long getActualAndRequested() {
- return actual.get() + requested.get() + pendingAntiAffineRequests.get();
+ return actual.get() + requested.get();
}
@Override
@@ -499,7 +539,7 @@ public final class RoleStatus implements Cloneable {
public synchronized RoleStatistics getStatistics() {
RoleStatistics stats = new RoleStatistics();
- stats.activeAA = isAARequestOutstanding() ? 1: 0;
+ stats.activeAA = getOutstandingAARequestCount();
stats.actual = actual.get();
stats.desired = desired.get();
stats.failed = failed.get();
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy
new file mode 100644
index 0000000..7728748
--- /dev/null
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAOvercapacity.groovy
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.model.appstate
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import org.apache.hadoop.yarn.api.records.Container
+import org.apache.hadoop.yarn.api.records.ContainerId
+import org.apache.hadoop.yarn.api.records.NodeState
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.slider.core.main.LauncherExitCodes
+import org.apache.slider.server.appmaster.model.mock.MockNodeReport
+import org.apache.slider.server.appmaster.model.mock.MockRoles
+import org.apache.slider.server.appmaster.model.mock.MockYarnEngine
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation
+import org.apache.slider.server.appmaster.state.AppState
+import org.apache.slider.server.appmaster.state.ContainerAssignment
+import org.apache.slider.server.appmaster.state.NodeInstance
+import org.apache.slider.server.appmaster.state.NodeMap
+import org.apache.slider.server.appmaster.state.RoleInstance
+import org.junit.Test
+
+/**
+ * Test anti-affine (AA) placement with a cluster of size 1 and more AA instances desired than there are nodes
+ */
+@CompileStatic
+@Slf4j
+class TestMockAppStateAAOvercapacity extends BaseMockAppStateAATest
+ implements MockRoles {
+
+ private int NODES = 1 // node count handed to the mock YARN engine in createYarnEngine()
+
+ @Override
+ MockYarnEngine createYarnEngine() {
+ new MockYarnEngine(NODES, 1) // NOTE(review): second argument presumably containers per node - TODO confirm
+ }
+
+ void assertAllContainersAA() {
+ assertAllContainersAA(aaRole.key) // delegate to the keyed overload using the AA role's key
+ }
+
+ /**
+ * Ask for 3 AA instances with NODES == 1, allocate one, fail its container, then verify the next review yields exactly one operation.
+ * @throws Throwable on any failure raised by the calls under test
+ */
+ @Test
+ public void testOvercapacityRecovery() throws Throwable {
+
+ describe("Ask for 1 more than the no of available nodes;" +
+ "verify the state. kill the allocated container and review")
+ // desired (3) exceeds NODES (1); NOTE(review): describe() says "1 more" but this is 2 more - confirm intent
+ long desired = 3
+ aaRole.desired = desired
+ assert appState.roleHistory.canPlaceAANodes()
+
+ // first review: one AA request is outstanding and the other desired - 1 are held as pending
+ List<AbstractRMOperation > operations = appState.reviewRequestAndReleaseNodes()
+ assert aaRole.AARequestOutstanding
+ assert desired - 1 == aaRole.pendingAntiAffineRequests
+ List<AbstractRMOperation > operationsOut = []
+ // allocate and re-submit
+ def instances = submitOperations(operations, [], operationsOut)
+ assert 1 == instances.size()
+ assertAllContainersAA()
+
+ // after the allocation: nothing requested or outstanding, but desired - 1 are still pending
+ assert aaRole.actual < aaRole.desired
+ assert !aaRole.requested
+ assert !aaRole.AARequestOutstanding
+ assert desired - 1 == aaRole.pendingAntiAffineRequests
+ List<Container> allocatedContainers = engine.execute(operations, []) // re-running the same operations must allocate nothing more
+ assert 0 == allocatedContainers.size()
+
+ // now let's trigger a failure of the allocated container
+ def nodemap = cloneNodemap()
+ assert nodemap.size() == 1
+
+ def instance = instances[0]
+ def cid = instance.containerId
+
+ AppState.NodeCompletionResult result = appState.onCompletedNode(containerStatus(cid,
+ LauncherExitCodes.EXIT_TASK_LAUNCH_FAILURE))
+ assert result.containerFailed
+
+ assert aaRole.failed == 1
+ assert aaRole.actual == 0
+ def availablePlacements = appState.getRoleHistory().findNodeForNewAAInstance(aaRole)
+ assert availablePlacements.size() == 1
+ describe "expecting a successful review with available placements of $availablePlacements"
+ operations = appState.reviewRequestAndReleaseNodes()
+ assert operations.size() == 1
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
index 749e4fc..3461e23 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy
@@ -113,7 +113,9 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateAATest
aaRole.desired = 2
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
getSingleRequest(ops)
+ assert aaRole.requested == 1
assert aaRole.pendingAntiAffineRequests == 1
+ assert aaRole.actualAndRequested + aaRole.pendingAntiAffineRequests == aaRole.desired
// now trigger that flex up
aaRole.desired = 3
@@ -121,6 +123,13 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateAATest
// expect: no new reqests, pending count ++
List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes()
assert ops2.empty
+ assert aaRole.actual + aaRole.pendingAntiAffineRequests + aaRole.outstandingAARequestCount ==
+ aaRole.desired
+
+ // 1 outstanding
+ assert aaRole.actual == 0
+ assert aaRole.AARequestOutstanding
+ // and two pending AA requests
assert aaRole.pendingAntiAffineRequests == 2
assertAllContainersAA()
@@ -141,7 +150,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateAATest
}
@Test
- public void testAllocateFlexDown() throws Throwable {
+ public void testAllocateFlexDownDecrementsPending() throws Throwable {
// want multiple instances, so there will be iterations
aaRole.desired = 2
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
index c8a82bd..ca42546 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy
@@ -28,12 +28,10 @@ import org.apache.hadoop.yarn.api.records.Priority
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.api.ResourceKeys
-import org.apache.slider.providers.ProviderRole
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockContainer
import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.model.mock.MockNodeId
-import org.apache.slider.server.appmaster.model.mock.MockRoleHistory
import org.apache.slider.server.appmaster.state.*
import org.junit.Test
@@ -402,7 +400,7 @@ class TestRoleHistoryContainerEvents extends BaseMockAppStateTest {
int startSize = nodemap.size()
// now send a list of updated (failed) nodes event
- List<NodeReport> nodesUpdated = new ArrayList<NodeReport>();
+ List<NodeReport> nodesUpdated = new ArrayList<>();
NodeReport nodeReport = NodeReport.newInstance(
NodeId.newInstance(hostname, 0),
NodeState.LOST,
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/830864ff/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
index da1bcb9..a53e0be 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy
@@ -401,20 +401,6 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles
}
/**
- * Scan through all containers and assert that the assignment is AA
- * @param index role index
- */
- void assertAllContainersAAOld(String index) {
- def nodemap = stateAccess.nodeInformationSnapshot
- nodemap.each { name, info ->
- def nodeEntry = info.entries[index]
- assert nodeEntry == null ||
- (nodeEntry.live - nodeEntry.releasing + nodeEntry.starting) <= 1,
- "too many instances on node $name"
- }
- }
-
- /**
* Get the node information as a large JSON String
* @return
*/