You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 09:06:29 UTC
[06/50] incubator-ignite git commit: #IGNITE-857 Added resource limit.
#IGNITE-857 Added resource limit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae8bcf83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae8bcf83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae8bcf83
Branch: refs/heads/ignite-471-2
Commit: ae8bcf83b5e14efeb21dbd541ef56c629c8e214d
Parents: e320873
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Wed May 20 11:50:39 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Wed May 20 11:50:39 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/mesos/ClusterResources.java | 38 ++++++++-
.../apache/ignite/mesos/IgniteFramework.java | 4 +-
.../apache/ignite/mesos/IgniteScheduler.java | 87 +++++++++++++++-----
.../ignite/mesos/IgniteSchedulerSelfTest.java | 2 +-
4 files changed, 103 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
index 0a2193f..1887530 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
@@ -52,6 +52,18 @@ public class ClusterResources {
private double nodeCnt = DEFAULT_VALUE;
/** */
+ public static final String IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE = "IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE";
+
+ /** Min memory per node. */
+ private int minCpu = 2;
+
+ /** */
+ public static final String IGNITE_RESOURCE_MIN_MEMORY_PER_NODE = "IGNITE_RESOURCE_MIN_MEMORY_PER_NODE";
+
+ /** Min memory per node. */
+ private int minMemoryCnt = 256;
+
+ /** */
public ClusterResources() {
// No-op.
}
@@ -85,14 +97,32 @@ public class ClusterResources {
}
/**
+ * @return min memory per node.
+ */
+ public int minMemoryPerNode() {
+ return minMemoryCnt;
+ }
+
+ /**
+ * @return min cpu count per node.
+ */
+ public int minCpuPerNode() {
+ return minCpu;
+ }
+
+ /**
* @param config path to config file.
* @return Cluster configuration.
*/
public static ClusterResources from(String config) {
try {
- Properties props = new Properties();
+ Properties props = null;
+
+ if (config != null) {
+ props = new Properties();
- props.load(new FileInputStream(config));
+ props.load(new FileInputStream(config));
+ }
ClusterResources resources = new ClusterResources();
@@ -114,13 +144,13 @@ public class ClusterResources {
* @return Property value.
*/
private static double getProperty(String name, Properties fileProps) {
- if (fileProps.containsKey(name))
+ if (fileProps != null && fileProps.containsKey(name))
return Double.valueOf(fileProps.getProperty(name));
String property = System.getProperty(name);
if (property == null)
- System.getenv(name);
+ property = System.getenv(name);
if (property == null)
return DEFAULT_VALUE;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
index 3d309f3..2d74f71 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -42,8 +42,8 @@ public class IgniteFramework {
frameworkBuilder.setCheckpoint(true);
}
- // create the scheduler
- final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(args[1]));
+ // Create the scheduler.
+ final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(null));
// create the driver
MesosSchedulerDriver driver;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index fcbab87..9d10860 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -21,7 +21,6 @@ import org.apache.mesos.*;
import org.slf4j.*;
import java.util.*;
-import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
@@ -46,9 +45,6 @@ public class IgniteScheduler implements Scheduler {
/** Default port range. */
public static final String DEFAULT_PORT = ":47500..47510";
- /** Min of memory required. */
- public static final int MIN_MEMORY = 256;
-
/** Delimiter to use in IP names. */
public static final String DELIM = ",";
@@ -62,7 +58,7 @@ public class IgniteScheduler implements Scheduler {
private AtomicInteger taskIdGenerator = new AtomicInteger();
/** Task on host. */
- private ConcurrentMap<String, IgniteTask> tasks = new ConcurrentHashMap<>();
+ private Map<String, IgniteTask> tasks = new HashMap<>();
/** Cluster resources. */
private ClusterResources clusterLimit;
@@ -82,7 +78,7 @@ public class IgniteScheduler implements Scheduler {
/** {@inheritDoc} */
@Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
- log.info("reregistered");
+ log.info("reregistered()");
}
/** {@inheritDoc} */
@@ -138,7 +134,7 @@ public class IgniteScheduler implements Scheduler {
cont.setDocker(docker.build());
return Protos.TaskInfo.newBuilder()
- .setName("task " + taskId.getValue())
+ .setName("Ignite node " + taskId.getValue())
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId())
.addResources(Protos.Resource.newBuilder()
@@ -153,7 +149,7 @@ public class IgniteScheduler implements Scheduler {
.setCommand(Protos.CommandInfo.newBuilder()
.setShell(false)
.addArguments(STARTUP_SCRIPT)
- .addArguments(String.valueOf(igniteTask.mem()))
+ .addArguments(String.valueOf((int) igniteTask.mem()))
.addArguments(getAddress()))
.build();
}
@@ -180,13 +176,15 @@ public class IgniteScheduler implements Scheduler {
* @return Ignite task description.
*/
private IgniteTask checkOffer(Protos.Offer offer) {
- if (checkLimit(clusterLimit.instances(), tasks.size()))
+ // Check that limit on running nodes.
+ if (!checkLimit(clusterLimit.instances(), tasks.size()))
return null;
- double cpus = -2;
- double mem = -2;
- double disk = -2;
+ double cpus = -1;
+ double mem = -1;
+ double disk = -1;
+ // Collect resource on slave.
for (Protos.Resource resource : offer.getResourcesList()) {
if (resource.getName().equals(CPUS)) {
if (resource.getType().equals(Protos.Value.Type.SCALAR))
@@ -200,17 +198,43 @@ public class IgniteScheduler implements Scheduler {
else
log.debug("Mem resource was not a scalar: " + resource.getType().toString());
}
- else if (resource.getType().equals(Protos.Value.Type.SCALAR))
- disk = resource.getScalar().getValue();
- else
- log.debug("Disk resource was not a scalar: " + resource.getType().toString());
+ else if (resource.getName().equals(DISK))
+ if (resource.getType().equals(Protos.Value.Type.SCALAR))
+ disk = resource.getScalar().getValue();
+ else
+ log.debug("Disk resource was not a scalar: " + resource.getType().toString());
+ }
+
+ // Check that slave satisfies min requirements.
+ if (cpus < clusterLimit.minCpuPerNode() && mem < clusterLimit.minMemoryPerNode() ) {
+ log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
+ "\n" + offer.getAttributesList().toString() +
+ "\nRequested for slave:\n" +
+ " cpus: " + cpus + "\n" +
+ " mem: " + mem);
+
+ return null;
}
- if (checkLimit(clusterLimit.memory(), mem) &&
- checkLimit(clusterLimit.cpus(), cpus) &&
- checkLimit(clusterLimit.disk(), disk) &&
- MIN_MEMORY <= mem)
+ double totalCpus = 0;
+ double totalMem = 0;
+ double totalDisk = 0;
+ // Collect occupied resources.
+ for (IgniteTask task : tasks.values()) {
+ totalCpus += task.cpuCores();
+ totalMem += task.mem();
+ totalDisk += task.disk();
+ }
+
+ cpus = clusterLimit.cpus() == ClusterResources.DEFAULT_VALUE ? cpus :
+ Math.min(clusterLimit.cpus() - totalCpus, cpus);
+ mem = clusterLimit.memory() == ClusterResources.DEFAULT_VALUE ? mem :
+ Math.min(clusterLimit.memory() - totalMem, mem);
+ disk = clusterLimit.disk() == ClusterResources.DEFAULT_VALUE ? disk :
+ Math.min(clusterLimit.disk() - totalDisk, disk);
+
+ if (cpus > 0 && mem > 0)
return new IgniteTask(offer.getHostname(), cpus, mem, disk);
else {
log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
@@ -246,7 +270,28 @@ public class IgniteScheduler implements Scheduler {
switch (taskStatus.getState()) {
case TASK_FAILED:
case TASK_FINISHED:
- tasks.remove(taskId);
+ synchronized (mux) {
+ IgniteTask failedTask = tasks.remove(taskId);
+
+ if (failedTask != null) {
+ List<Protos.Request> requests = new ArrayList<>();
+
+ Protos.Request request = Protos.Request.newBuilder()
+ .addResources(Protos.Resource.newBuilder()
+ .setType(Protos.Value.Type.SCALAR)
+ .setName(MEM)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
+ .addResources(Protos.Resource.newBuilder()
+ .setType(Protos.Value.Type.SCALAR)
+ .setName(CPUS)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
+ .build();
+
+ requests.add(request);
+
+ schedulerDriver.requestResources(requests);
+ }
+ }
break;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 2c4b6ee..8f8ca8b 100644
--- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -33,7 +33,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
@Override public void setUp() throws Exception {
super.setUp();
- scheduler = new IgniteScheduler();
+ //scheduler = new IgniteScheduler();
}
/**