You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/18 17:42:44 UTC
[2/2] incubator-ignite git commit: #IGNITE-857 Updated discovery
logic.
#IGNITE-857 Updated discovery logic.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/55c166a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/55c166a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/55c166a6
Branch: refs/heads/ignite-857
Commit: 55c166a64a7f64cccf81ef9ec64cd572c11d94af
Parents: 21a1514
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Mon May 18 18:42:27 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Mon May 18 18:42:27 2015 +0300
----------------------------------------------------------------------
modules/mesos/pom.xml | 7 +
.../apache/ignite/mesos/IgniteFramework.java | 97 +++++++
.../apache/ignite/mesos/IgniteScheduler.java | 286 +++++++++++++++++++
.../org/apache/ignite/mesos/package-info.java | 22 ++
.../ignite/messo/IgniteAmazonScheduler.java | 81 ------
.../apache/ignite/messo/IgniteFramework.java | 108 -------
.../apache/ignite/messo/IgniteScheduler.java | 243 ----------------
.../org/apache/ignite/messo/package-info.java | 22 --
.../org/apache/ignite/IgniteMesosTestSuite.java | 38 +++
.../ignite/mesos/IgniteSchedulerSelfTest.java | 165 +++++++++++
10 files changed, 615 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mesos/pom.xml b/modules/mesos/pom.xml
index 4d19b11..ef73c0b 100644
--- a/modules/mesos/pom.xml
+++ b/modules/mesos/pom.xml
@@ -39,6 +39,13 @@
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
new file mode 100644
index 0000000..5c556a1
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import com.google.protobuf.*;
+import org.apache.mesos.*;
+
+/**
+ * TODO
+ */
+public class IgniteFramework {
+ /**
+ * @param args Args
+ */
+ public static void main(String[] args) {
+ checkArgs(args);
+
+ final int frameworkFailoverTimeout = 0;
+
+ Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder()
+ .setName("IgniteFramework")
+ .setUser("") // Have Mesos fill in the current user.
+ .setFailoverTimeout(frameworkFailoverTimeout); // timeout in seconds
+
+ if (System.getenv("MESOS_CHECKPOINT") != null) {
+ System.out.println("Enabling checkpoint for the framework");
+ frameworkBuilder.setCheckpoint(true);
+ }
+
+ // create the scheduler
+ final Scheduler scheduler = new IgniteScheduler();
+
+ // create the driver
+ MesosSchedulerDriver driver;
+ if (System.getenv("MESOS_AUTHENTICATE") != null) {
+ System.out.println("Enabling authentication for the framework");
+
+ if (System.getenv("DEFAULT_PRINCIPAL") == null) {
+ System.err.println("Expecting authentication principal in the environment");
+ System.exit(1);
+ }
+
+ if (System.getenv("DEFAULT_SECRET") == null) {
+ System.err.println("Expecting authentication secret in the environment");
+ System.exit(1);
+ }
+
+ Protos.Credential credential = Protos.Credential.newBuilder()
+ .setPrincipal(System.getenv("DEFAULT_PRINCIPAL"))
+ .setSecret(ByteString.copyFrom(System.getenv("DEFAULT_SECRET").getBytes()))
+ .build();
+
+ frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL"));
+
+ driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], credential);
+ }
+ else {
+ frameworkBuilder.setPrincipal("ignite-framework-java");
+
+ driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]);
+ }
+
+ int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
+
+ // Ensure that the driver process terminates.
+ driver.stop();
+
+ System.exit(status);
+ }
+
+ /**
+ * Check input arguments.
+ *
+ * @param args Arguments.
+ */
+ private static void checkArgs(String[] args) {
+ if (args.length == 0)
+ throw new IllegalArgumentException("Illegal arguments.");
+
+ // TODO: add more
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
new file mode 100644
index 0000000..7b5623b
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import org.apache.mesos.*;
+import org.slf4j.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * TODO
+ */
+public class IgniteScheduler implements Scheduler {
+ /** Docker image name. */
+ public static final String IMAGE = "apacheignite/ignite-docker";
+
+ /** Startup sctipt path. */
+ public static final String STARTUP_SCRIPT = "/home/ignite/startup.sh";
+
+ /** Cpus. */
+ public static final String CPUS = "cpus";
+
+ /** Mem. */
+ public static final String MEM = "mem";
+
+ /** Default port range. */
+ public static final String DEFAULT_PORT = ":47500..47510";
+
+ /** Delimiter to use in IP names. */
+ public static final String DELIM = ",";
+
+ /** ID generator. */
+ private AtomicInteger taskIdGenerator = new AtomicInteger();
+
+ /** Logger. */
+ private static final Logger log = LoggerFactory.getLogger(IgniteScheduler.class);
+
+ /** Min of memory required. */
+ public static final int MIN_MEMORY = 256;
+
+ /** Mutex. */
+ private static final Object mux = new Object();
+
+ /** Task on host. */
+ private ConcurrentMap<String, String> tasks = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
+ Protos.MasterInfo masterInfo) {
+ log.info("registered() master={}:{}, framework={}", masterInfo.getIp(), masterInfo.getPort(), frameworkID);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
+ log.info("reregistered");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) {
+ synchronized (mux) {
+ log.info("resourceOffers() with {} offers", offers.size());
+
+ for (Protos.Offer offer : offers) {
+ Tuple<Double, Double> cpuMem = checkOffer(offer);
+
+ // Decline offer which doesn't match by mem or cpu.
+ if (cpuMem == null) {
+ schedulerDriver.declineOffer(offer.getId());
+
+ continue;
+ }
+
+ // Generate a unique task ID.
+ Protos.TaskID taskId = Protos.TaskID.newBuilder()
+ .setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build();
+
+ log.info("Launching task {}", taskId.getValue());
+
+ // Create task to run.
+ Protos.TaskInfo task = createTask(offer, cpuMem, taskId);
+
+ schedulerDriver.launchTasks(Collections.singletonList(offer.getId()),
+ Collections.singletonList(task),
+ Protos.Filters.newBuilder().setRefuseSeconds(1).build());
+
+ tasks.put(taskId.getValue(), offer.getHostname());
+ }
+ }
+ }
+
+ /**
+ * Create Task.
+ *
+ * @param offer Offer.
+ * @param cpuMem Cpu and mem on slave.
+ * @param taskId Task id.
+ * @return Task.
+ */
+ protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem, Protos.TaskID taskId) {
+ // Docker image info.
+ Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder()
+ .setImage(IMAGE)
+ .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST);
+
+ // Container info.
+ Protos.ContainerInfo.Builder cont = Protos.ContainerInfo.newBuilder();
+ cont.setType(Protos.ContainerInfo.Type.DOCKER);
+ cont.setDocker(docker.build());
+
+ return Protos.TaskInfo.newBuilder()
+ .setName("task " + taskId.getValue())
+ .setTaskId(taskId)
+ .setSlaveId(offer.getSlaveId())
+ .addResources(Protos.Resource.newBuilder()
+ .setName(CPUS)
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get1())))
+ .addResources(Protos.Resource.newBuilder()
+ .setName(MEM)
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2())))
+ .setContainer(cont)
+ .setCommand(Protos.CommandInfo.newBuilder()
+ .setShell(false)
+ .addArguments(STARTUP_SCRIPT)
+ .addArguments(String.valueOf(cpuMem.get2().intValue()))
+ .addArguments(getAddress()))
+ .build();
+ }
+
+ /**
+ * @return Address running nodes.
+ */
+ protected String getAddress() {
+ if (tasks.isEmpty())
+ return "";
+
+ StringBuilder sb = new StringBuilder();
+
+ for (String host : tasks.values())
+ sb.append(host).append(DEFAULT_PORT).append(DELIM);
+
+ return sb.substring(0, sb.length() - 1);
+ }
+
+ /**
+ * Check slave resources and return resources infos.
+ *
+ * @param offer Offer request.
+ * @return Pair where first is cpus, second is memory.
+ */
+ private Tuple<Double, Double> checkOffer(Protos.Offer offer) {
+ double cpus = -1;
+ double mem = -1;
+
+ for (Protos.Resource resource : offer.getResourcesList()) {
+ if (resource.getName().equals(CPUS)) {
+ if (resource.getType().equals(Protos.Value.Type.SCALAR))
+ cpus = resource.getScalar().getValue();
+ else
+ log.debug("Cpus resource was not a scalar: " + resource.getType().toString());
+ }
+ else if (resource.getName().equals(MEM)) {
+ if (resource.getType().equals(Protos.Value.Type.SCALAR))
+ mem = resource.getScalar().getValue();
+ else
+ log.debug("Mem resource was not a scalar: " + resource.getType().toString());
+ }
+ else if (resource.getName().equals("disk"))
+ log.debug("Ignoring disk resources from offer");
+ }
+
+ if (cpus < 0)
+ log.debug("No cpus resource present");
+ if (mem < 0)
+ log.debug("No mem resource present");
+
+ if (cpus >= 1 && MIN_MEMORY <= mem)
+ return new Tuple<>(cpus, mem);
+ else {
+ log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
+ "\n" + offer.getAttributesList().toString() +
+ "\nRequested for slave:\n" +
+ " cpus: " + cpus + "\n" +
+ " mem: " + mem);
+
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
+ log.info("offerRescinded()");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
+ final String taskId = taskStatus.getTaskId().getValue();
+
+ log.info("statusUpdate() task {} is in state {}", taskId, taskStatus.getState());
+
+ switch (taskStatus.getState()) {
+ case TASK_FAILED:
+ case TASK_FINISHED:
+ tasks.remove(taskId);
+ break;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
+ Protos.SlaveID slaveID, byte[] bytes) {
+ log.info("frameworkMessage()");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void disconnected(SchedulerDriver schedulerDriver) {
+ log.info("disconnected()");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
+ log.info("slaveLost()");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
+ Protos.SlaveID slaveID, int i) {
+ log.info("executorLost()");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void error(SchedulerDriver schedulerDriver, String s) {
+ log.error("error() {}", s);
+ }
+
+ /**
+ * Tuple.
+ */
+ public static class Tuple<A, B> {
+ /** */
+ private final A val1;
+
+ /** */
+ private final B val2;
+
+ /**
+ *
+ */
+ public Tuple(A val1, B val2) {
+ this.val1 = val1;
+ this.val2 = val2;
+ }
+
+ /**
+ * @return val1
+ */
+ public A get1() {
+ return val1;
+ }
+
+ /**
+ * @return val2
+ */
+ public B get2() {
+ return val2;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java
new file mode 100644
index 0000000..49ddf86
--- /dev/null
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Messo Framework.
+ */
+package org.apache.ignite.mesos;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java
deleted file mode 100644
index b11e7c6..0000000
--- a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.messo;
-
-import org.apache.mesos.*;
-
-/**
- * TODO
- */
-public class IgniteAmazonScheduler extends IgniteScheduler {
- /** */
- public static final String AMAZON = "amazon";
-
- /** Amazon credential. */
- private final String accessKey, secretKey;
-
- /**
- * Constructor.
- *
- * @param accessKey Access key.
- * @param secretKey Secret key.
- */
- public IgniteAmazonScheduler(String accessKey, String secretKey) {
- assert accessKey != null;
- assert secretKey != null;
-
- this.accessKey = accessKey;
- this.secretKey = secretKey;
- }
-
- /** {@inheritDoc} */
- @Override protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem,
- Protos.TaskID taskId) {
- // Docker image info.
- Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder()
- .setImage(IMAGE)
- .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST);
-
- // Container info.
- Protos.ContainerInfo.Builder cont = Protos.ContainerInfo.newBuilder();
- cont.setType(Protos.ContainerInfo.Type.DOCKER);
- cont.setDocker(docker.build());
-
- return Protos.TaskInfo.newBuilder()
- .setName("task " + taskId.getValue())
- .setTaskId(taskId)
- .setSlaveId(offer.getSlaveId())
- .addResources(Protos.Resource.newBuilder()
- .setName(CPUS)
- .setType(Protos.Value.Type.SCALAR)
- .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get1())))
- .addResources(Protos.Resource.newBuilder()
- .setName(MEM)
- .setType(Protos.Value.Type.SCALAR)
- .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2())))
- .setContainer(cont)
- .setCommand(Protos.CommandInfo.newBuilder()
- .setShell(false)
- .addArguments(STARTUP_SCRIPT)
- .addArguments(String.valueOf(cpuMem.get2().intValue()))
- .addArguments(AMAZON)
- .addArguments(accessKey)
- .addArguments(secretKey))
- .build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java
deleted file mode 100644
index dfc3eb2..0000000
--- a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.messo;
-
-import com.google.protobuf.*;
-import org.apache.mesos.*;
-
-/**
- * TODO
- */
-public class IgniteFramework {
- /**
- * @param args Args
- */
- public static void main(String[] args) {
- checkArgs(args);
-
- final int frameworkFailoverTimeout = 0;
-
- Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder()
- .setName("IgniteFramework")
- .setUser("") // Have Mesos fill in the current user.
- .setFailoverTimeout(frameworkFailoverTimeout); // timeout in seconds
-
- if (System.getenv("MESOS_CHECKPOINT") != null) {
- System.out.println("Enabling checkpoint for the framework");
- frameworkBuilder.setCheckpoint(true);
- }
-
- // create the scheduler
- final Scheduler scheduler = createIgniteScheduler(args);
-
- // create the driver
- MesosSchedulerDriver driver;
- if (System.getenv("MESOS_AUTHENTICATE") != null) {
- System.out.println("Enabling authentication for the framework");
-
- if (System.getenv("DEFAULT_PRINCIPAL") == null) {
- System.err.println("Expecting authentication principal in the environment");
- System.exit(1);
- }
-
- if (System.getenv("DEFAULT_SECRET") == null) {
- System.err.println("Expecting authentication secret in the environment");
- System.exit(1);
- }
-
- Protos.Credential credential = Protos.Credential.newBuilder()
- .setPrincipal(System.getenv("DEFAULT_PRINCIPAL"))
- .setSecret(ByteString.copyFrom(System.getenv("DEFAULT_SECRET").getBytes()))
- .build();
-
- frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL"));
-
- driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], credential);
- }
- else {
- frameworkBuilder.setPrincipal("ignite-framework-java");
-
- driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]);
- }
-
- int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
-
- // Ensure that the driver process terminates.
- driver.stop();
-
- System.exit(status);
- }
-
- /**
- * @param args Arguments.
- * @return Ignite scheduler.
- */
- private static IgniteScheduler createIgniteScheduler(String args[]) {
- if (args.length >= 3 && args[1].equals(IgniteAmazonScheduler.AMAZON))
- return new IgniteAmazonScheduler(args[2], args[3]);
- else
- return new IgniteScheduler();
- }
-
- /**
- * Check input arguments.
- *
- * @param args Arguments.
- */
- private static void checkArgs(String[] args) {
- if (args.length == 0)
- throw new IllegalArgumentException("Illegal arguments.");
-
- // TODO: add more
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java
deleted file mode 100644
index c8b577f..0000000
--- a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.messo;
-
-import org.apache.mesos.*;
-import org.slf4j.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * TODO
- */
-public class IgniteScheduler implements Scheduler {
- /** Docker image name. */
- public static final String IMAGE = "apacheignite/ignite-docker";
-
- /** Startup sctipt path. */
- public static final String STARTUP_SCRIPT = "/home/ignite/startup.sh";
-
- /** Cpus. */
- public static final String CPUS = "cpus";
-
- /** Mem. */
- public static final String MEM = "mem";
-
- /** ID generator. */
- private AtomicInteger taskIdGenerator = new AtomicInteger();
-
- /** Logger. */
- private static final Logger log = LoggerFactory.getLogger(IgniteScheduler.class);
-
- /** Min of memory required. */
- public static final int MIN_MEMORY = 256;
-
- /** {@inheritDoc} */
- @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID,
- Protos.MasterInfo masterInfo) {
- log.info("registered() master={}:{}, framework={}", masterInfo.getIp(), masterInfo.getPort(), frameworkID);
- }
-
- /** {@inheritDoc} */
- @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
- log.info("reregistered");
- }
-
- /** {@inheritDoc} */
- @Override public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) {
- log.info("resourceOffers() with {} offers", offers.size());
-
- for (Protos.Offer offer : offers) {
- Tuple<Double, Double> cpuMem = checkOffer(offer);
-
- // Decline offer which doesn't match by mem or cpu.
- if (cpuMem == null) {
- schedulerDriver.declineOffer(offer.getId());
-
- continue;
- }
-
- // Generate a unique task ID.
- Protos.TaskID taskId = Protos.TaskID.newBuilder()
- .setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build();
-
- log.info("Launching task {}", taskId.getValue());
-
- // Create task to run.
- Protos.TaskInfo task = createTask(offer, cpuMem, taskId);
-
- schedulerDriver.launchTasks(Collections.singletonList(offer.getId()),
- Collections.singletonList(task),
- Protos.Filters.newBuilder().setRefuseSeconds(1).build());
- }
- }
-
- /**
- * Create Task.
- * @param offer Offer.
- * @param cpuMem Cpu and mem on slave.
- * @param taskId Task id.
- * @return Task.
- */
- protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem, Protos.TaskID taskId) {
- // Docker image info.
- Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder()
- .setImage(IMAGE)
- .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST);
-
- // Container info.
- Protos.ContainerInfo.Builder cont = Protos.ContainerInfo.newBuilder();
- cont.setType(Protos.ContainerInfo.Type.DOCKER);
- cont.setDocker(docker.build());
-
- return Protos.TaskInfo.newBuilder()
- .setName("task " + taskId.getValue())
- .setTaskId(taskId)
- .setSlaveId(offer.getSlaveId())
- .addResources(Protos.Resource.newBuilder()
- .setName(CPUS)
- .setType(Protos.Value.Type.SCALAR)
- .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2())))
- .addResources(Protos.Resource.newBuilder()
- .setName(MEM)
- .setType(Protos.Value.Type.SCALAR)
- .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2())))
- .setContainer(cont)
- .setCommand(Protos.CommandInfo.newBuilder()
- .setShell(false)
- .addArguments(STARTUP_SCRIPT)
- .addArguments(String.valueOf(cpuMem.get2().intValue())))
- .build();
- }
-
- /**
- * Check slave resources and return resources infos.
- *
- * @param offer Offer request.
- * @return Pair where first is cpus, second is memory.
- */
- private Tuple<Double, Double> checkOffer(Protos.Offer offer) {
- double cpus = -1;
- double mem = -1;
-
- for (Protos.Resource resource : offer.getResourcesList()) {
- if (resource.getName().equals(CPUS)) {
- if (resource.getType().equals(Protos.Value.Type.SCALAR))
- cpus = resource.getScalar().getValue();
- else
- log.debug("Cpus resource was not a scalar: " + resource.getType().toString());
- }
- else if (resource.getName().equals(MEM)) {
- if (resource.getType().equals(Protos.Value.Type.SCALAR))
- mem = resource.getScalar().getValue();
- else
- log.debug("Mem resource was not a scalar: " + resource.getType().toString());
- }
- else if (resource.getName().equals("disk"))
- log.debug("Ignoring disk resources from offer");
- }
-
- if (cpus < 0)
- log.debug("No cpus resource present");
- if (mem < 0)
- log.debug("No mem resource present");
-
- if (cpus >= 1 && MIN_MEMORY <= mem)
- return new Tuple<>(cpus, mem);
- else {
- log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() +
- "\n" + offer.getAttributesList().toString() +
- "\nRequested for slave:\n" +
- " cpus: " + cpus + "\n" +
- " mem: " + mem);
-
- return null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
- log.info("offerRescinded()");
- }
-
- /** {@inheritDoc} */
- @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
- log.info("statusUpdate() task {} ", taskStatus);
- }
-
- /** {@inheritDoc} */
- @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
- Protos.SlaveID slaveID, byte[] bytes) {
- log.info("frameworkMessage()");
- }
-
- /** {@inheritDoc} */
- @Override public void disconnected(SchedulerDriver schedulerDriver) {
- log.info("disconnected()");
- }
-
- /** {@inheritDoc} */
- @Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
- log.info("slaveLost()");
- }
-
- /** {@inheritDoc} */
- @Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,
- Protos.SlaveID slaveID, int i) {
- log.info("executorLost()");
- }
-
- /** {@inheritDoc} */
- @Override public void error(SchedulerDriver schedulerDriver, String s) {
- log.error("error() {}", s);
- }
-
- /**
- * Tuple.
- */
- public static class Tuple<A, B> {
- /** */
- private final A val1;
-
- /** */
- private final B val2;
-
- /**
- *
- */
- public Tuple(A val1, B val2) {
- this.val1 = val1;
- this.val2 = val2;
- }
-
- /**
- * @return val1
- */
- public A get1() {
- return val1;
- }
-
- /**
- * @return val2
- */
- public B get2() {
- return val2;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java b/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java
deleted file mode 100644
index c48ca38..0000000
--- a/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Messo Framework.
- */
-package org.apache.ignite.messo;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
new file mode 100644
index 0000000..f1bcb90
--- /dev/null
+++ b/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import junit.framework.*;
+import org.apache.ignite.mesos.*;
+
+/**
+ * Apache Mesos integration tests.
+ */
+public class IgniteMesosTestSuite extends TestSuite {
+ /**
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Apache Mesos Integration Test Suite");
+
+ suite.addTest(new TestSuite(IgniteSchedulerSelfTest.class));
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
new file mode 100644
index 0000000..5534b2c
--- /dev/null
+++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos;
+
+import junit.framework.*;
+import org.apache.mesos.*;
+
+import java.util.*;
+
+/**
+ * Scheduler tests.
+ */
+public class IgniteSchedulerSelfTest extends TestCase {
+ /** */
+ private IgniteScheduler scheduler;
+
+ /** {@inheritDoc} */
+ @Override public void setUp() throws Exception {
+ super.setUp();
+
+ scheduler = new IgniteScheduler();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testHostRegister() throws Exception {
+ //Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+ //scheduler.resourceOffers(DriverStub.INSTANCE, Lists.);
+ }
+
+ private Protos.Offer createOffer(String hostname, double cpu, double mem) {
+ return Protos.Offer.newBuilder()
+ .setSlaveId(Protos.SlaveID.newBuilder().setValue("1").build())
+ .setHostname(hostname)
+ .addResources(Protos.Resource.newBuilder()
+ .setName(IgniteScheduler.CPUS)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
+ .build())
+ .addResources(Protos.Resource.newBuilder()
+ .setName(IgniteScheduler.MEM)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem).build())
+ .build())
+ .build();
+ }
+
+ /**
+ * No-op implementation.
+ */
+ public static class DriverStub implements SchedulerDriver {
+ private static final DriverStub INSTANCE = new DriverStub();
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status start() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status stop(boolean failover) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status stop() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status abort() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status join() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status run() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status requestResources(Collection<Protos.Request> requests) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
+ Collection<Protos.TaskInfo> tasks, Protos.Filters filters) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
+ Collection<Protos.TaskInfo> tasks) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks,
+ Protos.Filters filters) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status killTask(Protos.TaskID taskId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status declineOffer(Protos.OfferID offerId, Protos.Filters filters) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status declineOffer(Protos.OfferID offerId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status reviveOffers() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status acknowledgeStatusUpdate(Protos.TaskStatus status) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status sendFrameworkMessage(Protos.ExecutorID executorId, Protos.SlaveID slaveId,
+ byte[] data) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Protos.Status reconcileTasks(Collection<Protos.TaskStatus> statuses) {
+ return null;
+ }
+
+
+ }
+}
\ No newline at end of file