You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/16 09:15:32 UTC
[06/50] incubator-ignite git commit: #YARN WIP
#YARN WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/858d2a3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/858d2a3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/858d2a3f
Branch: refs/heads/master
Commit: 858d2a3f757fea2b88ffcb907e0f221699e32420
Parents: 12d9c02
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Mon Jun 8 20:03:31 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Mon Jun 8 20:03:31 2015 +0300
----------------------------------------------------------------------
modules/yarn/README.txt | 8 +-
.../apache/ignite/yarn/ApplicationMaster.java | 187 +++++++++++++------
.../apache/ignite/yarn/ClusterProperties.java | 145 ++++----------
.../org/apache/ignite/yarn/IgniteContainer.java | 33 +++-
.../org/apache/ignite/yarn/IgniteProvider.java | 4 +-
.../apache/ignite/yarn/IgniteYarnClient.java | 13 +-
.../org/apache/ignite/yarn/package-info.java | 2 +-
.../ignite/yarn/utils/IgniteYarnUtils.java | 4 +-
8 files changed, 207 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/README.txt
----------------------------------------------------------------------
diff --git a/modules/yarn/README.txt b/modules/yarn/README.txt
index 75a62f8..5cdd4a2 100644
--- a/modules/yarn/README.txt
+++ b/modules/yarn/README.txt
@@ -1,9 +1,9 @@
-Apache Ignite Mesos Module
+Apache Ignite Yarn Module
------------------------
-Apache Ignite Mesos module provides integration Apache Ignite with Apache Mesos.
+Apache Ignite Yarn module provides integration of Apache Ignite with Apache Hadoop Yarn.
-Importing Apache Ignite Mesos Module In Maven Project
+Importing Apache Ignite Yarn Module In Maven Project
-------------------------------------
If you are using Maven to manage dependencies of your project, you can add Cloud module
@@ -19,7 +19,7 @@ interested in):
...
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-mesos</artifactId>
+ <artifactId>ignite-yarn</artifactId>
<version>${ignite.version}</version>
</dependency>
...
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index fe065a3..3bf0521 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -19,7 +19,7 @@ package org.apache.ignite.yarn;
import org.apache.commons.io.*;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.client.api.async.*;
@@ -30,11 +30,16 @@ import org.apache.ignite.yarn.utils.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
- * TODO
+ * Application master requests containers from Yarn and decides how many resources will be occupied.
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
+ /** */
+ public static final Logger log = Logger.getLogger(ApplicationMaster.class.getSimpleName());
+
/** Default port range. */
public static final String DEFAULT_PORT = ":47500..47510";
@@ -51,6 +56,9 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
private NMClient nmClient;
/** */
+ AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
+
+ /** */
private Path ignitePath;
/** */
@@ -60,7 +68,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
private FileSystem fs;
/** */
- private Map<String, IgniteContainer> containers = new HashMap<>();
+ private Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>();
/**
* Constructor.
@@ -79,47 +87,81 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
/** {@inheritDoc} */
public synchronized void onContainersAllocated(List<Container> conts) {
- for (Container container : conts) {
- try {
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ for (Container c : conts) {
+ if (checkContainer(c)) {
+ try {
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- Map<String, String> env = new HashMap<>(System.getenv());
+ Map<String, String> env = new HashMap<>(System.getenv());
- env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(container.getNodeId().getHost()));
+ //env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
- ctx.setEnvironment(env);
+ ctx.setEnvironment(env);
- Map<String, LocalResource> resources = new HashMap<>();
+ Map<String, LocalResource> resources = new HashMap<>();
- resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
- resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
+ resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
+ resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
- ctx.setLocalResources(resources);
+ ctx.setLocalResources(resources);
- ctx.setCommands(
- Collections.singletonList(
- "./ignite/*/bin/ignite.sh "
- + "./ignite-config.xml"
- + " -J-Xmx" + container.getResource().getMemory() + "m"
- + " -J-Xms" + container.getResource().getMemory() + "m"
- + IgniteYarnUtils.YARN_LOG_OUT
- ));
+ ctx.setCommands(
+ Collections.singletonList(
+ "./ignite/*/bin/ignite.sh "
+ + "./ignite-config.xml"
+ + " -J-Xmx" + c.getResource().getMemory() + "m"
+ + " -J-Xms" + c.getResource().getMemory() + "m"
+ + IgniteYarnUtils.YARN_LOG_OUT
+ ));
- System.out.println("[AM] Launching container " + container.getId());
+ log.log(Level.INFO, "Launching container: {0}.", c.getId());
- nmClient.startContainer(container, ctx);
+ nmClient.startContainer(c, ctx);
- containers.put(container.getNodeId().getHost(),
- new IgniteContainer(container.getNodeId().getHost(), container.getResource().getVirtualCores(),
- container.getResource().getMemory()));
- }
- catch (Exception ex) {
- System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
+ containers.put(c.getId(),
+ new IgniteContainer(
+ c.getId(),
+ c.getNodeId(),
+ c.getResource().getVirtualCores(),
+ c.getResource().getMemory()));
+ }
+ catch (Exception ex) {
+ System.err.println("[AM] Error launching container " + c.getId() + " " + ex);
+ }
}
+ else
+ rmClient.releaseAssignedContainer(c.getId());
}
}
/**
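+ * Checks whether an allocated container may be used: node count limit, hostname constraint, min resources.
+ *
+ * @param cont Container.
+ * @return {@code True} if the container passed all checks, {@code false} if it should be rejected.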
+ */
+ private boolean checkContainer(Container cont) {
+ // Check limit on running nodes.
+ if (props.instances() <= containers.size())
+ return false;
+
+ // Check host name
+ if (props.hostnameConstraint() != null
+ && props.hostnameConstraint().matcher(cont.getNodeId().getHost()).matches())
+ return false;
+
+ // Check that container satisfies min requirements.
+ if (cont.getResource().getVirtualCores() < props.cpusPerNode()
+ || cont.getResource().getMemory() < props.memoryPerNode()) {
+ //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @return Address running nodes.
*/
private String getAddress(String address) {
@@ -133,7 +175,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
StringBuilder sb = new StringBuilder();
for (IgniteContainer cont : containers.values())
- sb.append(cont.host()).append(DEFAULT_PORT).append(DELIM);
+ sb.append(cont.nodeId.getHost()).append(DEFAULT_PORT).append(DELIM);
return sb.substring(0, sb.length() - 1);
}
@@ -141,13 +183,30 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
/** {@inheritDoc} */
public synchronized void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
- synchronized (this) {
- }
+ containers.remove(status.getContainerId());
+
+ //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
}
}
/** {@inheritDoc} */
- public void onNodesUpdated(List<NodeReport> updated) {
+ public synchronized void onNodesUpdated(List<NodeReport> updated) {
+ for (NodeReport node : updated) {
+ // If node unusable.
+ if (node.getNodeState().isUnusable()) {
+ for (IgniteContainer cont : containers.values()) {
+ if (cont.nodeId().equals(node.getNodeId())) {
+ containers.remove(cont.id());
+
+ log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.",
+ new Object[]{node.getNodeId().getHost(), node.getNodeState()});
+ }
+ }
+
+ log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.",
+ new Object[]{node.getNodeId().getHost(), node.getNodeState()});
+ }
+ }
}
/** {@inheritDoc} */
@@ -169,7 +228,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws Exception If failed.
*/
public static void main(String[] args) throws Exception {
- ClusterProperties props = ClusterProperties.from(null);
+ ClusterProperties props = ClusterProperties.from();
ApplicationMaster master = new ApplicationMaster(args[0], props);
@@ -184,60 +243,72 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws Exception If failed.
*/
public void run() throws Exception {
- // Create asyn application master.
- AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
-
rmClient.init(conf);
rmClient.start();
// Register with ResourceManager
rmClient.registerApplicationMaster("", 0, "");
- System.out.println("[AM] registerApplicationMaster 1");
+ log.log(Level.INFO, "Application master registered.");
// Priority for worker containers - priorities are intra-application
Priority priority = Records.newRecord(Priority.class);
priority.setPriority(0);
- // Check ignite cluster.
- while (!nmClient.isInState(Service.STATE.STOPPED)) {
- Resource availableRes = rmClient.getAvailableResources();
+ try {
+ // Check ignite cluster.
+ while (!nmClient.isInState(Service.STATE.STOPPED)) {
+ int runningCnt = containers.size();
- if (containers.size() < props.instances() || availableRes.getMemory() >= props.cpusPerNode()
- || availableRes.getVirtualCores() >= props.cpus()) {
- // Resource requirements for worker containers
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(1024);
- capability.setVirtualCores(2);
+ if (runningCnt < props.instances() && checkAvailableResource(rmClient.getAvailableResources())) {
+ // Resource requirements for worker containers.
+ Resource capability = Records.newRecord(Resource.class);
- for (int i = 0; i < 1; ++i) {
- // Make container requests to ResourceManager
- AMRMClient.ContainerRequest containerAsk =
+ capability.setMemory((int)props.memoryPerNode());
+ capability.setVirtualCores((int)props.cpusPerNode());
+
+ for (int i = 0; i < props.instances() - runningCnt; ++i) {
+ // Make container requests to ResourceManager
+ AMRMClient.ContainerRequest containerAsk =
new AMRMClient.ContainerRequest(capability, null, null, priority);
- System.out.println("[AM] Making res-req " + i);
+ rmClient.addContainerRequest(containerAsk);
- rmClient.addContainerRequest(containerAsk);
+ log.log(Level.INFO, "Making request. Memory: {0}, cpu {1}.",
+ new Object[]{props.memoryPerNode(), props.cpusPerNode()});
+ }
}
- }
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.SECONDS.sleep(5);
+ }
}
+ catch (Exception e) {
+ // Un-register with ResourceManager
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", "");
- System.out.println("[AM] waiting for containers to finish");
-
- System.out.println("[AM] unregisterApplicationMaster 0");
+ System.exit(1);
+ }
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
+ }
- System.out.println("[AM] unregisterApplicationMaster 1");
+ /**
+ * @param availableRes Available resources.
+ * @return {@code True} if cluster contains available resources.
+ */
+ private boolean checkAvailableResource(Resource availableRes) {
+ return availableRes == null || availableRes.getMemory() >= props.memoryPerNode()
+ && availableRes.getVirtualCores() >= props.cpusPerNode();
}
/**
* @throws IOException
*/
public void init() throws IOException {
+ // Create async application master.
+ rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
+
if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) {
InputStream input = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index adddd51..f9fdb59 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -41,28 +41,16 @@ public class ClusterProperties {
/** */
public static final String DEFAULT_CLUSTER_NAME = "ignite-cluster";
- /** Mesos master url. */
+ /** Cluster name. */
private String clusterName = DEFAULT_CLUSTER_NAME;
/** */
- public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU";
-
- /** CPU limit. */
- private double cpu = UNLIMITED;
-
- /** */
public static final String IGNITE_RUN_CPU_PER_NODE = "IGNITE_RUN_CPU_PER_NODE";
/** CPU limit. */
private double cpuPerNode = UNLIMITED;
/** */
- public static final String IGNITE_TOTAL_MEMORY = "IGNITE_TOTAL_MEMORY";
-
- /** Memory limit. */
- private double mem = UNLIMITED;
-
- /** */
public static final String IGNITE_MEMORY_PER_NODE = "IGNITE_MEMORY_PER_NODE";
/** Memory limit. */
@@ -72,25 +60,7 @@ public class ClusterProperties {
public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
/** Node count limit. */
- private double nodeCnt = UNLIMITED;
-
- /** */
- public static final String IGNITE_MIN_CPU_PER_NODE = "IGNITE_MIN_CPU_PER_NODE";
-
- /** */
- public static final double DEFAULT_RESOURCE_MIN_CPU = 1;
-
- /** Min memory per node. */
- private double minCpu = DEFAULT_RESOURCE_MIN_CPU;
-
- /** */
- public static final String IGNITE_MIN_MEMORY_PER_NODE = "IGNITE_MIN_MEMORY_PER_NODE";
-
- /** */
- public static final double DEFAULT_RESOURCE_MIN_MEM = 256;
-
- /** Min memory per node. */
- private double minMemory = DEFAULT_RESOURCE_MIN_MEM;
+ private double nodeCnt = 3;
/** */
public static final String IGNITE_VERSION = "IGNITE_VERSION";
@@ -170,19 +140,6 @@ public class ClusterProperties {
return clusterName;
}
- /**
- * @return CPU count limit.
- */
- public double cpus() {
- return cpu;
- }
-
- /**
- * Sets CPU count limit.
- */
- public void cpus(double cpu) {
- this.cpu = cpu;
- }
/**
* @return CPU count limit.
@@ -201,22 +158,6 @@ public class ClusterProperties {
/**
* @return mem limit.
*/
- public double memory() {
- return mem;
- }
-
- /**
- * Sets mem limit.
- *
- * @param mem Memory.
- */
- public void memory(double mem) {
- this.mem = mem;
- }
-
- /**
- * @return mem limit.
- */
public double memoryPerNode() {
return memPerNode;
}
@@ -238,22 +179,6 @@ public class ClusterProperties {
}
/**
- * @return min memory per node.
- */
- public double minMemoryPerNode() {
- return minMemory;
- }
-
- /**
- * Sets min memory.
- *
- * @param minMemory Min memory.
- */
- public void minMemoryPerNode(double minMemory) {
- this.minMemory = minMemory;
- }
-
- /**
* Sets hostname constraint.
*
* @param pattern Hostname pattern.
@@ -263,22 +188,6 @@ public class ClusterProperties {
}
/**
- * @return min cpu count per node.
- */
- public double minCpuPerNode() {
- return minCpu;
- }
-
- /**
- * Sets min cpu count per node.
- *
- * @param minCpu min cpu count per node.
- */
- public void minCpuPerNode(double minCpu) {
- this.minCpu = minCpu;
- }
-
- /**
* @return Ignite version.
*/
public String igniteVer() {
@@ -362,13 +271,9 @@ public class ClusterProperties {
prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
- prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED);
- prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED);
- prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED);
- prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED);
- prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED);
- prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU);
- prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM);
+ prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, 1.0);
+ prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, 2048.0);
+ prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, 2.0);
prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
@@ -394,6 +299,40 @@ public class ClusterProperties {
}
/**
+ * @return Cluster configuration.
+ */
+ public static ClusterProperties from() {
+ ClusterProperties prop = new ClusterProperties();
+
+ prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME);
+
+ prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, null, null);
+ prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, null, null);
+
+ prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, 1.0);
+ prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, 2048.0);
+ prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, 2.0);
+
+ prop.igniteVer = getStringProperty(IGNITE_VERSION, null, DEFAULT_IGNITE_VERSION);
+ prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null);
+ prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null);
+
+ String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, null, null);
+
+ if (pattern != null) {
+ try {
+ prop.hostnameConstraint = Pattern.compile(pattern);
+ }
+ catch (PatternSyntaxException e) {
+ log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
+ }
+ }
+
+ return prop;
+ }
+
+ /**
* Convert to properties to map.
*
* @return Key-value map.
@@ -406,13 +345,9 @@ public class ClusterProperties {
envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl));
envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl));
- envs.put(IGNITE_TOTAL_CPU, toEnvVal(cpu));
envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode));
- envs.put(IGNITE_TOTAL_MEMORY, toEnvVal(mem));
envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode));
envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt));
- envs.put(IGNITE_MIN_CPU_PER_NODE, toEnvVal(minCpu));
- envs.put(IGNITE_MIN_MEMORY_PER_NODE, toEnvVal(minMemory));
envs.put(IGNITE_VERSION, toEnvVal(igniteVer));
envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir));
@@ -461,7 +396,7 @@ public class ClusterProperties {
/**
* @param val Value.
- * @return If val is null {@link EMPTY_STRING} else to string.
+ * @return If val is null {@code EMPTY_STRING} else to string.
*/
private String toEnvVal(Object val) {
return val == null ? EMPTY_STRING : val.toString();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
index 4e3c285..a8b0342 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
@@ -17,12 +17,17 @@
package org.apache.ignite.yarn;
+import org.apache.hadoop.yarn.api.records.*;
+
/**
* Information about launched task.
*/
public class IgniteContainer {
/** */
- public final String host;
+ public final ContainerId id;
+
+ /** */
+ public final NodeId nodeId;
/** */
public final double cpuCores;
@@ -33,21 +38,29 @@ public class IgniteContainer {
/**
* Ignite launched task.
*
- * @param host Host.
+ * @param nodeId Node id.
* @param cpuCores Cpu cores count.
* @param mem Memory
*/
- public IgniteContainer(String host, double cpuCores, double mem) {
- this.host = host;
+ public IgniteContainer(ContainerId id, NodeId nodeId, double cpuCores, double mem) {
+ this.id = id;
+ this.nodeId = nodeId;
this.cpuCores = cpuCores;
this.mem = mem;
}
/**
+ * @return Id.
+ */
+ public ContainerId id() {
+ return id;
+ }
+
+ /**
* @return Host.
*/
- public String host() {
- return host;
+ public NodeId nodeId() {
+ return nodeId;
}
/**
@@ -64,10 +77,12 @@ public class IgniteContainer {
return mem;
}
- @Override
- public String toString() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override public String toString() {
return "IgniteTask " +
- "host: [" + host + ']' +
+ "host: [" + nodeId.getHost() + ']' +
", cpuCores: [" + cpuCores + "]" +
", mem: [" + mem + "]";
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
index c6e07cb..1ac2974 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
@@ -18,7 +18,7 @@
package org.apache.ignite.yarn;
import org.apache.hadoop.fs.*;
-import org.apache.ignite.yarn.utils.IgniteYarnUtils;
+import org.apache.ignite.yarn.utils.*;
import java.io.*;
import java.net.*;
@@ -26,7 +26,7 @@ import java.nio.channels.*;
import java.util.*;
/**
- * Class downloads and stores Ignite.
+ * Downloads and stores Ignite.
*/
public class IgniteProvider {
/** */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index 0ab9e91..f74890d 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -26,6 +26,7 @@ import org.apache.ignite.yarn.utils.*;
import java.io.*;
import java.util.*;
+import java.util.concurrent.*;
import java.util.logging.*;
import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
@@ -69,9 +70,6 @@ public class IgniteYarnClient {
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- System.out.println(Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
- + IgniteYarnUtils.SPACE + ignite.toUri());
-
amContainer.setCommands(
Collections.singletonList(
Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
@@ -106,16 +104,18 @@ public class IgniteYarnClient {
// Submit application
ApplicationId appId = appContext.getApplicationId();
- System.out.println("Submitting application " + appId);
+
yarnClient.submitApplication(appContext);
+ log.log(Level.INFO, "Submitted application. Application id: [{0}]", appId);
+
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
while (appState != YarnApplicationState.FINISHED &&
appState != YarnApplicationState.KILLED &&
appState != YarnApplicationState.FAILED) {
- Thread.sleep(100);
+ TimeUnit.SECONDS.sleep(1L);
appReport = yarnClient.getApplicationReport(appId);
@@ -124,8 +124,7 @@ public class IgniteYarnClient {
yarnClient.killApplication(appId);
- System.out.println("Application " + appId + " finished with state " + appState + " at "
- + appReport.getFinishTime());
+ log.log(Level.INFO, "Application [{0}] finished with state [{1}]", new Object[]{appId, appState});
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
index c47f1e8..6734307 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
@@ -17,6 +17,6 @@
/**
* <!-- Package description. -->
- * Contains classes to support integration with Apache Mesos.
+ * Contains classes to support integration with Apache Hadoop Yarn.
*/
package org.apache.ignite.yarn;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
index 1e6c414..3b62411 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
@@ -21,12 +21,10 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.util.*;
-import java.io.IOException;
-
import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
/**
- *
+ * Utils.
*/
public class IgniteYarnUtils {
/** */