You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 20:11:59 UTC
[22/50] [abbrv] incubator-ignite git commit: #IGNITE-857 Added comma
separate limit.
#IGNITE-857 Added comma separate limit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/07a10952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/07a10952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/07a10952
Branch: refs/heads/ignite-218
Commit: 07a10952093f8f4c7ce432413bb582c6ab96dc26
Parents: 8deb577
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu May 28 11:15:55 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu May 28 11:15:55 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/mesos/ClusterProperties.java | 18 +++++++
.../apache/ignite/mesos/IgniteScheduler.java | 34 +++++++-----
.../ignite/mesos/IgniteSchedulerSelfTest.java | 56 ++++++++++++++++++--
3 files changed, 91 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
index 9f0b304..944735e 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
@@ -215,6 +215,13 @@ public class ClusterProperties {
}
/**
+ * Set CPU count limit.
+ */
+ public void cpusPerNode(double cpu) {
+ this.cpuPerNode = cpu;
+ }
+
+ /**
* @return mem limit.
*/
public double memory() {
@@ -223,6 +230,8 @@ public class ClusterProperties {
/**
* Set mem limit.
+ *
+ * @param mem Memory.
*/
public void memory(double mem) {
this.mem = mem;
@@ -236,6 +245,15 @@ public class ClusterProperties {
}
/**
+ * Set mem limit.
+ *
+ * @param mem Memory.
+ */
+ public void memoryPerNode(double mem) {
+ this.memPerNode = mem;
+ }
+
+ /**
* @return disk limit.
*/
public double disk() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index 17daf45..e833025 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.*;
*/
public class IgniteScheduler implements Scheduler {
/** Cpus. */
- public static final String CPUS = "cpus";
+ public static final String CPU = "cpus";
/** Mem. */
public static final String MEM = "mem";
@@ -126,14 +126,17 @@ public class IgniteScheduler implements Scheduler {
.addUris(Protos.CommandInfo.URI.newBuilder()
.setValue(cfgUrl));
- if (clusterProps.usersLibsUrl() != null)
- builder.addUris(Protos.CommandInfo.URI.newBuilder()
- .setValue(clusterProps.usersLibsUrl())
- .setExtract(true));
- else if (resourceProvider.resourceUrl() != null) {
- for (String url : resourceProvider.resourceUrl())
- builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url));
- }
+ // Collection user's libs.
+ Collection<String> usersLibs = new ArrayList<>();
+
+ if (clusterProps.usersLibsUrl() != null && !clusterProps.usersLibsUrl().isEmpty())
+ Collections.addAll(usersLibs, clusterProps.usersLibsUrl().split(DELIM));
+
+ if (resourceProvider.resourceUrl() != null && !resourceProvider.resourceUrl().isEmpty())
+ usersLibs.addAll(resourceProvider.resourceUrl());
+
+ for (String url : usersLibs)
+ builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url));
String cfgName = resourceProvider.configName();
@@ -155,7 +158,7 @@ public class IgniteScheduler implements Scheduler {
.setSlaveId(offer.getSlaveId())
.setCommand(builder)
.addResources(Protos.Resource.newBuilder()
- .setName(CPUS)
+ .setName(CPU)
.setType(Protos.Value.Type.SCALAR)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores())))
.addResources(Protos.Resource.newBuilder()
@@ -210,7 +213,7 @@ public class IgniteScheduler implements Scheduler {
// Collect resource on slave.
for (Protos.Resource resource : offer.getResourcesList()) {
- if (resource.getName().equals(CPUS)) {
+ if (resource.getName().equals(CPU)) {
if (resource.getType().equals(Protos.Value.Type.SCALAR))
cpus = resource.getScalar().getValue();
else
@@ -251,6 +254,13 @@ public class IgniteScheduler implements Scheduler {
mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, clusterProps.memoryPerNode()));
disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, clusterProps.diskPerNode()));
+ if ((clusterProps.cpusPerNode() != ClusterProperties.UNLIMITED && clusterProps.cpusPerNode() != cpus)
+ || (clusterProps.memoryPerNode() != ClusterProperties.UNLIMITED && clusterProps.memoryPerNode() != mem)) {
+ log.debug("Offer not sufficient for slave request: {}", offer.getResourcesList());
+
+ return null;
+ }
+
if (cpus > 0 && mem > 0)
return new IgniteTask(offer.getHostname(), cpus, mem, disk);
else {
@@ -284,7 +294,7 @@ public class IgniteScheduler implements Scheduler {
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
.addResources(Protos.Resource.newBuilder()
.setType(Protos.Value.Type.SCALAR)
- .setName(CPUS)
+ .setName(CPU)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
.build();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 337b47c..13855b5 100644
--- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -71,7 +71,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
- assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+ assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
}
@@ -95,7 +95,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
- assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+ assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
mock.clear();
@@ -130,7 +130,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
- assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS));
+ assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
assertEquals(512.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
mock.clear();
@@ -168,7 +168,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
- totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS);
+ totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPU);
totalMem += resources(taskInfo.getResourcesList(), IgniteScheduler.MEM);
mock.clear();
@@ -254,6 +254,52 @@ public class IgniteSchedulerSelfTest extends TestCase {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testPerNode() throws Exception {
+ Protos.Offer offer = createOffer("hostname", 8, 1024);
+
+ DriverMock mock = new DriverMock();
+
+ ClusterProperties clustProp = new ClusterProperties();
+ clustProp.memoryPerNode(1024);
+ clustProp.cpusPerNode(2);
+
+ scheduler.setClusterProps(clustProp);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNotNull(mock.launchedTask);
+
+ Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+ assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU));
+ assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM));
+
+ mock.clear();
+
+ offer = createOffer("hostname", 1, 2048);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNull(mock.launchedTask);
+
+ assertNotNull(mock.declinedOffer);
+ assertEquals(offer.getId(), mock.declinedOffer);
+
+ mock.clear();
+
+ offer = createOffer("hostname", 4, 512);
+
+ scheduler.resourceOffers(mock, Collections.singletonList(offer));
+
+ assertNull(mock.launchedTask);
+
+ assertNotNull(mock.declinedOffer);
+ assertEquals(offer.getId(), mock.declinedOffer);
+ }
+
+ /**
* @param resourceType Resource type.
* @return Value.
*/
@@ -280,7 +326,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
.setHostname(hostname)
.addResources(Protos.Resource.newBuilder()
.setType(Protos.Value.Type.SCALAR)
- .setName(IgniteScheduler.CPUS)
+ .setName(IgniteScheduler.CPU)
.setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
.build())
.addResources(Protos.Resource.newBuilder()