You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/28 23:02:06 UTC

[08/37] incubator-ignite git commit: #IGNITE-857 Added resource limit.

#IGNITE-857 Added resource limit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae8bcf83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae8bcf83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae8bcf83

Branch: refs/heads/ignite-gg-10369
Commit: ae8bcf83b5e14efeb21dbd541ef56c629c8e214d
Parents: e320873
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Wed May 20 11:50:39 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Wed May 20 11:50:39 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/mesos/ClusterResources.java   | 38 ++++++++-
 .../apache/ignite/mesos/IgniteFramework.java    |  4 +-
 .../apache/ignite/mesos/IgniteScheduler.java    | 87 +++++++++++++++-----
 .../ignite/mesos/IgniteSchedulerSelfTest.java   |  2 +-
 4 files changed, 103 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
index 0a2193f..1887530 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
@@ -52,6 +52,18 @@ public class ClusterResources {
     private double nodeCnt = DEFAULT_VALUE;
 
     /** */
+    public static final String IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE = "IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE";
+
+    /** Min CPU count per node. */
+    private int minCpu = 2;
+
+    /** */
+    public static final String IGNITE_RESOURCE_MIN_MEMORY_PER_NODE = "IGNITE_RESOURCE_MIN_MEMORY_PER_NODE";
+
+    /** Min memory per node. */
+    private int minMemoryCnt = 256;
+
+    /** */
     public ClusterResources() {
         // No-op.
     }
@@ -85,14 +97,32 @@ public class ClusterResources {
     }
 
     /**
+     * @return min memory per node.
+     */
+    public int minMemoryPerNode() {
+        return minMemoryCnt;
+    }
+
+    /**
+     * @return min cpu count per node.
+     */
+    public int minCpuPerNode() {
+        return minCpu;
+    }
+
+    /**
      * @param config path to config file.
      * @return Cluster configuration.
      */
     public static ClusterResources from(String config) {
         try {
-            Properties props = new Properties();
+            Properties props = null;
+
+            if (config != null) {
+                props = new Properties();
 
-            props.load(new FileInputStream(config));
+                props.load(new FileInputStream(config));
+            }
 
             ClusterResources resources = new ClusterResources();
 
@@ -114,13 +144,13 @@ public class ClusterResources {
      * @return Property value.
      */
     private static double getProperty(String name, Properties fileProps) {
-        if (fileProps.containsKey(name))
+        if (fileProps != null && fileProps.containsKey(name))
             return Double.valueOf(fileProps.getProperty(name));
 
         String property = System.getProperty(name);
 
         if (property == null)
-            System.getenv(name);
+            property = System.getenv(name);
 
         if (property == null)
             return DEFAULT_VALUE;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
index 3d309f3..2d74f71 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -42,8 +42,8 @@ public class IgniteFramework {
             frameworkBuilder.setCheckpoint(true);
         }
 
-        // create the scheduler
-        final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(args[1]));
+        // Create the scheduler.
+        final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(null));
 
         // create the driver
         MesosSchedulerDriver driver;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index fcbab87..9d10860 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -21,7 +21,6 @@ import org.apache.mesos.*;
 import org.slf4j.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 /**
@@ -46,9 +45,6 @@ public class IgniteScheduler implements Scheduler {
     /** Default port range. */
     public static final String DEFAULT_PORT = ":47500..47510";
 
-    /** Min of memory required. */
-    public static final int MIN_MEMORY = 256;
-
     /** Delimiter to use in IP names. */
     public static final String DELIM = ",";
 
@@ -62,7 +58,7 @@ public class IgniteScheduler implements Scheduler {
     private AtomicInteger taskIdGenerator = new AtomicInteger();
 
     /** Task on host. */
-    private ConcurrentMap<String, IgniteTask> tasks = new ConcurrentHashMap<>();
+    private Map<String, IgniteTask> tasks = new HashMap<>();
 
     /** Cluster resources. */
     private ClusterResources clusterLimit;
@@ -82,7 +78,7 @@ public class IgniteScheduler implements Scheduler {
 
     /** {@inheritDoc} */
     @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
-        log.info("reregistered");
+        log.info("reregistered()");
     }
 
     /** {@inheritDoc} */
@@ -138,7 +134,7 @@ public class IgniteScheduler implements Scheduler {
         cont.setDocker(docker.build());
 
         return Protos.TaskInfo.newBuilder()
-            .setName("task " + taskId.getValue())
+            .setName("Ignite node " + taskId.getValue())
             .setTaskId(taskId)
             .setSlaveId(offer.getSlaveId())
             .addResources(Protos.Resource.newBuilder()
@@ -153,7 +149,7 @@ public class IgniteScheduler implements Scheduler {
             .setCommand(Protos.CommandInfo.newBuilder()
                 .setShell(false)
                 .addArguments(STARTUP_SCRIPT)
-                .addArguments(String.valueOf(igniteTask.mem()))
+                .addArguments(String.valueOf((int) igniteTask.mem()))
                 .addArguments(getAddress()))
             .build();
     }
@@ -180,13 +176,15 @@ public class IgniteScheduler implements Scheduler {
      * @return Ignite task description.
      */
     private IgniteTask checkOffer(Protos.Offer offer) {
-        if (checkLimit(clusterLimit.instances(), tasks.size()))
+        // Check the limit on the number of running nodes.
+        if (!checkLimit(clusterLimit.instances(), tasks.size()))
             return null;
 
-        double cpus = -2;
-        double mem = -2;
-        double disk = -2;
+        double cpus = -1;
+        double mem = -1;
+        double disk = -1;
 
+        // Collect the resources offered by the slave.
         for (Protos.Resource resource : offer.getResourcesList()) {
             if (resource.getName().equals(CPUS)) {
                 if (resource.getType().equals(Protos.Value.Type.SCALAR))
@@ -200,17 +198,43 @@ public class IgniteScheduler implements Scheduler {
                 else
                     log.debug("Mem resource was not a scalar: " + resource.getType().toString());
             }
-            else if (resource.getType().equals(Protos.Value.Type.SCALAR))
-                disk = resource.getScalar().getValue();
-            else
-                log.debug("Disk resource was not a scalar: " + resource.getType().toString());
+            else if (resource.getName().equals(DISK))
+                if (resource.getType().equals(Protos.Value.Type.SCALAR))
+                    disk = resource.getScalar().getValue();
+                else
+                    log.debug("Disk resource was not a scalar: " + resource.getType().toString());
+        }
+
+        // Check that slave satisfies min requirements.
+        if (cpus < clusterLimit.minCpuPerNode()  && mem < clusterLimit.minMemoryPerNode() ) {
+            log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
+                "\n" + offer.getAttributesList().toString() +
+                "\nRequested for slave:\n" +
+                "  cpus:  " + cpus + "\n" +
+                "  mem:   " + mem);
+
+            return null;
         }
 
-        if (checkLimit(clusterLimit.memory(), mem) &&
-            checkLimit(clusterLimit.cpus(), cpus) &&
-            checkLimit(clusterLimit.disk(), disk) &&
-            MIN_MEMORY <= mem)
+        double totalCpus = 0;
+        double totalMem = 0;
+        double totalDisk = 0;
 
+        // Collect occupied resources.
+        for (IgniteTask task : tasks.values()) {
+            totalCpus += task.cpuCores();
+            totalMem += task.mem();
+            totalDisk += task.disk();
+        }
+
+        cpus = clusterLimit.cpus() == ClusterResources.DEFAULT_VALUE ? cpus :
+            Math.min(clusterLimit.cpus() - totalCpus, cpus);
+        mem = clusterLimit.memory() == ClusterResources.DEFAULT_VALUE ? mem :
+            Math.min(clusterLimit.memory() - totalMem, mem);
+        disk = clusterLimit.disk() == ClusterResources.DEFAULT_VALUE ? disk :
+            Math.min(clusterLimit.disk() - totalDisk, disk);
+
+        if (cpus > 0 && mem > 0)
             return new IgniteTask(offer.getHostname(), cpus, mem, disk);
         else {
             log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
@@ -246,7 +270,28 @@ public class IgniteScheduler implements Scheduler {
         switch (taskStatus.getState()) {
             case TASK_FAILED:
             case TASK_FINISHED:
-                tasks.remove(taskId);
+                synchronized (mux) {
+                    IgniteTask failedTask = tasks.remove(taskId);
+
+                    if (failedTask != null) {
+                        List<Protos.Request> requests = new ArrayList<>();
+
+                        Protos.Request request = Protos.Request.newBuilder()
+                            .addResources(Protos.Resource.newBuilder()
+                                .setType(Protos.Value.Type.SCALAR)
+                                .setName(MEM)
+                                .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
+                            .addResources(Protos.Resource.newBuilder()
+                                .setType(Protos.Value.Type.SCALAR)
+                                .setName(CPUS)
+                                .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
+                            .build();
+
+                        requests.add(request);
+
+                        schedulerDriver.requestResources(requests);
+                    }
+                }
                 break;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 2c4b6ee..8f8ca8b 100644
--- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -33,7 +33,7 @@ public class IgniteSchedulerSelfTest extends TestCase {
     @Override public void setUp() throws Exception {
         super.setUp();
 
-        scheduler = new IgniteScheduler();
+        //scheduler = new IgniteScheduler();
     }
 
     /**