You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/20 21:22:46 UTC
aurora git commit: Modifying resource counters to support revocable
resources.
Repository: aurora
Updated Branches:
refs/heads/master f5025f3c6 -> 74a121772
Modifying resource counters to support revocable resources.
Bugs closed: AURORA-1439
Reviewed at https://reviews.apache.org/r/37593/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/74a12177
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/74a12177
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/74a12177
Branch: refs/heads/master
Commit: 74a1217721c73213f65a3f604df0639e7b97823a
Parents: f5025f3
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Aug 20 12:19:02 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Aug 20 12:19:02 2015 -0700
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/Resources.java | 6 +-
.../scheduler/stats/AsyncStatsModule.java | 48 +++++++----
.../aurora/scheduler/stats/ResourceCounter.java | 2 +-
.../aurora/scheduler/stats/SlotSizeCounter.java | 88 ++++++++++++--------
.../scheduler/stats/AsyncStatsModuleTest.java | 73 ++++++++++++++++
.../scheduler/stats/SlotSizeCounterTest.java | 60 +++++++++----
6 files changed, 208 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java
index 3b4fbb6..b34a629 100644
--- a/src/main/java/org/apache/aurora/scheduler/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/Resources.java
@@ -17,7 +17,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -55,14 +54,13 @@ public final class Resources {
/**
* Revocable resource filter.
*/
- @VisibleForTesting
- static final Predicate<Resource> REVOCABLE =
+ public static final Predicate<Resource> REVOCABLE =
Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable));
/**
* Non-revocable resource filter.
*/
- private static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable);
+ public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable);
private final Iterable<Resource> mesosResources;
http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 74a6546..09a2f00 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -16,8 +16,7 @@ package org.apache.aurora.scheduler.stats;
import javax.inject.Inject;
import javax.inject.Singleton;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.inject.AbstractModule;
@@ -41,6 +40,10 @@ import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.ResourceSlot.NONE;
+import static org.apache.aurora.scheduler.Resources.NON_REVOCABLE;
+import static org.apache.aurora.scheduler.Resources.REVOCABLE;
+
/**
* Module to configure export of cluster-wide resource allocation and consumption statistics.
*/
@@ -136,19 +139,6 @@ public class AsyncStatsModule extends AbstractModule {
}
static class OfferAdapter implements MachineResourceProvider {
- private static final Function<HostOffer, MachineResource> TO_RESOURCE =
- new Function<HostOffer, MachineResource>() {
- @Override
- public MachineResource apply(HostOffer offer) {
- ResourceSlot resources = Resources.from(offer.getOffer()).slot();
- IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
- .setNumCpus(resources.getNumCpus())
- .setRamMb(resources.getRam().as(Data.MB))
- .setDiskMb(resources.getDisk().as(Data.MB)));
- return new MachineResource(quota, Conversions.isDedicated(offer.getOffer()));
- }
- };
-
private final OfferManager offerManager;
@Inject
@@ -159,7 +149,33 @@ public class AsyncStatsModule extends AbstractModule {
@Override
public Iterable<MachineResource> get() {
Iterable<HostOffer> offers = offerManager.getOffers();
- return FluentIterable.from(offers).transform(TO_RESOURCE);
+
+ ImmutableList.Builder<MachineResource> builder = ImmutableList.builder();
+ for (HostOffer offer : offers) {
+ ResourceSlot revocable = Resources.from(offer.getOffer()).filter(REVOCABLE).slot();
+ ResourceSlot nonRevocable =
+ Resources.from(offer.getOffer()).filter(NON_REVOCABLE).slot();
+ boolean isDedicated = Conversions.isDedicated(offer.getOffer());
+
+ // It's insufficient to compare revocable against NONE here as RAM, DISK and PORTS
+ // are always rolled in to revocable as non-compressible resources. Only if revocable
+ // CPU is non-zero should we expose the revocable resources as aggregates.
+ if (revocable.getNumCpus() > 0.0) {
+ builder.add(new MachineResource(fromSlot(revocable), isDedicated, true));
+ }
+
+ if (!nonRevocable.equals(NONE)) {
+ builder.add(new MachineResource(fromSlot(nonRevocable), isDedicated, false));
+ }
+ }
+ return builder.build();
+ }
+
+ private static IResourceAggregate fromSlot(ResourceSlot slot) {
+ return IResourceAggregate.build(new ResourceAggregate()
+ .setNumCpus(slot.getNumCpus())
+ .setRamMb(slot.getRam().as(Data.MB))
+ .setDiskMb(slot.getDisk().as(Data.MB)));
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
index 6dbc5d6..36e2c93 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java
@@ -132,7 +132,7 @@ public class ResourceCounter {
}
public enum MetricType {
- TOTAL_CONSUMED(Predicates.alwaysTrue()),
+ TOTAL_CONSUMED(Predicates.<ITaskConfig>alwaysTrue()),
DEDICATED_CONSUMED(new Predicate<ITaskConfig>() {
@Override
public boolean apply(ITaskConfig task) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
index 39c055d..e7be8e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
@@ -14,14 +14,16 @@
package org.apache.aurora.scheduler.stats;
import java.util.Map;
+import java.util.Objects;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
import org.apache.aurora.scheduler.ResourceAggregates;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
@@ -39,6 +41,14 @@ class SlotSizeCounter implements Runnable {
"large", ResourceAggregates.LARGE,
"xlarge", ResourceAggregates.XLARGE);
+ // Ensures all counters are always initialized regardless of the Resource availability.
+ private static final Iterable<String> SLOT_GROUPS = ImmutableList.of(
+ getPrefix(false, false),
+ getPrefix(false, true),
+ getPrefix(true, false),
+ getPrefix(true, true)
+ );
+
private final Map<String, IResourceAggregate> slotSizes;
private final MachineResourceProvider machineResourceProvider;
private final CachedCounters cachedCounters;
@@ -57,10 +67,12 @@ class SlotSizeCounter implements Runnable {
static class MachineResource {
private final IResourceAggregate size;
private final boolean dedicated;
+ private final boolean revocable;
- public MachineResource(IResourceAggregate size, boolean dedicated) {
+ public MachineResource(IResourceAggregate size, boolean dedicated, boolean revocable) {
this.size = requireNonNull(size);
this.dedicated = dedicated;
+ this.revocable = revocable;
}
public IResourceAggregate getSize() {
@@ -70,6 +82,27 @@ class SlotSizeCounter implements Runnable {
public boolean isDedicated() {
return dedicated;
}
+
+ public boolean isRevocable() {
+ return revocable;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(size, dedicated, revocable);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof MachineResource)) {
+ return false;
+ }
+
+ MachineResource other = (MachineResource) obj;
+ return Objects.equals(size, other.size)
+ && Objects.equals(dedicated, other.dedicated)
+ && Objects.equals(revocable, other.revocable);
+ }
}
interface MachineResourceProvider {
@@ -81,13 +114,15 @@ class SlotSizeCounter implements Runnable {
this(SLOT_SIZES, machineResourceProvider, cachedCounters);
}
+ private static String getPrefix(boolean dedicated, boolean revocable) {
+ String dedicatedSuffix = dedicated ? "dedicated_" : "";
+ String revocableSuffix = revocable ? "revocable_" : "";
+ return "empty_slots_" + dedicatedSuffix + revocableSuffix;
+ }
+
@VisibleForTesting
- static String getStatName(String slotName, boolean dedicated) {
- if (dedicated) {
- return "empty_slots_dedicated_" + slotName;
- } else {
- return "empty_slots_" + slotName;
- }
+ static String getStatName(String slotName, boolean dedicated, boolean revocable) {
+ return getPrefix(dedicated, revocable) + slotName;
}
private int countSlots(Iterable<IResourceAggregate> slots, final IResourceAggregate slotSize) {
@@ -105,40 +140,27 @@ class SlotSizeCounter implements Runnable {
return sum;
}
- private static Predicate<MachineResource> isDedicated(final boolean dedicated) {
- return new Predicate<MachineResource>() {
- @Override
- public boolean apply(MachineResource slot) {
- return slot.isDedicated() == dedicated;
- }
- };
- }
-
- private static final Function<MachineResource, IResourceAggregate> GET_SIZE =
- new Function<MachineResource, IResourceAggregate>() {
- @Override
- public IResourceAggregate apply(MachineResource slot) {
- return slot.getSize();
- }
- };
-
private void updateStats(
String name,
- boolean dedicated,
Iterable<MachineResource> slots,
IResourceAggregate slotSize) {
- Iterable<IResourceAggregate> sizes =
- FluentIterable.from(slots).filter(isDedicated(dedicated)).transform(GET_SIZE);
- cachedCounters.get(getStatName(name, dedicated)).set(countSlots(sizes, slotSize));
+ ImmutableMultimap.Builder<String, IResourceAggregate> builder = ImmutableMultimap.builder();
+ for (MachineResource slot : slots) {
+ builder.put(getStatName(name, slot.isDedicated(), slot.isRevocable()), slot.getSize());
+ }
+
+ ImmutableMultimap<String, IResourceAggregate> sizes = builder.build();
+
+ for (String slotGroup : SLOT_GROUPS) {
+ String statName = slotGroup + name;
+ cachedCounters.get(statName).set(countSlots(sizes.get(statName), slotSize));
+ }
}
@Override
public void run() {
Iterable<MachineResource> slots = machineResourceProvider.get();
- for (Map.Entry<String, IResourceAggregate> entry : slotSizes.entrySet()) {
- updateStats(entry.getKey(), false, slots, entry.getValue());
- updateStats(entry.getKey(), true, slots, entry.getValue());
- }
+ slotSizes.entrySet().stream().forEach(e -> updateStats(e.getKey(), slots, e.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
new file mode 100644
index 0000000..7519ce3
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.stats;
+
+import com.google.common.collect.ImmutableList;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.ResourceType;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.mesos.Protos;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.stats.AsyncStatsModule.OfferAdapter;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class AsyncStatsModuleTest extends EasyMockTest {
+ @Test
+ public void testOfferAdapter() {
+ OfferManager offerManager = createMock(OfferManager.class);
+ expect(offerManager.getOffers()).andReturn(ImmutableList.of(
+ new HostOffer(Protos.Offer.newBuilder()
+ .setId(Protos.OfferID.newBuilder().setValue("offerId"))
+ .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("frameworkId"))
+ .setSlaveId(Protos.SlaveID.newBuilder().setValue("slaveId"))
+ .setHostname("hostName")
+ .addResources(getCpuResource(true, 2.0))
+ .addResources(getCpuResource(false, 4.0))
+ .build(),
+ IHostAttributes.build(new HostAttributes()))));
+
+ control.replay();
+
+ OfferAdapter adapter = new OfferAdapter(offerManager);
+
+ assertEquals(ImmutableList.of(resource(true, 2.0), resource(false, 4.0)), adapter.get());
+ }
+
+ private static MachineResource resource(boolean revocable, double cpu) {
+ return new MachineResource(
+ IResourceAggregate.build(new ResourceAggregate(cpu, 0, 0)), false, revocable);
+ }
+
+ private static Protos.Resource getCpuResource(boolean revocable, double value) {
+ Protos.Resource.Builder builder = Protos.Resource.newBuilder()
+ .setName(ResourceType.CPUS.getName())
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(value));
+
+ if (revocable) {
+ builder.setRevocable(Protos.Resource.RevocableInfo.newBuilder());
+ }
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
index b6623d5..576078f 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
@@ -48,8 +48,12 @@ public class SlotSizeCounterTest extends EasyMockTest {
private AtomicLong smallCounter = new AtomicLong();
private AtomicLong smallDedicatedCounter = new AtomicLong();
+ private AtomicLong smallRevocableCounter = new AtomicLong();
+ private AtomicLong smallDedicatedRevocableCounter = new AtomicLong();
private AtomicLong largeCounter = new AtomicLong();
private AtomicLong largeDedicatedCounter = new AtomicLong();
+ private AtomicLong largeRevocableCounter = new AtomicLong();
+ private AtomicLong largeDedicatedRevocableCounter = new AtomicLong();
@Before
public void setUp() {
@@ -59,14 +63,22 @@ public class SlotSizeCounterTest extends EasyMockTest {
}
private void expectStatExport() {
- expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", false)))
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", false, false)))
.andReturn(smallCounter);
- expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", true)))
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", true, false)))
.andReturn(smallDedicatedCounter);
- expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", false)))
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", false, true)))
+ .andReturn(smallRevocableCounter);
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", true, true)))
+ .andReturn(smallDedicatedRevocableCounter);
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", false, false)))
.andReturn(largeCounter);
- expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", true)))
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", true, false)))
.andReturn(largeDedicatedCounter);
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", false, true)))
+ .andReturn(largeRevocableCounter);
+ expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", true, true)))
+ .andReturn(largeDedicatedRevocableCounter);
}
private void expectGetSlots(MachineResource... returned) {
@@ -83,23 +95,31 @@ public class SlotSizeCounterTest extends EasyMockTest {
slotCounter.run();
assertEquals(0, smallCounter.get());
assertEquals(0, smallDedicatedCounter.get());
+ assertEquals(0, smallRevocableCounter.get());
+ assertEquals(0, smallDedicatedRevocableCounter.get());
assertEquals(0, largeCounter.get());
assertEquals(0, largeDedicatedCounter.get());
+ assertEquals(0, largeRevocableCounter.get());
+ assertEquals(0, largeDedicatedRevocableCounter.get());
}
@Test
public void testTinyOffers() {
expectStatExport();
- expectGetSlots(
- new MachineResource(IResourceAggregate.build(new ResourceAggregate(0.1, 1, 1)), false));
+ expectGetSlots(new MachineResource(
+ IResourceAggregate.build(new ResourceAggregate(0.1, 1, 1)), false, false));
control.replay();
slotCounter.run();
assertEquals(0, smallCounter.get());
assertEquals(0, smallDedicatedCounter.get());
+ assertEquals(0, smallRevocableCounter.get());
+ assertEquals(0, smallDedicatedRevocableCounter.get());
assertEquals(0, largeCounter.get());
assertEquals(0, largeDedicatedCounter.get());
+ assertEquals(0, largeRevocableCounter.get());
+ assertEquals(0, largeDedicatedRevocableCounter.get());
}
@Test
@@ -107,36 +127,46 @@ public class SlotSizeCounterTest extends EasyMockTest {
expectStatExport();
expectGetSlots(
new MachineResource(
- IResourceAggregate.build(new ResourceAggregate(1000, 16384, 1)), false));
+ IResourceAggregate.build(new ResourceAggregate(1000, 16384, 1)), false, false));
control.replay();
slotCounter.run();
assertEquals(0, smallCounter.get());
assertEquals(0, smallDedicatedCounter.get());
+ assertEquals(0, smallRevocableCounter.get());
+ assertEquals(0, smallDedicatedRevocableCounter.get());
assertEquals(0, largeCounter.get());
assertEquals(0, largeDedicatedCounter.get());
+ assertEquals(0, largeRevocableCounter.get());
+ assertEquals(0, largeDedicatedRevocableCounter.get());
}
@Test
public void testCountSlots() {
expectStatExport();
expectGetSlots(
- new MachineResource(SMALL, false),
- new MachineResource(SMALL, false),
- new MachineResource(LARGE, false),
- new MachineResource(ResourceAggregates.scale(LARGE, 4), false),
- new MachineResource(IResourceAggregate.build(new ResourceAggregate(1, 1, 1)), false),
- new MachineResource(SMALL, true),
- new MachineResource(SMALL, true),
- new MachineResource(ResourceAggregates.scale(SMALL, 2), true));
+ new MachineResource(SMALL, false, false),
+ new MachineResource(SMALL, false, false),
+ new MachineResource(LARGE, false, false),
+ new MachineResource(LARGE, false, true),
+ new MachineResource(LARGE, true, true),
+ new MachineResource(ResourceAggregates.scale(LARGE, 4), false, false),
+ new MachineResource(IResourceAggregate.build(new ResourceAggregate(1, 1, 1)), false, false),
+ new MachineResource(SMALL, true, false),
+ new MachineResource(SMALL, true, false),
+ new MachineResource(ResourceAggregates.scale(SMALL, 2), true, false));
control.replay();
slotCounter.run();
assertEquals(22, smallCounter.get());
assertEquals(4, smallDedicatedCounter.get());
+ assertEquals(4, smallRevocableCounter.get());
+ assertEquals(4, smallDedicatedRevocableCounter.get());
assertEquals(5, largeCounter.get());
assertEquals(0, largeDedicatedCounter.get());
+ assertEquals(1, largeRevocableCounter.get());
+ assertEquals(1, largeDedicatedRevocableCounter.get());
}
}