You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/08/14 22:15:00 UTC

aurora git commit: Resources: finalizing Resources.java refactoring.

Repository: aurora
Updated Branches:
  refs/heads/master 3676ec25b -> ab1c9b2ec


Resources: finalizing Resources.java refactoring.

Bugs closed: AURORA-1415

Reviewed at https://reviews.apache.org/r/37366/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ab1c9b2e
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ab1c9b2e
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ab1c9b2e

Branch: refs/heads/master
Commit: ab1c9b2ec2630919b5b3d1bbc61efa7831e568d0
Parents: 3676ec2
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Aug 14 13:11:47 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Aug 14 13:11:47 2015 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/Resources.java  | 177 ++++++++-----------
 .../aurora/scheduler/state/TaskAssigner.java    |   2 +-
 .../aurora/scheduler/ResourceSlotTest.java      |  27 +++
 .../apache/aurora/scheduler/ResourcesTest.java  |  83 +++++++--
 4 files changed, 173 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/main/java/org/apache/aurora/scheduler/Resources.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java
index 7b1b54e..40df262 100644
--- a/src/main/java/org/apache/aurora/scheduler/Resources.java
+++ b/src/main/java/org/apache/aurora/scheduler/Resources.java
@@ -15,10 +15,10 @@ package org.apache.aurora.scheduler;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ContiguousSet;
 import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
@@ -41,89 +41,89 @@ import static org.apache.aurora.scheduler.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
 
 /**
- * A container for multiple resource vectors.
- * TODO(wfarner): Collapse this in with ResourceAggregates AURORA-105.
+ * A container for multiple Mesos resource vectors.
  */
 public final class Resources {
-  private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
-      new Function<Range, Set<Integer>>() {
-        @Override
-        public Set<Integer> apply(Range range) {
-          return ContiguousSet.create(
-              com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
-              DiscreteDomain.integers());
-        }
-      };
 
-  private final double numCpus;
-  private final Amount<Long, Data> disk;
-  private final Amount<Long, Data> ram;
-  private final int numPorts;
+  /**
+   * CPU resource filter.
+   */
+  public static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName());
 
-  private Resources(double numCpus, Amount<Long, Data> ram, Amount<Long, Data> disk, int numPorts) {
-    this.numCpus = numCpus;
-    this.ram = requireNonNull(ram);
-    this.disk = requireNonNull(disk);
-    this.numPorts = numPorts;
-  }
+  private final Iterable<Resource> mesosResources;
 
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof Resources)) {
-      return false;
-    }
+  private Resources(Iterable<Resource> mesosResources) {
+    this.mesosResources = ImmutableList.copyOf(mesosResources);
+  }
 
-    Resources other = (Resources) o;
-    return Objects.equals(numCpus, other.numCpus)
-        && Objects.equals(ram, other.ram)
-        && Objects.equals(disk, other.disk)
-        && Objects.equals(numPorts, other.numPorts);
+  /**
+   * Extracts the resources available in a slave offer.
+   *
+   * @param offer Offer to get resources from.
+   * @return The resources available in the offer.
+   */
+  public static Resources from(Offer offer) {
+    return new Resources(requireNonNull(offer.getResourcesList()));
   }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(numCpus, ram, disk, numPorts);
+  /**
+   * Filters resources by the provided {@code predicate}.
+   *
+   * @param predicate Predicate filter.
+   * @return A new {@code Resources} object containing only filtered Mesos resources.
+   */
+  public Resources filter(Predicate<Resource> predicate) {
+    return new Resources(Iterables.filter(mesosResources, predicate));
   }
 
-  @Override
-  public String toString() {
-    return com.google.common.base.Objects.toStringHelper(this)
-        .add("numCpus", numCpus)
-        .add("ram", ram)
-        .add("disk", disk)
-        .add("numPorts", numPorts)
-        .toString();
+  /**
+   * Gets generalized aggregated resource view.
+   *
+   * @return {@code ResourceSlot} instance.
+   */
+  public ResourceSlot slot() {
+    return new ResourceSlot(getScalarValue(CPUS.getName()),
+        Amount.of((long) getScalarValue(RAM_MB.getName()), Data.MB),
+        Amount.of((long) getScalarValue(DISK_MB.getName()), Data.MB),
+        getNumAvailablePorts());
   }
 
   /**
-   * Extracts the resources available in a slave offer.
+   * Attempts to grab {@code numPorts} from this resource instance.
    *
-   * @param offer Offer to get resources from.
-   * @return The resources available in the offer.
+   * @param numPorts The number of ports to grab.
+   * @return The set of ports grabbed.
+   * @throws InsufficientResourcesException if not enough ports were available.
    */
-  public static Resources from(Offer offer) {
-    requireNonNull(offer);
-    return new Resources(
-        getScalarValue(offer, CPUS.getName()),
-        Amount.of((long) getScalarValue(offer, RAM_MB.getName()), Data.MB),
-        Amount.of((long) getScalarValue(offer, DISK_MB.getName()), Data.MB),
-        getNumAvailablePorts(offer.getResourcesList()));
+  public Set<Integer> getPorts(int numPorts)
+      throws InsufficientResourcesException {
+
+    if (numPorts == 0) {
+      return ImmutableSet.of();
+    }
+
+    List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(Iterables.concat(
+        Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS))));
+
+    if (availablePorts.size() < numPorts) {
+      throw new InsufficientResourcesException(
+          String.format("Could not get %d ports from %s", numPorts, availablePorts));
+    }
+
+    Collections.shuffle(availablePorts);
+    return ImmutableSet.copyOf(availablePorts.subList(0, numPorts));
   }
 
-  private static int getNumAvailablePorts(List<Resource> resource) {
+  private int getNumAvailablePorts() {
     int offeredPorts = 0;
-    for (Range range : getPortRanges(resource)) {
+    for (Range range : getPortRanges()) {
       offeredPorts += 1 + range.getEnd() - range.getBegin();
     }
     return offeredPorts;
   }
 
-  private static double getScalarValue(Offer offer, String key) {
-    return getScalarValue(offer.getResourcesList(), key);
-  }
-
-  private static double getScalarValue(List<Resource> resources, String key) {
-    Resource resource = getResource(resources, key);
+  private double getScalarValue(String key) {
+    Resource resource = getResource(key);
     if (resource == null) {
       return 0;
     }
@@ -131,12 +131,12 @@ public final class Resources {
     return resource.getScalar().getValue();
   }
 
-  private static Resource getResource(List<Resource> resource, String key) {
-    return Iterables.find(resource, e -> e.getName().equals(key), null);
+  private Resource getResource(String key) {
+    return Iterables.find(mesosResources, e -> e.getName().equals(key), null);
   }
 
-  private static Iterable<Range> getPortRanges(List<Resource> resources) {
-    Resource resource = getResource(resources, PORTS.getName());
+  private Iterable<Range> getPortRanges() {
+    Resource resource = getResource(PORTS.getName());
     if (resource == null) {
       return ImmutableList.of();
     }
@@ -145,15 +145,6 @@ public final class Resources {
   }
 
   /**
-   * Gets generalized aggregated resource view.
-   *
-   * @return {@code ResourceSlot} instance.
-   */
-  public ResourceSlot slot() {
-    return new ResourceSlot(numCpus, ram, disk, numPorts);
-  }
-
-  /**
    * Thrown when there are insufficient resources to satisfy a request.
    */
   static class InsufficientResourcesException extends RuntimeException {
@@ -162,33 +153,13 @@ public final class Resources {
     }
   }
 
-  /**
-   * Attempts to grab {@code numPorts} from the given resource {@code offer}.
-   *
-   * @param offer The offer to grab ports from.
-   * @param numPorts The number of ports to grab.
-   * @return The set of ports grabbed.
-   * @throws InsufficientResourcesException if not enough ports were available.
-   */
-  public static Set<Integer> getPorts(Offer offer, int numPorts)
-      throws InsufficientResourcesException {
-
-    requireNonNull(offer);
-
-    if (numPorts == 0) {
-      return ImmutableSet.of();
-    }
-
-    List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(
-        Iterables.concat(
-            Iterables.transform(getPortRanges(offer.getResourcesList()), RANGE_TO_MEMBERS))));
-
-    if (availablePorts.size() < numPorts) {
-      throw new InsufficientResourcesException(
-          String.format("Could not get %d ports from %s", numPorts, offer));
-    }
-
-    Collections.shuffle(availablePorts);
-    return ImmutableSet.copyOf(availablePorts.subList(0, numPorts));
-  }
+  private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
+      new Function<Range, Set<Integer>>() {
+        @Override
+        public Set<Integer> apply(Range range) {
+          return ContiguousSet.create(
+              com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()),
+              DiscreteDomain.integers());
+        }
+      };
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index a7a4381..ca4b5b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -107,7 +107,7 @@ public interface TaskAssigner {
         String taskId) {
 
       String host = offer.getHostname();
-      Set<Integer> selectedPorts = Resources.getPorts(offer, requestedPorts.size());
+      Set<Integer> selectedPorts = Resources.from(offer).getPorts(requestedPorts.size());
       Preconditions.checkState(selectedPorts.size() == requestedPorts.size());
 
       final Iterator<String> names = requestedPorts.iterator();

http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
index d537315..50e7fc9 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java
@@ -16,6 +16,7 @@ package org.apache.aurora.scheduler;
 import java.util.Set;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
@@ -24,12 +25,16 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.ResourceSlot.MIN_THERMOS_RESOURCES;
 import static org.apache.aurora.scheduler.ResourceSlot.makeMesosRangeResource;
 import static org.apache.aurora.scheduler.ResourceSlot.makeMesosResource;
+import static org.apache.aurora.scheduler.ResourceSlot.maxElements;
+import static org.apache.aurora.scheduler.ResourceSlot.sum;
 import static org.apache.aurora.scheduler.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.ResourceType.PORTS;
@@ -81,6 +86,21 @@ public class ResourceSlotTest {
   }
 
   @Test
+  public void testSum() {
+    assertEquals(THREE, sum(ImmutableList.of(ONE, ONE, ONE)));
+  }
+
+  @Test
+  public void testWithOverhead() {
+    assertEquals(maxElements(TWO, MIN_THERMOS_RESOURCES), ONE.withOverhead(
+        ExecutorSettings.newBuilder()
+            .setExecutorOverhead(ONE)
+            .setExecutorPath("ignored")
+            .setThermosObserverRoot("ignored")
+            .build()));
+  }
+
+  @Test
   public void testToResourceList() {
     ResourceSlot resources = ResourceSlot.from(TASK);
     Set<Integer> ports = ImmutableSet.of(80, 443);
@@ -134,6 +154,13 @@ public class ResourceSlotTest {
     assertNotEquals(resources, null);
   }
 
+  @Test
+  public void testOrder() {
+    assertEquals(
+        ImmutableList.of(ONE, TWO, THREE, THREE),
+        ResourceSlot.ORDER.sortedCopy(ImmutableList.of(THREE, ONE, TWO, THREE)));
+  }
+
   private void expectRanges(Set<Pair<Long, Long>> expected, Set<Integer> values) {
     Protos.Resource resource = makeMesosRangeResource(PORTS, values);
     assertEquals(Protos.Value.Type.RANGES, resource.getType());

http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
index 313cf68..a5878a4 100644
--- a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java
@@ -15,18 +15,26 @@ package org.apache.aurora.scheduler;
 
 import java.util.Set;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.collections.Pair;
+import com.twitter.common.quantity.Amount;
 
 import org.apache.aurora.scheduler.Resources.InsufficientResourcesException;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.Value.Range;
 import org.apache.mesos.Protos.Value.Ranges;
-import org.apache.mesos.Protos.Value.Type;
 import org.junit.Test;
 
+import static com.twitter.common.quantity.Data.MB;
+
+import static org.apache.aurora.scheduler.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.ResourceType.RAM_MB;
+import static org.apache.mesos.Protos.Value.Type.RANGES;
+import static org.apache.mesos.Protos.Value.Type.SCALAR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -34,21 +42,21 @@ public class ResourcesTest {
   @Test
   public void testPortRangeExact() {
     Resource portsResource = createPortRange(Pair.of(1, 5));
-    Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 5);
+    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
     assertEquals(5, ports.size());
   }
 
   @Test
   public void testOnePortAvailable() {
     Resource portsResource = createPortRange(Pair.of(3, 3));
-    Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 1);
+    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(1);
     assertEquals(1, ports.size());
   }
 
   @Test
   public void testPortRangeAbundance() {
     Resource portsResource = createPortRange(Pair.of(1, 10));
-    Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 5);
+    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5);
     assertEquals(5, ports.size());
   }
 
@@ -56,14 +64,14 @@ public class ResourcesTest {
   public void testPortRangeExhaust() {
     Resource portsResource = createPortRanges(Pair.of(1, 2), Pair.of(10, 15));
 
-    Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 7);
+    Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(7);
     assertEquals(7, ports.size());
 
-    ports = Resources.getPorts(createOffer(portsResource), 8);
+    ports = Resources.from(createOffer(portsResource)).getPorts(8);
     assertEquals(8, ports.size());
 
     try {
-      Resources.getPorts(createOffer(portsResource), 9);
+      Resources.from(createOffer(portsResource)).getPorts(9);
       fail("Ports should not have been sufficient");
     } catch (InsufficientResourcesException e) {
       // Expected.
@@ -73,13 +81,44 @@ public class ResourcesTest {
   @Test
   public void testGetNoPorts() {
     Resource portsResource = createPortRange(Pair.of(1, 5));
-    assertEquals(ImmutableSet.of(), Resources.getPorts(createOffer(portsResource), 0));
+    assertEquals(ImmutableSet.of(), Resources.from(createOffer(portsResource)).getPorts(0));
   }
 
   @Test(expected = Resources.InsufficientResourcesException.class)
   public void testPortRangeScarcity() {
     Resource portsResource = createPortRange(Pair.of(1, 2));
-    Resources.getPorts(createOffer(portsResource), 5);
+    Resources.from(createOffer(portsResource)).getPorts(5);
+  }
+
+  @Test
+  public void testGetSlot() {
+    ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
+        .add(createCpuResource(8.0))
+        .add(createMemResource(1024, RAM_MB))
+        .add(createMemResource(2048, DISK_MB))
+        .add(createPortRange(Pair.of(1, 10)))
+        .build();
+
+    ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 10);
+    assertEquals(expected, Resources.from(createOffer(resources)).slot());
+  }
+
+  @Test
+  public void testMissingResourcesHandledGracefully() {
+    ImmutableList<Resource> resources = ImmutableList.<Resource>builder().build();
+    assertEquals(ResourceSlot.NONE, Resources.from(createOffer(resources)).slot());
+  }
+
+  @Test
+  public void testFilter() {
+    ImmutableList<Resource> resources = ImmutableList.<Resource>builder()
+        .add(createCpuResource(8.0))
+        .add(createMemResource(1024, RAM_MB))
+        .build();
+
+    assertEquals(
+        Resources.from(createOffer(createCpuResource(8.0))).slot(),
+        Resources.from(createOffer(resources)).filter(Resources.CPU).slot());
   }
 
   private Resource createPortRange(Pair<Integer, Integer> range) {
@@ -99,17 +138,37 @@ public class ResourcesTest {
 
     return Resource.newBuilder()
         .setName(PORTS.getName())
-        .setType(Type.RANGES)
+        .setType(RANGES)
         .setRanges(ranges)
         .build();
   }
 
-  private Protos.Offer createOffer(Resource resources) {
+  private static Resource createCpuResource(double cpus) {
+    return Resource.newBuilder()
+        .setName(CPUS.getName())
+        .setType(SCALAR)
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus))
+        .build();
+  }
+
+  private static Resource createMemResource(long mem, ResourceType resourceType) {
+    return Resource.newBuilder()
+        .setName(resourceType.getName())
+        .setType(SCALAR)
+        .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem))
+        .build();
+  }
+
+  private static Protos.Offer createOffer(Resource resource) {
+    return createOffer(ImmutableList.of(resource));
+  }
+
+  private static Protos.Offer createOffer(Iterable<Resource> resources) {
     return Protos.Offer.newBuilder()
         .setId(Protos.OfferID.newBuilder().setValue("offer-id"))
         .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id"))
         .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id"))
         .setHostname("hostname")
-        .addResources(resources).build();
+        .addAllResources(resources).build();
   }
 }