You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/19 18:42:44 UTC

incubator-ignite git commit: #IGNITE-857 Added resource limit.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-857 55c166a64 -> e32087382


#IGNITE-857 Added resource limit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3208738
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3208738
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3208738

Branch: refs/heads/ignite-857
Commit: e320873828284fb86fb4d6e52cee98a6bb87b4af
Parents: 55c166a
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Tue May 19 19:42:22 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Tue May 19 19:42:22 2015 +0300

----------------------------------------------------------------------
 modules/mesos/pom.xml                           |   6 +-
 .../apache/ignite/mesos/ClusterResources.java   | 130 +++++++++++++++++++
 .../apache/ignite/mesos/IgniteFramework.java    |   4 +-
 .../apache/ignite/mesos/IgniteScheduler.java    | 120 ++++++++---------
 .../org/apache/ignite/mesos/IgniteTask.java     |  78 +++++++++++
 .../ignite/mesos/IgniteSchedulerSelfTest.java   |   1 -
 6 files changed, 272 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mesos/pom.xml b/modules/mesos/pom.xml
index ef73c0b..5ce3e5c 100644
--- a/modules/mesos/pom.xml
+++ b/modules/mesos/pom.xml
@@ -68,6 +68,11 @@
                     <descriptorRefs>
                         <descriptorRef>jar-with-dependencies</descriptorRef>
                     </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.ignite.mesos.IgniteFramework</mainClass>
+                        </manifest>
+                    </archive>
                 </configuration>
                 <executions>
                     <execution>
@@ -79,7 +84,6 @@
                     </execution>
                 </executions>
             </plugin>
-
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
new file mode 100644
index 0000000..0a2193f
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Resource limits (CPU, memory, disk, node count) applied to the Ignite Mesos cluster.
+ */
+public class ClusterResources {
+    /** Value of a limit that is not set (unlimited). */
+    public static final int DEFAULT_VALUE = -1;
+
+    /** Property name of the CPU cores limit. */
+    public static final String IGNITE_RESOURCE_CPU_CORES = "IGNITE_RESOURCE_CPU_CORES";
+
+    /** CPU limit. */
+    private double cpu = DEFAULT_VALUE;
+
+    /** Property name of the memory limit. */
+    public static final String IGNITE_RESOURCE_MEM_MB = "IGNITE_RESOURCE_MEM_MB";
+
+    /** Memory limit. */
+    private double mem = DEFAULT_VALUE;
+
+    /** Property name of the disk space limit. */
+    public static final String IGNITE_RESOURCE_DISK_MB = "IGNITE_RESOURCE_DISK_MB";
+
+    /** Disk space limit. */
+    private double disk = DEFAULT_VALUE;
+
+    /** Property name of the node count limit. */
+    public static final String IGNITE_RESOURCE_NODE_CNT = "IGNITE_RESOURCE_NODE_CNT";
+
+    /** Node count limit. */
+    private double nodeCnt = DEFAULT_VALUE;
+
+    /** Creates resources with every limit left at {@link #DEFAULT_VALUE}. */
+    public ClusterResources() {
+        // No-op.
+    }
+
+    /**
+     * @return CPU count limit.
+     */
+    public double cpus() {
+        return cpu;
+    }
+
+    /**
+     * @return Memory limit.
+     */
+    public double memory() {
+        return mem;
+    }
+
+    /**
+     * @return Disk limit.
+     */
+    public double disk() {
+        return disk;
+    }
+
+    /**
+     * @return Instance count limit.
+     */
+    public double instances() {
+        return nodeCnt;
+    }
+
+    /**
+     * @param config Path to the properties file with limits; an unreadable file raises {@link RuntimeException}.
+     * @return Cluster resources; a limit that is not configured anywhere is {@link #DEFAULT_VALUE}.
+     */
+    public static ClusterResources from(String config) {
+        try (InputStream in = new FileInputStream(config)) { // Properties.load() does not close the stream.
+            Properties props = new Properties();
+
+            props.load(in);
+
+            ClusterResources resources = new ClusterResources();
+
+            resources.cpu = getProperty(IGNITE_RESOURCE_CPU_CORES, props);
+            resources.mem = getProperty(IGNITE_RESOURCE_MEM_MB, props);
+            resources.disk = getProperty(IGNITE_RESOURCE_DISK_MB, props);
+            resources.nodeCnt = getProperty(IGNITE_RESOURCE_NODE_CNT, props);
+
+            return resources;
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @param name Property name.
+     * @param fileProps Properties loaded from the config file; they take precedence over other sources.
+     * @return Value from the file, else system property, else environment variable, else {@link #DEFAULT_VALUE}.
+     */
+    private static double getProperty(String name, Properties fileProps) {
+        if (fileProps.containsKey(name))
+            return Double.parseDouble(fileProps.getProperty(name));
+
+        String property = System.getProperty(name);
+
+        if (property == null)
+            property = System.getenv(name); // Fall back to the environment; the result was previously discarded.
+
+        if (property == null)
+            return DEFAULT_VALUE;
+
+        return Double.parseDouble(property);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
index 5c556a1..3d309f3 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -25,7 +25,7 @@ import org.apache.mesos.*;
  */
 public class IgniteFramework {
     /**
-     * @param args Args
+     * @param args Args [host:port] [resource limit]
      */
     public static void main(String[] args) {
         checkArgs(args);
@@ -43,7 +43,7 @@ public class IgniteFramework {
         }
 
         // create the scheduler
-        final Scheduler scheduler = new IgniteScheduler();
+        final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(args[1]));
 
         // create the driver
         MesosSchedulerDriver driver;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index 7b5623b..fcbab87 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -40,26 +40,39 @@ public class IgniteScheduler implements Scheduler {
     /** Mem. */
     public static final String MEM = "mem";
 
+    /** Disk. */
+    public static final String DISK = "disk";
+
     /** Default port range. */
     public static final String DEFAULT_PORT = ":47500..47510";
 
+    /** Min of memory required. */
+    public static final int MIN_MEMORY = 256;
+
     /** Delimiter to use in IP names. */
     public static final String DELIM = ",";
 
-    /** ID generator. */
-    private AtomicInteger taskIdGenerator = new AtomicInteger();
-
     /** Logger. */
     private static final Logger log = LoggerFactory.getLogger(IgniteScheduler.class);
 
-    /** Min of memory required. */
-    public static final int MIN_MEMORY = 256;
-
     /** Mutex. */
     private static final Object mux = new Object();
 
+    /** ID generator. */
+    private AtomicInteger taskIdGenerator = new AtomicInteger();
+
     /** Task on host. */
-    private ConcurrentMap<String, String> tasks = new ConcurrentHashMap<>();
+    private ConcurrentMap<String, IgniteTask> tasks = new ConcurrentHashMap<>();
+
+    /** Cluster resources. */
+    private ClusterResources clusterLimit;
+
+    /**
+     * @param clusterLimit Resources limit.
+     */
+    public IgniteScheduler(ClusterResources clusterLimit) {
+        this.clusterLimit = clusterLimit;
+    }
 
     /** {@inheritDoc} */
     @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
@@ -78,10 +91,10 @@ public class IgniteScheduler implements Scheduler {
             log.info("resourceOffers() with {} offers", offers.size());
 
             for (Protos.Offer offer : offers) {
-                Tuple<Double, Double> cpuMem = checkOffer(offer);
+                IgniteTask igniteTask = checkOffer(offer);
 
                 // Decline offer which doesn't match by mem or cpu.
-                if (cpuMem == null) {
+                if (igniteTask == null) {
                     schedulerDriver.declineOffer(offer.getId());
 
                     continue;
@@ -94,13 +107,13 @@ public class IgniteScheduler implements Scheduler {
                 log.info("Launching task {}", taskId.getValue());
 
                 // Create task to run.
-                Protos.TaskInfo task = createTask(offer, cpuMem, taskId);
+                Protos.TaskInfo task = createTask(offer, igniteTask, taskId);
 
                 schedulerDriver.launchTasks(Collections.singletonList(offer.getId()),
                     Collections.singletonList(task),
                     Protos.Filters.newBuilder().setRefuseSeconds(1).build());
 
-                tasks.put(taskId.getValue(), offer.getHostname());
+                tasks.put(taskId.getValue(), igniteTask);
             }
         }
     }
@@ -109,11 +122,11 @@ public class IgniteScheduler implements Scheduler {
      * Create Task.
      *
      * @param offer Offer.
-     * @param cpuMem Cpu and mem on slave.
+     * @param igniteTask Task description.
      * @param taskId Task id.
      * @return Task.
      */
-    protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem, Protos.TaskID taskId) {
+    protected Protos.TaskInfo createTask(Protos.Offer offer, IgniteTask igniteTask, Protos.TaskID taskId) {
         // Docker image info.
         Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder()
             .setImage(IMAGE)
@@ -131,16 +144,16 @@ public class IgniteScheduler implements Scheduler {
             .addResources(Protos.Resource.newBuilder()
                 .setName(CPUS)
                 .setType(Protos.Value.Type.SCALAR)
-                .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get1())))
+                .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores())))
             .addResources(Protos.Resource.newBuilder()
                 .setName(MEM)
                 .setType(Protos.Value.Type.SCALAR)
-                .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2())))
+                .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem())))
             .setContainer(cont)
             .setCommand(Protos.CommandInfo.newBuilder()
                 .setShell(false)
                 .addArguments(STARTUP_SCRIPT)
-                .addArguments(String.valueOf(cpuMem.get2().intValue()))
+                .addArguments(String.valueOf(igniteTask.mem()))
                 .addArguments(getAddress()))
             .build();
     }
@@ -154,8 +167,8 @@ public class IgniteScheduler implements Scheduler {
 
         StringBuilder sb = new StringBuilder();
 
-        for (String host : tasks.values())
-            sb.append(host).append(DEFAULT_PORT).append(DELIM);
+        for (IgniteTask task : tasks.values())
+            sb.append(task.host()).append(DEFAULT_PORT).append(DELIM);
 
         return sb.substring(0, sb.length() - 1);
     }
@@ -164,11 +177,15 @@ public class IgniteScheduler implements Scheduler {
      * Check slave resources and return resources infos.
      *
      * @param offer Offer request.
-     * @return Pair where first is cpus, second is memory.
+     * @return Ignite task description.
      */
-    private Tuple<Double, Double> checkOffer(Protos.Offer offer) {
-        double cpus = -1;
-        double mem = -1;
+    private IgniteTask checkOffer(Protos.Offer offer) {
+        if (checkLimit(clusterLimit.instances(), tasks.size()))
+            return null;
+
+        double cpus = -2;
+        double mem = -2;
+        double disk = -2;
 
         for (Protos.Resource resource : offer.getResourcesList()) {
             if (resource.getName().equals(CPUS)) {
@@ -183,17 +200,18 @@ public class IgniteScheduler implements Scheduler {
                 else
                     log.debug("Mem resource was not a scalar: " + resource.getType().toString());
             }
-            else if (resource.getName().equals("disk"))
-                log.debug("Ignoring disk resources from offer");
+            else if (resource.getType().equals(Protos.Value.Type.SCALAR))
+                disk = resource.getScalar().getValue();
+            else
+                log.debug("Disk resource was not a scalar: " + resource.getType().toString());
         }
 
-        if (cpus < 0)
-            log.debug("No cpus resource present");
-        if (mem < 0)
-            log.debug("No mem resource present");
+        if (checkLimit(clusterLimit.memory(), mem) &&
+            checkLimit(clusterLimit.cpus(), cpus) &&
+            checkLimit(clusterLimit.disk(), disk) &&
+            MIN_MEMORY <= mem)
 
-        if (cpus >= 1 && MIN_MEMORY <= mem)
-            return new Tuple<>(cpus, mem);
+            return new IgniteTask(offer.getHostname(), cpus, mem, disk);
         else {
             log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
                 "\n" + offer.getAttributesList().toString() +
@@ -205,6 +223,15 @@ public class IgniteScheduler implements Scheduler {
         }
     }
 
+    /**
+     * @param limit Configured limit, {@link ClusterResources#DEFAULT_VALUE} when not set.
+     * @param value Value to check against the limit.
+     * @return {@code True} if the limit is not set or does not exceed the value, else {@code false}.
+     */
+    private boolean checkLimit(double limit, double value) {
+        return value >= limit || limit == ClusterResources.DEFAULT_VALUE;
+    }
+
     /** {@inheritDoc} */
     @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
         log.info("offerRescinded()");
@@ -250,37 +277,4 @@ public class IgniteScheduler implements Scheduler {
     @Override public void error(SchedulerDriver schedulerDriver, String s) {
         log.error("error() {}", s);
     }
-
-    /**
-     * Tuple.
-     */
-    public static class Tuple<A, B> {
-        /** */
-        private final A val1;
-
-        /** */
-        private final B val2;
-
-        /**
-         *
-         */
-        public Tuple(A val1, B val2) {
-            this.val1 = val1;
-            this.val2 = val2;
-        }
-
-        /**
-         * @return val1
-         */
-        public A get1() {
-            return val1;
-        }
-
-        /**
-         * @return val2
-         */
-        public B get2() {
-            return val2;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
new file mode 100644
index 0000000..bad9996
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+/**
+ * Description of an Ignite task: the host it is placed on and the CPU cores, memory and disk given to it.
+ */
+public class IgniteTask {
+    /** Host. */
+    public final String host;
+
+    /** Cpu cores count. */
+    public final double cpuCores;
+
+    /** Memory. */
+    public final double mem;
+
+    /** Disk. */
+    public final double disk;
+
+    /**
+     * Ignite launched task.
+     *
+     * @param host Host.
+     * @param cpuCores Cpu cores count.
+     * @param mem Memory.
+     * @param disk Disk.
+     */
+    public IgniteTask(String host, double cpuCores, double mem, double disk) {
+        this.host = host;
+        this.cpuCores = cpuCores;
+        this.mem = mem;
+        this.disk = disk;
+    }
+
+    /**
+     * @return Host.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * @return Cores count.
+     */
+    public double cpuCores() {
+        return cpuCores;
+    }
+
+    /**
+     * @return Memory.
+     */
+    public double mem() {
+        return mem;
+    }
+
+    /**
+     * @return Disk.
+     */
+    public double disk() {
+        return disk;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 5534b2c..2c4b6ee 100644
--- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -160,6 +160,5 @@ public class IgniteSchedulerSelfTest extends TestCase {
             return null;
         }
 
-
     }
 }
\ No newline at end of file