You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/29 13:46:42 UTC
[07/38] incubator-ignite git commit: #IGNITE-857 Added resource limit.
#IGNITE-857 Added resource limit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3208738
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3208738
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3208738
Branch: refs/heads/ignite-943
Commit: e320873828284fb86fb4d6e52cee98a6bb87b4af
Parents: 55c166a
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Tue May 19 19:42:22 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Tue May 19 19:42:22 2015 +0300
----------------------------------------------------------------------
modules/mesos/pom.xml | 6 +-
.../apache/ignite/mesos/ClusterResources.java | 130 +++++++++++++++++++
.../apache/ignite/mesos/IgniteFramework.java | 4 +-
.../apache/ignite/mesos/IgniteScheduler.java | 120 ++++++++---------
.../org/apache/ignite/mesos/IgniteTask.java | 78 +++++++++++
.../ignite/mesos/IgniteSchedulerSelfTest.java | 1 -
6 files changed, 272 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mesos/pom.xml b/modules/mesos/pom.xml
index ef73c0b..5ce3e5c 100644
--- a/modules/mesos/pom.xml
+++ b/modules/mesos/pom.xml
@@ -68,6 +68,11 @@
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.ignite.mesos.IgniteFramework</mainClass>
+ </manifest>
+ </archive>
</configuration>
<executions>
<execution>
@@ -79,7 +84,6 @@
</execution>
</executions>
</plugin>
-
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
new file mode 100644
index 0000000..0a2193f
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Resource limits for an Ignite cluster.
+ * <p>
+ * Each limit is resolved by {@link #from(String)} in this order: the given properties file,
+ * a JVM system property, an environment variable (all using the same name). A limit that is
+ * not configured anywhere keeps {@link #DEFAULT_VALUE}.
+ */
+public class ClusterResources {
+    /** Value of a limit that is not configured (unlimited). */
+    public static final int DEFAULT_VALUE = -1;
+
+    /** Name of the property holding the CPU cores limit. */
+    public static final String IGNITE_RESOURCE_CPU_CORES = "IGNITE_RESOURCE_CPU_CORES";
+
+    /** CPU limit. */
+    private double cpu = DEFAULT_VALUE;
+
+    /** Name of the property holding the memory limit. */
+    public static final String IGNITE_RESOURCE_MEM_MB = "IGNITE_RESOURCE_MEM_MB";
+
+    /** Memory limit. */
+    private double mem = DEFAULT_VALUE;
+
+    /** Name of the property holding the disk space limit. */
+    public static final String IGNITE_RESOURCE_DISK_MB = "IGNITE_RESOURCE_DISK_MB";
+
+    /** Disk space limit. */
+    private double disk = DEFAULT_VALUE;
+
+    /** Name of the property holding the node count limit. */
+    public static final String IGNITE_RESOURCE_NODE_CNT = "IGNITE_RESOURCE_NODE_CNT";
+
+    /** Node count limit. */
+    private double nodeCnt = DEFAULT_VALUE;
+
+    /** Creates resources with every limit set to {@link #DEFAULT_VALUE}. */
+    public ClusterResources() {
+        // No-op.
+    }
+
+    /**
+     * @return CPU count limit.
+     */
+    public double cpus() {
+        return cpu;
+    }
+
+    /**
+     * @return Memory limit.
+     */
+    public double memory() {
+        return mem;
+    }
+
+    /**
+     * @return Disk limit.
+     */
+    public double disk() {
+        return disk;
+    }
+
+    /**
+     * @return Instance count limit.
+     */
+    public double instances() {
+        return nodeCnt;
+    }
+
+    /**
+     * Loads cluster limits from a properties file.
+     *
+     * @param config Path to config file.
+     * @return Cluster configuration.
+     * @throws RuntimeException If the file cannot be opened or read.
+     * @throws NumberFormatException If a configured limit is not a valid number.
+     */
+    public static ClusterResources from(String config) {
+        Properties props = new Properties();
+
+        // Properties.load() does not close the stream, so close it here even if loading fails.
+        try (InputStream in = new FileInputStream(config)) {
+            props.load(in);
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Failed to load cluster resources from file: " + config, e);
+        }
+
+        ClusterResources resources = new ClusterResources();
+
+        resources.cpu = getProperty(IGNITE_RESOURCE_CPU_CORES, props);
+        resources.mem = getProperty(IGNITE_RESOURCE_MEM_MB, props);
+        resources.disk = getProperty(IGNITE_RESOURCE_DISK_MB, props);
+        resources.nodeCnt = getProperty(IGNITE_RESOURCE_NODE_CNT, props);
+
+        return resources;
+    }
+
+    /**
+     * Resolves a limit from the properties file, then system properties, then the environment.
+     *
+     * @param name Property name.
+     * @param fileProps Property file.
+     * @return Property value or {@link #DEFAULT_VALUE} if the property is not set anywhere.
+     */
+    private static double getProperty(String name, Properties fileProps) {
+        String property = fileProps.getProperty(name);
+
+        if (property == null)
+            property = System.getProperty(name);
+
+        // The result of getenv() must be assigned, otherwise the environment fallback is ignored.
+        if (property == null)
+            property = System.getenv(name);
+
+        if (property == null)
+            return DEFAULT_VALUE;
+
+        return Double.parseDouble(property);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
index 5c556a1..3d309f3 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -25,7 +25,7 @@ import org.apache.mesos.*;
*/
public class IgniteFramework {
/**
- * @param args Args
+ * @param args Args [host:port] [resource limit]
*/
public static void main(String[] args) {
checkArgs(args);
@@ -43,7 +43,7 @@ public class IgniteFramework {
}
// create the scheduler
- final Scheduler scheduler = new IgniteScheduler();
+ final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(args[1]));
// create the driver
MesosSchedulerDriver driver;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index 7b5623b..fcbab87 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -40,26 +40,39 @@ public class IgniteScheduler implements Scheduler {
/** Mem. */
public static final String MEM = "mem";
+ /** Disk. */
+ public static final String DISK = "disk";
+
/** Default port range. */
public static final String DEFAULT_PORT = ":47500..47510";
+ /** Min of memory required. */
+ public static final int MIN_MEMORY = 256;
+
/** Delimiter to use in IP names. */
public static final String DELIM = ",";
- /** ID generator. */
- private AtomicInteger taskIdGenerator = new AtomicInteger();
-
/** Logger. */
private static final Logger log = LoggerFactory.getLogger(IgniteScheduler.class);
- /** Min of memory required. */
- public static final int MIN_MEMORY = 256;
-
/** Mutex. */
private static final Object mux = new Object();
+ /** ID generator. */
+ private AtomicInteger taskIdGenerator = new AtomicInteger();
+
/** Task on host. */
- private ConcurrentMap<String, String> tasks = new ConcurrentHashMap<>();
+ private ConcurrentMap<String, IgniteTask> tasks = new ConcurrentHashMap<>();
+
+ /** Cluster resources. */
+ private ClusterResources clusterLimit;
+
+ /**
+ * Creates a scheduler that keeps the given cluster resource limits for later offer checks.
+ *
+ * @param clusterLimit Resources limit.
+ */
+ public IgniteScheduler(ClusterResources clusterLimit) {
+ this.clusterLimit = clusterLimit;
+ }
/** {@inheritDoc} */
@Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
@@ -78,10 +91,10 @@ public class IgniteScheduler implements Scheduler {
log.info("resourceOffers() with {} offers", offers.size());
for (Protos.Offer offer : offers) {
- Tuple<Double, Double> cpuMem = checkOffer(offer);
+ IgniteTask igniteTask = checkOffer(offer);
// Decline offer which doesn't match by mem or cpu.
- if (cpuMem == null) {
+ if (igniteTask == null) {
schedulerDriver.declineOffer(offer.getId());
continue;
@@ -94,13 +107,13 @@ public class IgniteScheduler implements Scheduler {
log.info("Launching task {}", taskId.getValue());
// Create task to run.
- Protos.TaskInfo task = createTask(offer, cpuMem, taskId);
+ Protos.TaskInfo task = createTask(offer, igniteTask, taskId);
schedulerDriver.launchTasks(Collections.singletonList(offer.getId()),
Collections.singletonList(task),
Protos.Filters.newBuilder().setRefuseSeconds(1).build());
- tasks.put(taskId.getValue(), offer.getHostname());
+ tasks.put(taskId.getValue(), igniteTask);
}
}
}
@@ -109,11 +122,11 @@ public class IgniteScheduler implements Scheduler {
* Create Task.
*
* @param offer Offer.
- * @param cpuMem Cpu and mem on slave.
+ * @param igniteTask Task description.
* @param taskId Task id.
* @return Task.
*/
- protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem, Protos.TaskID taskId) {
+ protected Protos.TaskInfo createTask(Protos.Offer offer, IgniteTask igniteTask, Protos.TaskID taskId) {
// Docker image info.
Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder()
.setImage(IMAGE)
@@ -131,16 +144,16 @@ public class IgniteScheduler implements Scheduler {
.addResources(Protos.Resource.newBuilder()
.setName(CPUS)
.setType(Protos.Value.Type.SCALAR)
- .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get1())))
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores())))
.addResources(Protos.Resource.newBuilder()
.setName(MEM)
.setType(Protos.Value.Type.SCALAR)
- .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2())))
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem())))
.setContainer(cont)
.setCommand(Protos.CommandInfo.newBuilder()
.setShell(false)
.addArguments(STARTUP_SCRIPT)
- .addArguments(String.valueOf(cpuMem.get2().intValue()))
+ .addArguments(String.valueOf(igniteTask.mem()))
.addArguments(getAddress()))
.build();
}
@@ -154,8 +167,8 @@ public class IgniteScheduler implements Scheduler {
StringBuilder sb = new StringBuilder();
- for (String host : tasks.values())
- sb.append(host).append(DEFAULT_PORT).append(DELIM);
+ for (IgniteTask task : tasks.values())
+ sb.append(task.host()).append(DEFAULT_PORT).append(DELIM);
return sb.substring(0, sb.length() - 1);
}
@@ -164,11 +177,15 @@ public class IgniteScheduler implements Scheduler {
* Check slave resources and return resources infos.
*
* @param offer Offer request.
- * @return Pair where first is cpus, second is memory.
+ * @return Ignite task description.
*/
- private Tuple<Double, Double> checkOffer(Protos.Offer offer) {
- double cpus = -1;
- double mem = -1;
+ private IgniteTask checkOffer(Protos.Offer offer) {
+ if (checkLimit(clusterLimit.instances(), tasks.size()))
+ return null;
+
+ double cpus = -2;
+ double mem = -2;
+ double disk = -2;
for (Protos.Resource resource : offer.getResourcesList()) {
if (resource.getName().equals(CPUS)) {
@@ -183,17 +200,18 @@ public class IgniteScheduler implements Scheduler {
else
log.debug("Mem resource was not a scalar: " + resource.getType().toString());
}
- else if (resource.getName().equals("disk"))
- log.debug("Ignoring disk resources from offer");
+ else if (resource.getType().equals(Protos.Value.Type.SCALAR))
+ disk = resource.getScalar().getValue();
+ else
+ log.debug("Disk resource was not a scalar: " + resource.getType().toString());
}
- if (cpus < 0)
- log.debug("No cpus resource present");
- if (mem < 0)
- log.debug("No mem resource present");
+ if (checkLimit(clusterLimit.memory(), mem) &&
+ checkLimit(clusterLimit.cpus(), cpus) &&
+ checkLimit(clusterLimit.disk(), disk) &&
+ MIN_MEMORY <= mem)
- if (cpus >= 1 && MIN_MEMORY <= mem)
- return new Tuple<>(cpus, mem);
+ return new IgniteTask(offer.getHostname(), cpus, mem, disk);
else {
log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
"\n" + offer.getAttributesList().toString() +
@@ -205,6 +223,15 @@ public class IgniteScheduler implements Scheduler {
}
}
+ /**
+ * Compares a value against a configured limit.
+ * <p>
+ * NOTE(review): the instance-count check in this patch declines the offer when this method
+ * returns {@code true}, while the cpu/mem/disk checks accept the offer on {@code true};
+ * with an unset instance limit every offer would be declined - confirm this is intended.
+ *
+ * @param limit Limit, or {@link ClusterResources#DEFAULT_VALUE} if the limit is not set.
+ * @param value Value.
+ * @return {@code True} if the limit is not set or {@code limit <= value}, else {@code false}.
+ */
+ private boolean checkLimit(double limit, double value) {
+ return limit == ClusterResources.DEFAULT_VALUE || limit <= value;
+ }
+
/** {@inheritDoc} */
@Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
log.info("offerRescinded()");
@@ -250,37 +277,4 @@ public class IgniteScheduler implements Scheduler {
@Override public void error(SchedulerDriver schedulerDriver, String s) {
log.error("error() {}", s);
}
-
- /**
- * Tuple.
- */
- public static class Tuple<A, B> {
- /** */
- private final A val1;
-
- /** */
- private final B val2;
-
- /**
- *
- */
- public Tuple(A val1, B val2) {
- this.val1 = val1;
- this.val2 = val2;
- }
-
- /**
- * @return val1
- */
- public A get1() {
- return val1;
- }
-
- /**
- * @return val2
- */
- public B get2() {
- return val2;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
new file mode 100644
index 0000000..bad9996
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+/**
+ * Immutable description of an Ignite task: the host it is placed on and the
+ * CPU, memory and disk values recorded for it.
+ */
+public class IgniteTask {
+ /** Host name. */
+ public final String host;
+
+ /** CPU cores count. */
+ public final double cpuCores;
+
+ /** Memory. */
+ public final double mem;
+
+ /** Disk. */
+ public final double disk;
+
+ /**
+ * Ignite launched task.
+ *
+ * @param host Host.
+ * @param cpuCores Cpu cores count.
+ * @param mem Memory.
+ * @param disk Disk.
+ */
+ public IgniteTask(String host, double cpuCores, double mem, double disk) {
+ this.host = host;
+ this.cpuCores = cpuCores;
+ this.mem = mem;
+ this.disk = disk;
+ }
+
+ /**
+ * @return Host the task is placed on.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return CPU cores count recorded for the task.
+ */
+ public double cpuCores() {
+ return cpuCores;
+ }
+
+ /**
+ * @return Memory recorded for the task.
+ */
+ public double mem() {
+ return mem;
+ }
+
+ /**
+ * @return Disk recorded for the task.
+ */
+ public double disk() {
+ return disk;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3208738/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 5534b2c..2c4b6ee 100644
--- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -160,6 +160,5 @@ public class IgniteSchedulerSelfTest extends TestCase {
return null;
}
-
}
}
\ No newline at end of file