You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/14 22:15:00 UTC
aurora git commit: Resources: finalizing Resources.java refactoring.
Repository: aurora
Updated Branches:
refs/heads/master 3676ec25b -> ab1c9b2ec
Resources: finalizing Resources.java refactoring.
Bugs closed: AURORA-1415
Reviewed at https://reviews.apache.org/r/37366/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ab1c9b2e
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ab1c9b2e
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ab1c9b2e
Branch: refs/heads/master
Commit: ab1c9b2ec2630919b5b3d1bbc61efa7831e568d0
Parents: 3676ec2
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Aug 14 13:11:47 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Aug 14 13:11:47 2015 -0700
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/Resources.java | 177 ++++++++-----------
.../aurora/scheduler/state/TaskAssigner.java | 2 +-
.../aurora/scheduler/ResourceSlotTest.java | 27 +++
.../apache/aurora/scheduler/ResourcesTest.java | 83 +++++++--
4 files changed, 173 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/main/java/org/apache/aurora/scheduler/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java
index 7b1b54e..40df262 100644
--- a/src/main/java/org/apache/aurora/scheduler/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/Resources.java
@@ -15,10 +15,10 @@ package org.apache.aurora.scheduler;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableList;
@@ -41,89 +41,89 @@ import static org.apache.aurora.scheduler.ResourceType.PORTS;
import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
/**
- * A container for multiple resource vectors.
- * TODO(wfarner): Collapse this in with ResourceAggregates AURORA-105.
+ * A container for multiple Mesos resource vectors.
*/
public final class Resources {
- private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
- new Function<Range, Set<Integer>>() {
- @Override
- public Set<Integer> apply(Range range) {
- return ContiguousSet.create(
- com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
- DiscreteDomain.integers());
- }
- };
- private final double numCpus;
- private final Amount<Long, Data> disk;
- private final Amount<Long, Data> ram;
- private final int numPorts;
+ /**
+ * CPU resource filter.
+ */
+ public static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName());
- private Resources(double numCpus, Amount<Long, Data> ram, Amount<Long, Data> disk, int numPorts) {
- this.numCpus = numCpus;
- this.ram = requireNonNull(ram);
- this.disk = requireNonNull(disk);
- this.numPorts = numPorts;
- }
+ private final Iterable<Resource> mesosResources;
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Resources)) {
- return false;
- }
+ private Resources(Iterable<Resource> mesosResources) {
+ this.mesosResources = ImmutableList.copyOf(mesosResources);
+ }
- Resources other = (Resources) o;
- return Objects.equals(numCpus, other.numCpus)
- && Objects.equals(ram, other.ram)
- && Objects.equals(disk, other.disk)
- && Objects.equals(numPorts, other.numPorts);
+ /**
+ * Extracts the resources available in a slave offer.
+ *
+ * @param offer Offer to get resources from.
+ * @return The resources available in the offer.
+ */
+ public static Resources from(Offer offer) {
+ return new Resources(requireNonNull(offer.getResourcesList()));
}
- @Override
- public int hashCode() {
- return Objects.hash(numCpus, ram, disk, numPorts);
+ /**
+ * Filters resources by the provided {@code predicate}.
+ *
+ * @param predicate Predicate filter.
+ * @return A new {@code Resources} object containing only filtered Mesos resources.
+ */
+ public Resources filter(Predicate<Resource> predicate) {
+ return new Resources(Iterables.filter(mesosResources, predicate));
}
- @Override
- public String toString() {
- return com.google.common.base.Objects.toStringHelper(this)
- .add("numCpus", numCpus)
- .add("ram", ram)
- .add("disk", disk)
- .add("numPorts", numPorts)
- .toString();
+ /**
+ * Gets generalized aggregated resource view.
+ *
+ * @return {@code ResourceSlot} instance.
+ */
+ public ResourceSlot slot() {
+ return new ResourceSlot(getScalarValue(CPUS.getName()),
+ Amount.of((long) getScalarValue(RAM_MB.getName()), Data.MB),
+ Amount.of((long) getScalarValue(DISK_MB.getName()), Data.MB),
+ getNumAvailablePorts());
}
/**
- * Extracts the resources available in a slave offer.
+ * Attempts to grab {@code numPorts} from this resource instance.
*
- * @param offer Offer to get resources from.
- * @return The resources available in the offer.
+ * @param numPorts The number of ports to grab.
+ * @return The set of ports grabbed.
+ * @throws InsufficientResourcesException if not enough ports were available.
*/
- public static Resources from(Offer offer) {
- requireNonNull(offer);
- return new Resources(
- getScalarValue(offer, CPUS.getName()),
- Amount.of((long) getScalarValue(offer, RAM_MB.getName()), Data.MB),
- Amount.of((long) getScalarValue(offer, DISK_MB.getName()), Data.MB),
- getNumAvailablePorts(offer.getResourcesList()));
+ public Set<Integer> getPorts(int numPorts)
+ throws InsufficientResourcesException {
+
+ if (numPorts == 0) {
+ return ImmutableSet.of();
+ }
+
+ List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(Iterables.concat(
+ Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS))));
+
+ if (availablePorts.size() < numPorts) {
+ throw new InsufficientResourcesException(
+ String.format("Could not get %d ports from %s", numPorts, availablePorts));
+ }
+
+ Collections.shuffle(availablePorts);
+ return ImmutableSet.copyOf(availablePorts.subList(0, numPorts));
}
- private static int getNumAvailablePorts(List<Resource> resource) {
+ private int getNumAvailablePorts() {
int offeredPorts = 0;
- for (Range range : getPortRanges(resource)) {
+ for (Range range : getPortRanges()) {
offeredPorts += 1 + range.getEnd() - range.getBegin();
}
return offeredPorts;
}
- private static double getScalarValue(Offer offer, String key) {
- return getScalarValue(offer.getResourcesList(), key);
- }
-
- private static double getScalarValue(List<Resource> resources, String key) {
- Resource resource = getResource(resources, key);
+ private double getScalarValue(String key) {
+ Resource resource = getResource(key);
if (resource == null) {
return 0;
}
@@ -131,12 +131,12 @@ public final class Resources {
return resource.getScalar().getValue();
}
- private static Resource getResource(List<Resource> resource, String key) {
- return Iterables.find(resource, e -> e.getName().equals(key), null);
+ private Resource getResource(String key) {
+ return Iterables.find(mesosResources, e -> e.getName().equals(key), null);
}
- private static Iterable<Range> getPortRanges(List<Resource> resources) {
- Resource resource = getResource(resources, PORTS.getName());
+ private Iterable<Range> getPortRanges() {
+ Resource resource = getResource(PORTS.getName());
if (resource == null) {
return ImmutableList.of();
}
@@ -145,15 +145,6 @@ public final class Resources {
}
/**
- * Gets generalized aggregated resource view.
- *
- * @return {@code ResourceSlot} instance.
- */
- public ResourceSlot slot() {
- return new ResourceSlot(numCpus, ram, disk, numPorts);
- }
-
- /**
* Thrown when there are insufficient resources to satisfy a request.
*/
static class InsufficientResourcesException extends RuntimeException {
@@ -162,33 +153,13 @@ public final class Resources {
}
}
- /**
- * Attempts to grab {@code numPorts} from the given resource {@code offer}.
- *
- * @param offer The offer to grab ports from.
- * @param numPorts The number of ports to grab.
- * @return The set of ports grabbed.
- * @throws InsufficientResourcesException if not enough ports were available.
- */
- public static Set<Integer> getPorts(Offer offer, int numPorts)
- throws InsufficientResourcesException {
-
- requireNonNull(offer);
-
- if (numPorts == 0) {
- return ImmutableSet.of();
- }
-
- List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(
- Iterables.concat(
- Iterables.transform(getPortRanges(offer.getResourcesList()), RANGE_TO_MEMBERS))));
-
- if (availablePorts.size() < numPorts) {
- throw new InsufficientResourcesException(
- String.format("Could not get %d ports from %s", numPorts, offer));
- }
-
- Collections.shuffle(availablePorts);
- return ImmutableSet.copyOf(availablePorts.subList(0, numPorts));
- }
+ private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
+ new Function<Range, Set<Integer>>() {
+ @Override
+ public Set<Integer> apply(Range range) {
+ return ContiguousSet.create(
+ com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
+ DiscreteDomain.integers());
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index a7a4381..ca4b5b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -107,7 +107,7 @@ public interface TaskAssigner {
String taskId) {
String host = offer.getHostname();
- Set<Integer> selectedPorts = Resources.getPorts(offer, requestedPorts.size());
+ Set<Integer> selectedPorts = Resources.from(offer).getPorts(requestedPorts.size());
Preconditions.checkState(selectedPorts.size() == requestedPorts.size());
final Iterator<String> names = requestedPorts.iterator();
http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
index d537315..50e7fc9 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler;
import java.util.Set;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -24,12 +25,16 @@ import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.mesos.ExecutorSettings;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.mesos.Protos;
import org.junit.Test;
+import static org.apache.aurora.scheduler.ResourceSlot.MIN_THERMOS_RESOURCES;
import static org.apache.aurora.scheduler.ResourceSlot.makeMesosRangeResource;
import static org.apache.aurora.scheduler.ResourceSlot.makeMesosResource;
+import static org.apache.aurora.scheduler.ResourceSlot.maxElements;
+import static org.apache.aurora.scheduler.ResourceSlot.sum;
import static org.apache.aurora.scheduler.ResourceType.CPUS;
import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.ResourceType.PORTS;
@@ -81,6 +86,21 @@ public class ResourceSlotTest {
}
@Test
+ public void testSum() {
+ assertEquals(THREE, sum(ImmutableList.of(ONE, ONE, ONE)));
+ }
+
+ @Test
+ public void testWithOverhead() {
+ assertEquals(maxElements(TWO, MIN_THERMOS_RESOURCES), ONE.withOverhead(
+ ExecutorSettings.newBuilder()
+ .setExecutorOverhead(ONE)
+ .setExecutorPath("ignored")
+ .setThermosObserverRoot("ignored")
+ .build()));
+ }
+
+ @Test
public void testToResourceList() {
ResourceSlot resources = ResourceSlot.from(TASK);
Set<Integer> ports = ImmutableSet.of(80, 443);
@@ -134,6 +154,13 @@ public class ResourceSlotTest {
assertNotEquals(resources, null);
}
+ @Test
+ public void testOrder() {
+ assertEquals(
+ ImmutableList.of(ONE, TWO, THREE, THREE),
+ ResourceSlot.ORDER.sortedCopy(ImmutableList.of(THREE, ONE, TWO, THREE)));
+ }
+
private void expectRanges(Set<Pair<Long, Long>> expected, Set<Integer> values) {
Protos.Resource resource = makeMesosRangeResource(PORTS, values);
assertEquals(Protos.Value.Type.RANGES, resource.getType());
http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
index 313cf68..a5878a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
@@ -15,18 +15,26 @@ package org.apache.aurora.scheduler;
import java.util.Set;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.collections.Pair;
+import com.twitter.common.quantity.Amount;
import org.apache.aurora.scheduler.Resources.InsufficientResourcesException;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.Value.Range;
import org.apache.mesos.Protos.Value.Ranges;
-import org.apache.mesos.Protos.Value.Type;
import org.junit.Test;
+import static com.twitter.common.quantity.Data.MB;
+
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
+import static org.apache.mesos.Protos.Value.Type.RANGES;
+import static org.apache.mesos.Protos.Value.Type.SCALAR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -34,21 +42,21 @@ public class ResourcesTest {
@Test
public void testPortRangeExact() {
Resource portsResource = createPortRange(Pair.of(1, 5));
- Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 5);
+ Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
assertEquals(5, ports.size());
}
@Test
public void testOnePortAvailable() {
Resource portsResource = createPortRange(Pair.of(3, 3));
- Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 1);
+ Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(1);
assertEquals(1, ports.size());
}
@Test
public void testPortRangeAbundance() {
Resource portsResource = createPortRange(Pair.of(1, 10));
- Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 5);
+ Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
assertEquals(5, ports.size());
}
@@ -56,14 +64,14 @@ public class ResourcesTest {
public void testPortRangeExhaust() {
Resource portsResource = createPortRanges(Pair.of(1, 2), Pair.of(10, 15));
- Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 7);
+ Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(7);
assertEquals(7, ports.size());
- ports = Resources.getPorts(createOffer(portsResource), 8);
+ ports = Resources.from(createOffer(portsResource)).getPorts(8);
assertEquals(8, ports.size());
try {
- Resources.getPorts(createOffer(portsResource), 9);
+ Resources.from(createOffer(portsResource)).getPorts(9);
fail("Ports should not have been sufficient");
} catch (InsufficientResourcesException e) {
// Expected.
@@ -73,13 +81,44 @@ public class ResourcesTest {
@Test
public void testGetNoPorts() {
Resource portsResource = createPortRange(Pair.of(1, 5));
- assertEquals(ImmutableSet.of(), Resources.getPorts(createOffer(portsResource), 0));
+ assertEquals(ImmutableSet.of(), Resources.from(createOffer(portsResource)).getPorts(0));
}
@Test(expected = Resources.InsufficientResourcesException.class)
public void testPortRangeScarcity() {
Resource portsResource = createPortRange(Pair.of(1, 2));
- Resources.getPorts(createOffer(portsResource), 5);
+ Resources.from(createOffer(portsResource)).getPorts(5);
+ }
+
+ @Test
+ public void testGetSlot() {
+ ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
+ .add(createCpuResource(8.0))
+ .add(createMemResource(1024, RAM_MB))
+ .add(createMemResource(2048, DISK_MB))
+ .add(createPortRange(Pair.of(1, 10)))
+ .build();
+
+ ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 10);
+ assertEquals(expected, Resources.from(createOffer(resources)).slot());
+ }
+
+ @Test
+ public void testMissingResourcesHandledGracefully() {
+ ImmutableList<Resource> resources = ImmutableList.<Resource>builder().build();
+ assertEquals(ResourceSlot.NONE, Resources.from(createOffer(resources)).slot());
+ }
+
+ @Test
+ public void testFilter() {
+ ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
+ .add(createCpuResource(8.0))
+ .add(createMemResource(1024, RAM_MB))
+ .build();
+
+ assertEquals(
+ Resources.from(createOffer(createCpuResource(8.0))).slot(),
+ Resources.from(createOffer(resources)).filter(Resources.CPU).slot());
}
private Resource createPortRange(Pair<Integer, Integer> range) {
@@ -99,17 +138,37 @@ public class ResourcesTest {
return Resource.newBuilder()
.setName(PORTS.getName())
- .setType(Type.RANGES)
+ .setType(RANGES)
.setRanges(ranges)
.build();
}
- private Protos.Offer createOffer(Resource resources) {
+ private static Resource createCpuResource(double cpus) {
+ return Resource.newBuilder()
+ .setName(CPUS.getName())
+ .setType(SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus))
+ .build();
+ }
+
+ private static Resource createMemResource(long mem, ResourceType resourceType) {
+ return Resource.newBuilder()
+ .setName(resourceType.getName())
+ .setType(SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem))
+ .build();
+ }
+
+ private static Protos.Offer createOffer(Resource resource) {
+ return createOffer(ImmutableList.of(resource));
+ }
+
+ private static Protos.Offer createOffer(Iterable<Resource> resources) {
return Protos.Offer.newBuilder()
.setId(Protos.OfferID.newBuilder().setValue("offer-id"))
.setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
.setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
.setHostname("hostname")
- .addResources(resources).build();
+ .addAllResources(resources).build();
}
}