You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/28 18:45:21 UTC

[23/35] incubator-ignite git commit: #IGNITE-857 Added comma separate limit.

#IGNITE-857 Added comma separate limit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/07a10952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/07a10952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/07a10952

Branch: refs/heads/ignite-218-hdfs-only
Commit: 07a10952093f8f4c7ce432413bb582c6ab96dc26
Parents: 8deb577
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu May 28 11:15:55 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu May 28 11:15:55 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/mesos/ClusterProperties.java  | 18 +++++++
 .../apache/ignite/mesos/IgniteScheduler.java    | 34 +++++++-----
 .../ignite/mesos/IgniteSchedulerSelfTest.java   | 56 ++++++++++++++++++--
 3 files changed, 91 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
index 9f0b304..944735e 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
@@ -215,6 +215,13 @@ public class ClusterProperties {
     }
 
     /**
+     * Set CPU count limit per node.
+     */
+    public void cpusPerNode(double cpu) {
+        this.cpuPerNode = cpu;
+    }
+
+    /**
      * @return mem limit.
      */
     public double memory() {
@@ -223,6 +230,8 @@ public class ClusterProperties {
 
     /**
      * Set mem limit.
+     *
+     * @param mem Memory.
      */
     public void memory(double mem) {
         this.mem = mem;
@@ -236,6 +245,15 @@ public class ClusterProperties {
     }
 
     /**
+     * Set mem limit per node.
+     *
+     * @param mem Memory.
+     */
+    public void memoryPerNode(double mem) {
+         this.memPerNode = mem;
+    }
+
+    /**
      * @return disk limit.
      */
     public double disk() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index 17daf45..e833025 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.*;
  */
 public class IgniteScheduler implements Scheduler {
     /** Cpus. */
-    public static final String CPUS = "cpus";
+    public static final String CPU = "cpus";
 
     /** Mem. */
     public static final String MEM = "mem";
@@ -126,14 +126,17 @@ public class IgniteScheduler implements Scheduler {
             .addUris(Protos.CommandInfo.URI.newBuilder()
                 .setValue(cfgUrl));
 
-        if (clusterProps.usersLibsUrl() != null)
-            builder.addUris(Protos.CommandInfo.URI.newBuilder()
-                .setValue(clusterProps.usersLibsUrl())
-                .setExtract(true));
-        else if (resourceProvider.resourceUrl() != null) {
-            for (String url : resourceProvider.resourceUrl())
-                builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url));
-        }
+        // Collect user's libs from cluster properties and the resource provider.
+        Collection<String> usersLibs = new ArrayList<>();
+
+        if (clusterProps.usersLibsUrl() != null && !clusterProps.usersLibsUrl().isEmpty())
+            Collections.addAll(usersLibs, clusterProps.usersLibsUrl().split(DELIM));
+
+        if (resourceProvider.resourceUrl() != null && !resourceProvider.resourceUrl().isEmpty())
+            usersLibs.addAll(resourceProvider.resourceUrl());
+
+        for (String url : usersLibs)
+            builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url));
 
         String cfgName = resourceProvider.configName();
 
@@ -155,7 +158,7 @@ public class IgniteScheduler implements Scheduler {
             .setSlaveId(offer.getSlaveId())
             .setCommand(builder)
             .addResources(Protos.Resource.newBuilder()
-                .setName(CPUS)
+                .setName(CPU)
                 .setType(Protos.Value.Type.SCALAR)
                 .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores())))
             .addResources(Protos.Resource.newBuilder()
@@ -210,7 +213,7 @@ public class IgniteScheduler implements Scheduler {
 
         // Collect resource on slave.
         for (Protos.Resource resource : offer.getResourcesList()) {
-            if (resource.getName().equals(CPUS)) {
+            if (resource.getName().equals(CPU)) {
                 if (resource.getType().equals(Protos.Value.Type.SCALAR))
                     cpus = resource.getScalar().getValue();
                 else
@@ -251,6 +254,13 @@ public class IgniteScheduler implements Scheduler {
         mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, clusterProps.memoryPerNode()));
         disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, clusterProps.diskPerNode()));
 
+        if ((clusterProps.cpusPerNode() != ClusterProperties.UNLIMITED && clusterProps.cpusPerNode() != cpus)
+            || (clusterProps.memoryPerNode() != ClusterProperties.UNLIMITED && clusterProps.memoryPerNode() != mem)) {
+            log.debug("Offer not sufficient for slave request: {}", offer.getResourcesList());
+
+            return null;
+        }
+
         if (cpus > 0 && mem > 0)
             return new IgniteTask(offer.getHostname(), cpus, mem, disk);
         else {
@@ -284,7 +294,7 @@ public class IgniteScheduler implements Scheduler {
                             .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
                         .addResources(Protos.Resource.newBuilder()
                             .setType(Protos.Value.Type.SCALAR)
-                            .setName(CPUS)
+                            .setName(CPU)
                             .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
                         .build();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 337b47c..13855b5 100644
--- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -71,7 +71,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
 
         Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
 
-        assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+        assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
         assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
     }
 
@@ -95,7 +95,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
 
         Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
 
-        assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+        assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
         assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
 
         mock.clear();
@@ -130,7 +130,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
 
         Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
 
-        assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+        assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
         assertEquals(512.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
 
         mock.clear();
@@ -168,7 +168,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
 
             Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
 
-            totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS);
+            totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPU);
             totalMem += resources(taskInfo.getResourcesList(), IgniteScheduler.MEM);
 
             mock.clear();
@@ -254,6 +254,52 @@ public class IgniteSchedulerSelfTest extends TestCase {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPerNode() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 8, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.memoryPerNode(1024);
+        clustProp.cpusPerNode(2);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+        assertNotNull(mock.launchedTask);
+
+        Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+        assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
+        assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
+
+        mock.clear();
+
+        offer = createOffer("hostname", 1, 2048);
+
+        scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+        assertNull(mock.launchedTask);
+
+        assertNotNull(mock.declinedOffer);
+        assertEquals(offer.getId(), mock.declinedOffer);
+
+        mock.clear();
+
+        offer = createOffer("hostname", 4, 512);
+
+        scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+        assertNull(mock.launchedTask);
+
+        assertNotNull(mock.declinedOffer);
+        assertEquals(offer.getId(), mock.declinedOffer);
+    }
+
+    /**
      * @param resourceType Resource type.
      * @return Value.
      */
@@ -280,7 +326,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
             .setHostname(hostname)
             .addResources(Protos.Resource.newBuilder()
                 .setType(Protos.Value.Type.SCALAR)
-                .setName(IgniteScheduler.CPUS)
+                .setName(IgniteScheduler.CPU)
                 .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
                 .build())
             .addResources(Protos.Resource.newBuilder()