You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/09 17:25:02 UTC
[04/19] incubator-ignite git commit: #YARN WIP
#YARN WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/85f4a891
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/85f4a891
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/85f4a891
Branch: refs/heads/ignite-1.3
Commit: 85f4a8915fec6e9332a492728c4a37e639916f3f
Parents: 81cde9b
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Fri Jun 5 18:32:28 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Fri Jun 5 18:32:28 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 204 +++++++----
.../apache/ignite/yarn/ClusterProperties.java | 196 ++++------
.../org/apache/ignite/yarn/IgniteContainer.java | 74 ++++
.../org/apache/ignite/yarn/IgniteProvider.java | 362 +++++++++++++++++++
.../java/org/apache/ignite/yarn/IgniteTask.java | 86 -----
.../apache/ignite/yarn/IgniteYarnClient.java | 128 ++++---
.../ignite/yarn/utils/IgniteYarnUtils.java | 83 +++++
.../main/resources/ignite-default-config.xml | 33 ++
8 files changed, 838 insertions(+), 328 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 532830c..95197b7 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -17,86 +17,130 @@
package org.apache.ignite.yarn;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.*;
-import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.commons.io.*;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.client.api.async.*;
import org.apache.hadoop.yarn.conf.*;
import org.apache.hadoop.yarn.util.*;
+import org.apache.ignite.yarn.utils.*;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* TODO
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
- YarnConfiguration configuration;
- NMClient nmClient;
- int numContainersToWaitFor = 1;
+ /** Default port range. */
+ public static final String DEFAULT_PORT = ":47500..47510";
- public ApplicationMaster() {
- configuration = new YarnConfiguration();
+ /** Delimiter char. */
+ public static final String DELIM = ",";
+
+ /** */
+ private YarnConfiguration conf;
+
+ /** */
+ private ClusterProperties props;
+
+ /** */
+ private NMClient nmClient;
+
+ /** */
+ private Path ignitePath;
+
+ /** */
+ private Path cfgPath;
+
+ /** */
+ private FileSystem fs;
+
+ /** */
+ private Map<String, IgniteContainer> containers = new HashMap<>();
+
+ /**
+ * Constructor.
+ */
+ public ApplicationMaster(String ignitePath, ClusterProperties props) throws Exception {
+ this.conf = new YarnConfiguration();
+ this.props = props;
+ this.fs = FileSystem.get(conf);
+ this.ignitePath = new Path(ignitePath);
nmClient = NMClient.createNMClient();
- nmClient.init(configuration);
+
+ nmClient.init(conf);
nmClient.start();
}
/** {@inheritDoc} */
- public void onContainersAllocated(List<Container> containers) {
- for (Container container : containers) {
+ public synchronized void onContainersAllocated(List<Container> conts) {
+ for (Container container : conts) {
try {
- // Launch container by create ContainerLaunchContext
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- final LocalResource igniteZip = Records.newRecord(LocalResource.class);
- setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip,
- configuration);
+ Map<String, String> env = new HashMap<>(System.getenv());
+
+ env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(container.getNodeId().getHost()));
+
+ ctx.setEnvironment(env);
+
+ Map<String, LocalResource> resources = new HashMap<>();
+
+ resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
+ resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
+
+ ctx.setLocalResources(resources);
- ctx.setLocalResources(Collections.singletonMap("ignite", igniteZip));
ctx.setCommands(
- Lists.newArrayList(
- "$LOCAL_DIRS/ignite/*/bin/ignite.sh" +
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
- ));
+ Collections.singletonList(
+ "./ignite/*/bin/ignite.sh "
+ + "./ignite-config.xml"
+ + " -J-Xmx" + container.getResource().getMemory() + "m"
+ + " -J-Xms" + container.getResource().getMemory() + "m"
+ + IgniteYarnUtils.YARN_LOG_OUT
+ ));
+
System.out.println("[AM] Launching container " + container.getId());
+
nmClient.startContainer(container, ctx);
- } catch (Exception ex) {
+
+ containers.put(container.getNodeId().getHost(),
+ new IgniteContainer(container.getNodeId().getHost(), container.getResource().getVirtualCores(),
+ container.getResource().getMemory()));
+ }
+ catch (Exception ex) {
System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
}
}
}
- /** {@inheritDoc} */
- private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
- throws Exception {
- FileSystem fileSystem = FileSystem.get(conf);
- jarPath = fileSystem.makeQualified(jarPath);
+ /**
+ * @return Addresses of running nodes.
+ */
+ private String getAddress(String address) {
+ if (containers.isEmpty()) {
+ if (address != null && !address.isEmpty())
+ return address + DEFAULT_PORT;
+
+ return "";
+ }
- FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+ StringBuilder sb = new StringBuilder();
- appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- appMasterJar.setSize(jarStat.getLen());
- appMasterJar.setTimestamp(jarStat.getModificationTime());
- appMasterJar.setType(LocalResourceType.ARCHIVE);
- appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
+ for (IgniteContainer cont : containers.values())
+ sb.append(cont.host()).append(DEFAULT_PORT).append(DELIM);
- System.out.println("Path :" + jarPath);
+ return sb.substring(0, sb.length() - 1);
}
/** {@inheritDoc} */
- public void onContainersCompleted(List<ContainerStatus> statuses) {
+ public synchronized void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
- System.out.println("[AM] Completed container " + status.getContainerId());
synchronized (this) {
- numContainersToWaitFor--;
}
}
}
@@ -111,6 +155,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
/** {@inheritDoc} */
public void onError(Throwable t) {
+ nmClient.stop();
}
/** {@inheritDoc} */
@@ -118,28 +163,35 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
return 50;
}
- public boolean doneWithContainers() {
- return numContainersToWaitFor == 0;
- }
+ /**
+ * @param args Args.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ ClusterProperties props = ClusterProperties.from(null);
- public Configuration getConfiguration() {
- return configuration;
- }
+ ApplicationMaster master = new ApplicationMaster(args[0], props);
- public static void main(String[] args) throws Exception {
- ApplicationMaster master = new ApplicationMaster();
- master.runMainLoop();
+ master.init();
+
+ master.run();
}
- public void runMainLoop() throws Exception {
+ /**
+ * Runs application master.
+ *
+ * @throws Exception If failed.
+ */
+ public void run() throws Exception {
+ // Create asynchronous AM-RM client.
+ AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
- AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(100, this);
- rmClient.init(getConfiguration());
+ rmClient.init(conf);
rmClient.start();
// Register with ResourceManager
- System.out.println("[AM] registerApplicationMaster 0");
rmClient.registerApplicationMaster("", 0, "");
+
System.out.println("[AM] registerApplicationMaster 1");
// Priority for worker containers - priorities are intra-application
@@ -148,27 +200,51 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
// Resource requirements for worker containers
Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(128);
- capability.setVirtualCores(1);
+ capability.setMemory(1024);
+ capability.setVirtualCores(2);
// Make container requests to ResourceManager
- for (int i = 0; i < numContainersToWaitFor; ++i) {
- AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority);
+ for (int i = 0; i < 1; ++i) {
+ AMRMClient.ContainerRequest containerAsk =
+ new AMRMClient.ContainerRequest(capability, null, null, priority);
+
System.out.println("[AM] Making res-req " + i);
+
rmClient.addContainerRequest(containerAsk);
}
System.out.println("[AM] waiting for containers to finish");
- while (!doneWithContainers()) {
- Thread.sleep(100);
- }
-
+ TimeUnit.MINUTES.sleep(10);
System.out.println("[AM] unregisterApplicationMaster 0");
+
// Un-register with ResourceManager
- rmClient.unregisterApplicationMaster(
- FinalApplicationStatus.SUCCEEDED, "", "");
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
+
System.out.println("[AM] unregisterApplicationMaster 1");
}
+
+ /**
+ * @throws IOException If failed.
+ */
+ public void init() throws IOException {
+ if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) {
+ InputStream input = Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
+
+ cfgPath = new Path(props.igniteWorkDir() + File.separator + IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
+
+ // Create file. Override by default.
+ FSDataOutputStream outputStream = fs.create(cfgPath, true);
+
+ IOUtils.copy(input, outputStream);
+
+ IOUtils.closeQuietly(input);
+
+ IOUtils.closeQuietly(outputStream);
+ }
+ else
+ cfgPath = new Path(props.igniteConfigUrl());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index 0c6c26d..adddd51 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -18,7 +18,6 @@
package org.apache.ignite.yarn;
import java.io.*;
-import java.net.*;
import java.util.*;
import java.util.logging.*;
import java.util.regex.*;
@@ -30,17 +29,11 @@ public class ClusterProperties {
/** */
private static final Logger log = Logger.getLogger(ClusterProperties.class.getSimpleName());
- /** Unlimited. */
- public static final double UNLIMITED = Double.MAX_VALUE;
-
/** */
- public static final String MESOS_MASTER_URL = "MESOS_MASTER_URL";
+ public static final String EMPTY_STRING = "";
- /** */
- public static final String DEFAULT_MESOS_MASTER_URL = "zk://localhost:2181/mesos";
-
- /** Mesos master url. */
- private String mesosUrl = DEFAULT_MESOS_MASTER_URL;
+ /** Unlimited. */
+ public static final double UNLIMITED = Double.MAX_VALUE;
/** */
public static final String IGNITE_CLUSTER_NAME = "IGNITE_CLUSTER_NAME";
@@ -52,21 +45,6 @@ public class ClusterProperties {
private String clusterName = DEFAULT_CLUSTER_NAME;
/** */
- public static final String IGNITE_HTTP_SERVER_HOST = "IGNITE_HTTP_SERVER_HOST";
-
- /** Http server host. */
- private String httpServerHost = null;
-
- /** */
- public static final String IGNITE_HTTP_SERVER_PORT = "IGNITE_HTTP_SERVER_PORT";
-
- /** */
- public static final String DEFAULT_HTTP_SERVER_PORT = "48610";
-
- /** Http server host. */
- private int httpServerPort = Integer.valueOf(DEFAULT_HTTP_SERVER_PORT);
-
- /** */
public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU";
/** CPU limit. */
@@ -91,18 +69,6 @@ public class ClusterProperties {
private double memPerNode = UNLIMITED;
/** */
- public static final String IGNITE_TOTAL_DISK_SPACE = "IGNITE_TOTAL_DISK_SPACE";
-
- /** Disk space limit. */
- private double disk = UNLIMITED;
-
- /** */
- public static final String IGNITE_DISK_SPACE_PER_NODE = "IGNITE_DISK_SPACE_PER_NODE";
-
- /** Disk space limit. */
- private double diskPerNode = UNLIMITED;
-
- /** */
public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
/** Node count limit. */
@@ -136,19 +102,31 @@ public class ClusterProperties {
private String igniteVer = DEFAULT_IGNITE_VERSION;
/** */
- public static final String IGNITE_PACKAGE_URL = "IGNITE_PACKAGE_URL";
+ public static final String IGNITE_WORKING_DIR = "IGNITE_WORKING_DIR";
+
+ /** */
+ public static final String DEFAULT_IGNITE_WORK_DIR = "/ignite/workdir/";
- /** Ignite package url. */
- private String ignitePackageUrl = null;
+ /** Ignite work directory. */
+ private String igniteWorkDir = DEFAULT_IGNITE_WORK_DIR;
/** */
- public static final String IGNITE_WORK_DIR = "IGNITE_WORK_DIR";
+ public static final String IGNITE_LOCAL_WORK_DIR = "IGNITE_LOCAL_WORK_DIR";
/** */
- public static final String DEFAULT_IGNITE_WORK_DIR = "ignite-releases/";
+ public static final String DEFAULT_IGNITE_LOCAL_WORK_DIR = "./ignite-releases/";
- /** Ignite version. */
- private String igniteWorkDir = DEFAULT_IGNITE_WORK_DIR;
+ /** Ignite local work directory. */
+ private String igniteLocalWorkDir = DEFAULT_IGNITE_LOCAL_WORK_DIR;
+
+ /** */
+ public static final String IGNITE_RELEASES_DIR = "IGNITE_RELEASES_DIR";
+
+ /** */
+ public static final String DEFAULT_IGNITE_RELEASES_DIR = "/ignite/releases/";
+
+ /** Ignite releases directory. */
+ private String igniteReleasesDir = DEFAULT_IGNITE_RELEASES_DIR;
/** */
public static final String IGNITE_USERS_LIBS = "IGNITE_USERS_LIBS";
@@ -253,20 +231,6 @@ public class ClusterProperties {
}
/**
- * @return disk limit.
- */
- public double disk() {
- return disk;
- }
-
- /**
- * @return disk limit per node.
- */
- public double diskPerNode() {
- return diskPerNode;
- }
-
- /**
* @return instance count limit.
*/
public double instances() {
@@ -329,45 +293,31 @@ public class ClusterProperties {
}
/**
- * @return User's libs.
+ * @return Local working directory.
*/
- public String userLibs() {
- return userLibs;
+ public String igniteLocalWorkDir() {
+ return igniteLocalWorkDir;
}
/**
- * @return Ignite configuration.
+ * @return Ignite releases dir.
*/
- public String igniteCfg() {
- return igniteCfg;
+ public String igniteReleasesDir() {
+ return igniteReleasesDir;
}
/**
- * @return Master url.
- */
- public String masterUrl() {
- return mesosUrl;
- }
-
- /**
- * @return Http server host.
- */
- public String httpServerHost() {
- return httpServerHost;
- }
-
- /**
- * @return Http server port.
+ * @return User's libs.
*/
- public int httpServerPort() {
- return httpServerPort;
+ public String userLibs() {
+ return userLibs;
}
/**
- * @return Url to ignite package.
+ * @return Ignite configuration.
*/
- public String ignitePackageUrl() {
- return ignitePackageUrl;
+ public String igniteCfg() {
+ return igniteCfg;
}
/**
@@ -407,36 +357,21 @@ public class ClusterProperties {
ClusterProperties prop = new ClusterProperties();
- prop.mesosUrl = getStringProperty(MESOS_MASTER_URL, props, DEFAULT_MESOS_MASTER_URL);
-
- prop.httpServerHost = getStringProperty(IGNITE_HTTP_SERVER_HOST, props, getNonLoopbackAddress());
-
- String port = System.getProperty("PORT0");
-
- if (port != null && !port.isEmpty())
- prop.httpServerPort = Integer.valueOf(port);
- else
- prop.httpServerPort = Integer.valueOf(getStringProperty(IGNITE_HTTP_SERVER_PORT, props,
- DEFAULT_HTTP_SERVER_PORT));
-
prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
- prop.ignitePackageUrl = getStringProperty(IGNITE_PACKAGE_URL, props, null);
prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED);
prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED);
prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED);
prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED);
- prop.disk = getDoubleProperty(IGNITE_TOTAL_DISK_SPACE, props, UNLIMITED);
- prop.diskPerNode = getDoubleProperty(IGNITE_DISK_SPACE_PER_NODE, props, 1024.0);
prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED);
prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU);
prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM);
prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
- prop.igniteWorkDir = getStringProperty(IGNITE_WORK_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
@@ -459,6 +394,38 @@ public class ClusterProperties {
}
/**
+ * Converts properties to a map.
+ *
+ * @return Key-value map.
+ */
+ public Map<String, String> toEnvs() {
+ Map<String, String> envs = new HashMap<>();
+
+ envs.put(IGNITE_CLUSTER_NAME, toEnvVal(clusterName));
+
+ envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl));
+ envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl));
+
+ envs.put(IGNITE_TOTAL_CPU, toEnvVal(cpu));
+ envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode));
+ envs.put(IGNITE_TOTAL_MEMORY, toEnvVal(mem));
+ envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode));
+ envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt));
+ envs.put(IGNITE_MIN_CPU_PER_NODE, toEnvVal(minCpu));
+ envs.put(IGNITE_MIN_MEMORY_PER_NODE, toEnvVal(minMemory));
+
+ envs.put(IGNITE_VERSION, toEnvVal(igniteVer));
+ envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir));
+ envs.put(IGNITE_CONFIG_XML, toEnvVal(igniteCfg));
+ envs.put(IGNITE_USERS_LIBS, toEnvVal(userLibs));
+
+ if (hostnameConstraint != null)
+ envs.put(IGNITE_HOSTNAME_CONSTRAINT, toEnvVal(hostnameConstraint.pattern()));
+
+ return envs;
+ }
+
+ /**
* @param name Property name.
* @param fileProps Property file.
* @return Property value.
@@ -472,7 +439,7 @@ public class ClusterProperties {
if (property == null)
property = System.getenv(name);
- return property == null ? defaultVal : Double.valueOf(property);
+ return property == null || property.isEmpty() ? defaultVal : Double.valueOf(property);
}
/**
@@ -489,31 +456,14 @@ public class ClusterProperties {
if (property == null)
property = System.getenv(name);
- return property == null ? defaultVal : property;
+ return property == null || property.isEmpty() ? defaultVal : property;
}
/**
- * Finds a local, non-loopback, IPv4 address
- *
- * @return The first non-loopback IPv4 address found, or <code>null</code> if no such addresses found
- * @throws SocketException If there was a problem querying the network interfaces
+ * @param val Value.
+ * @return {@link #EMPTY_STRING} if val is {@code null}, otherwise {@code val.toString()}.
*/
- public static String getNonLoopbackAddress() throws SocketException {
- Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
-
- while (ifaces.hasMoreElements()) {
- NetworkInterface iface = ifaces.nextElement();
-
- Enumeration<InetAddress> addresses = iface.getInetAddresses();
-
- while (addresses.hasMoreElements()) {
- InetAddress addr = addresses.nextElement();
-
- if (addr instanceof Inet4Address && !addr.isLoopbackAddress())
- return addr.getHostAddress();
- }
- }
-
- throw new RuntimeException("Failed. Couldn't find non-loopback address");
+ private String toEnvVal(Object val) {
+ return val == null ? EMPTY_STRING : val.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
new file mode 100644
index 0000000..4e3c285
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+/**
+ * Information about launched container.
+ */
+public class IgniteContainer {
+ /** */
+ public final String host;
+
+ /** */
+ public final double cpuCores;
+
+ /** */
+ public final double mem;
+
+ /**
+ * Ignite launched container.
+ *
+ * @param host Host.
+ * @param cpuCores Cpu cores count.
+ * @param mem Memory
+ */
+ public IgniteContainer(String host, double cpuCores, double mem) {
+ this.host = host;
+ this.cpuCores = cpuCores;
+ this.mem = mem;
+ }
+
+ /**
+ * @return Host.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return Cores count.
+ */
+ public double cpuCores() {
+ return cpuCores;
+ }
+
+ /**
+ * @return Memory.
+ */
+ public double mem() {
+ return mem;
+ }
+
+ @Override
+ public String toString() {
+ return "IgniteTask " +
+ "host: [" + host + ']' +
+ ", cpuCores: [" + cpuCores + "]" +
+ ", mem: [" + mem + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
new file mode 100644
index 0000000..c6e07cb
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.yarn.utils.IgniteYarnUtils;
+
+import java.io.*;
+import java.net.*;
+import java.nio.channels.*;
+import java.util.*;
+
+/**
+ * Class downloads and stores Ignite.
+ */
+public class IgniteProvider {
+ /** */
+ public static final String DOWNLOAD_LINK = "http://tiny.cc/updater/download_community.php";
+
+ /** */
+ public static final String DIRECT_DOWNLOAD_LINK = "http://www.gridgain.com/media/gridgain-community-fabric-";
+
+ /** */
+ private ClusterProperties props;
+
+ /** */
+ private String latestVersion = null;
+
+ /** */
+ private boolean hdfs = false;
+
+ /** */
+ private FileSystem fs;
+
+ /**
+ * @param props Cluster properties.
+ * @param fs Hadoop file system.
+ */
+ public IgniteProvider(ClusterProperties props, FileSystem fs) {
+ this.props = props;
+ this.fs = fs;
+ }
+
+ /**
+ * @return Path to the latest Ignite archive.
+ */
+ public Path getIgnite() throws Exception {
+ File folder = checkDownloadFolder();
+
+ if (latestVersion == null) {
+ List<String> localFiles = findIgnites(folder);
+ List<String> hdfsFiles = findIgnites(fs, props.igniteReleasesDir());
+
+ String localLatestVersion = findLatestVersion(localFiles);
+ String hdfsLatestVersion = findLatestVersion(hdfsFiles);
+
+ if (localLatestVersion != null && hdfsLatestVersion != null) {
+ if (VersionComparator.INSTANCE.compare(hdfsLatestVersion, localLatestVersion) >= 0) {
+ latestVersion = hdfsLatestVersion;
+
+ hdfs = true;
+ }
+ }
+ else if (localLatestVersion != null)
+ latestVersion = localLatestVersion;
+ else if (hdfsLatestVersion != null) {
+ latestVersion = hdfsLatestVersion;
+
+ hdfs = true;
+ }
+ }
+
+ String newVersion = updateIgnite(latestVersion);
+
+ if (latestVersion != null && newVersion.equals(latestVersion)) {
+ if (hdfs)
+ return new Path(formatPath(props.igniteReleasesDir(), latestVersion));
+ else {
+ return IgniteYarnUtils.copyLocalToHdfs(fs, formatPath(props.igniteLocalWorkDir(), latestVersion),
+ formatPath(props.igniteReleasesDir(), latestVersion));
+ }
+ }
+ else {
+ latestVersion = newVersion;
+
+ return IgniteYarnUtils.copyLocalToHdfs(fs, formatPath(props.igniteLocalWorkDir(), latestVersion),
+ formatPath(props.igniteReleasesDir(), latestVersion));
+ }
+ }
+
+ /**
+ * @param folder Folder.
+ * @return Ignite archives.
+ */
+ private List<String> findIgnites(File folder) {
+ String[] files = folder.list();
+
+ List<String> ignites = new ArrayList<>();
+
+ if (files != null) {
+ for (String fileName : files) {
+ if (fileName.contains("gridgain-community-fabric-") && fileName.endsWith(".zip"))
+ ignites.add(fileName);
+ }
+ }
+
+ return ignites;
+ }
+
+ /**
+ * @param files Files.
+ * @return latest ignite version.
+ */
+ private String findLatestVersion(List<String> files) {
+ String latestVersion = null;
+
+ if (!files.isEmpty()) {
+ if (files.size() == 1)
+ latestVersion = parseVersion(files.get(0));
+ else
+ latestVersion = parseVersion(Collections.max(files, VersionComparator.INSTANCE));
+ }
+
+ return latestVersion;
+ }
+
+ /**
+ * @param fs File system.
+ * @param folder Folder.
+ * @return Ignite archives.
+ */
+ private List<String> findIgnites(FileSystem fs, String folder) {
+ FileStatus[] fileStatuses = null;
+
+ try {
+ fileStatuses = fs.listStatus(new Path(folder));
+ }
+ catch (FileNotFoundException e) {
+ // Ignore. Folder doesn't exist.
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Couldnt get list files from hdfs.", e);
+ }
+
+ List<String> ignites = new ArrayList<>();
+
+ if (fileStatuses != null) {
+ for (FileStatus file : fileStatuses) {
+ String fileName = file.getPath().getName();
+
+ if (fileName.contains("gridgain-community-fabric-") && fileName.endsWith(".zip"))
+ ignites.add(fileName);
+ }
+ }
+
+ return ignites;
+ }
+
+ /**
+ * @param version Ignite version.
+ * @return Ignite.
+ */
+ public Path getIgnite(String version) throws Exception {
+ File folder = checkDownloadFolder();
+
+ // Check whether HDFS contains the required Ignite version.
+ List<String> hdfsFiles = findIgnites(fs, props.igniteReleasesDir());
+
+ if (hdfsFiles != null && !hdfsFiles.isEmpty()) {
+ for (String fileName : hdfsFiles) {
+ if (fileName.equals("gridgain-community-fabric-" + version + ".zip"))
+ return new Path(formatPath(props.igniteReleasesDir(), version));
+ }
+ }
+
+ // Check local fs.
+ List<String> localFiles = findIgnites(folder);
+
+ if (localFiles != null) {
+ for (String fileName : localFiles) {
+ if (fileName.equals("gridgain-community-fabric-" + version + ".zip")) {
+ Path dst = new Path(formatPath(props.igniteReleasesDir(), version));
+
+ fs.copyFromLocalFile(new Path(formatPath(props.igniteLocalWorkDir(), latestVersion)), dst);
+
+ return dst;
+ }
+ }
+ }
+
+ // Download ignite.
+ downloadIgnite(version);
+
+ Path dst = new Path(formatPath(props.igniteReleasesDir(), version));
+
+ fs.copyFromLocalFile(new Path(formatPath(props.igniteLocalWorkDir(), latestVersion)), dst);
+
+ return dst;
+ }
+
+
+ /**
+ * @param folder folder
+ * @param version version
+ * @return Path
+ */
+ private static String formatPath(String folder, String version) {
+ return folder + File.separator + "gridgain-community-fabric-" + version + ".zip";
+ }
+
+ /**
+ * @param currentVersion The current latest version.
+ * @return Current version if the current version is latest; new ignite version otherwise.
+ */
+ private String updateIgnite(String currentVersion) {
+ try {
+ URL url;
+
+ if (currentVersion == null)
+ url = new URL(DOWNLOAD_LINK);
+ else
+ url = new URL(DOWNLOAD_LINK + "?version=" + currentVersion);
+
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+
+ int code = conn.getResponseCode();
+
+ if (code == 200) {
+ String redirectUrl = conn.getURL().toString();
+
+ checkDownloadFolder();
+
+ FileOutputStream outFile = new FileOutputStream(props.igniteLocalWorkDir() + File.separator
+ + fileName(redirectUrl));
+
+ outFile.getChannel().transferFrom(Channels.newChannel(conn.getInputStream()), 0, Long.MAX_VALUE);
+
+ outFile.close();
+
+ return parseVersion(redirectUrl);
+ }
+ else if (code == 304)
+ // This version is latest.
+ return currentVersion;
+ else
+ throw new RuntimeException("Got unexpected response code. Response code: " + code);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed update ignite.", e);
+ }
+ }
+
+ /**
+ * @param version Ignite version to download.
+ * @return Ignite archive file name.
+ */
+ private String downloadIgnite(String version) {
+ try {
+ URL url = new URL(DIRECT_DOWNLOAD_LINK + version + ".zip");
+
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+
+ int code = conn.getResponseCode();
+
+ if (code == 200) {
+ checkDownloadFolder();
+
+ String fileName = fileName(url.toString());
+
+ FileOutputStream outFile = new FileOutputStream(props.igniteLocalWorkDir() + File.separator + fileName);
+
+ outFile.getChannel().transferFrom(Channels.newChannel(conn.getInputStream()), 0, Long.MAX_VALUE);
+
+ outFile.close();
+
+ return fileName;
+ }
+ else
+ throw new RuntimeException("Got unexpected response code. Response code: " + code);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed update ignite.", e);
+ }
+ }
+
+ /**
+ * @return Download folder.
+ */
+ private File checkDownloadFolder() {
+ File file = new File(props.igniteLocalWorkDir());
+
+ if (!file.exists())
+ file.mkdirs();
+
+ return file;
+ }
+
+ /**
+ * @param url URL.
+ * @return Ignite version.
+ */
+ private static String parseVersion(String url) {
+ String[] split = url.split("-");
+
+ return split[split.length - 1].replaceAll(".zip", "");
+ }
+
+ /**
+ * @param url URL.
+ * @return File name.
+ */
+ private static String fileName(String url) {
+ String[] split = url.split("/");
+
+ return split[split.length - 1];
+ }
+
+ /**
+ * Ignite version comparator.
+ */
+ public static final class VersionComparator implements Comparator<String> {
+ /** */
+ public static final VersionComparator INSTANCE = new VersionComparator();
+
+ /** */
+ private VersionComparator() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compare(String f1, String f2) {
+ if (f1.equals(f2))
+ return 0;
+
+ String[] ver1 = parseVersion(f1).split("\\.");
+ String[] ver2 = parseVersion(f2).split("\\.");
+
+ if (Integer.valueOf(ver1[0]) >= Integer.valueOf(ver2[0])
+ && Integer.valueOf(ver1[1]) >= Integer.valueOf(ver2[1])
+ && Integer.valueOf(ver1[2]) >= Integer.valueOf(ver2[2]))
+
+ return 1;
+ else
+ return -1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
deleted file mode 100644
index 60275fd..0000000
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.yarn;
-
-/**
- * Information about launched task.
- */
-public class IgniteTask {
- /** */
- public final String host;
-
- /** */
- public final double cpuCores;
-
- /** */
- public final double mem;
-
- /** */
- public final double disk;
-
- /**
- * Ignite launched task.
- *
- * @param host Host.
- * @param cpuCores Cpu cores count.
- * @param mem Memory.
- * @param disk Disk.
- */
- public IgniteTask(String host, double cpuCores, double mem, double disk) {
- this.host = host;
- this.cpuCores = cpuCores;
- this.mem = mem;
- this.disk = disk;
- }
-
- /**
- * @return Host.
- */
- public String host() {
- return host;
- }
-
- /**
- * @return Cores count.
- */
- public double cpuCores() {
- return cpuCores;
- }
-
- /**
- * @return Memory.
- */
- public double mem() {
- return mem;
- }
-
- /**
- * @return Disk.
- */
- public double disk() {
- return disk;
- }
-
- @Override
- public String toString() {
- return "IgniteTask " +
- "host: [" + host + ']' +
- ", cpuCores: [" + cpuCores + "]" +
- ", mem: [" + mem + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index 092aaa9..0ab9e91 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -17,20 +17,15 @@
package org.apache.ignite.yarn;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.conf.*;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.hadoop.yarn.util.*;
+import org.apache.ignite.yarn.utils.*;
+
+import java.io.*;
+import java.util.*;
import java.util.logging.*;
import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
@@ -43,11 +38,18 @@ public class IgniteYarnClient {
public static final Logger log = Logger.getLogger(IgniteYarnClient.class.getSimpleName());
/**
- * Main methods has only one optional parameter - path to properties files.
+ * Main method. The first argument (required) is the path to the application master jar; the second (optional) is the path to a properties file.
*
* @param args Args.
*/
public static void main(String[] args) throws Exception {
+ checkArguments(args);
+
+ // Set path to app master jar.
+ String pathAppMasterJar = args[0];
+
+ ClusterProperties props = ClusterProperties.from(args.length == 2 ? args[1] : null);
+
YarnConfiguration conf = new YarnConfiguration();
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
@@ -56,45 +58,48 @@ public class IgniteYarnClient {
// Create application via yarnClient
YarnClientApplication app = yarnClient.createApplication();
+ FileSystem fs = FileSystem.get(conf);
+
+ // Load ignite and jar
+ Path ignite = getIgnite(props, fs);
+
+ Path appJar = IgniteYarnUtils.copyLocalToHdfs(fs, pathAppMasterJar,
+ props.igniteWorkDir() + File.separator + IgniteYarnUtils.JAR_NAME);
+
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+ System.out.println(Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
+ + IgniteYarnUtils.SPACE + ignite.toUri());
+
amContainer.setCommands(
- Collections.singletonList(
- " $JAVA_HOME/bin/java -Xmx256M org.apache.ignite.yarn.ApplicationMaster" +
- " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr"
- )
+ Collections.singletonList(
+ Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
+ + IgniteYarnUtils.SPACE + ignite.toUri()
+ + IgniteYarnUtils.YARN_LOG_OUT
+ )
);
// Setup jar for ApplicationMaster
- final LocalResource appMasterJar = Records.newRecord(LocalResource.class);
- setupAppMasterJar(new Path("/user/ntikhonov/ignite-yarn.jar"), appMasterJar, conf);
-
- final LocalResource igniteZip = Records.newRecord(LocalResource.class);
- setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf);
-
- amContainer.setLocalResources(new HashMap<String, LocalResource>() {{
- put("ignite-yarn.jar", appMasterJar);
- put("gridgain-community-fabric-1.0.6.zip", igniteZip);
- }});
-
+ LocalResource appMasterJar = IgniteYarnUtils.setupFile(appJar, fs, LocalResourceType.FILE);
+ amContainer.setLocalResources(Collections.singletonMap(IgniteYarnUtils.JAR_NAME, appMasterJar));
// Setup CLASSPATH for ApplicationMaster
- Map<String, String> appMasterEnv = new HashMap<>();
+ Map<String, String> appMasterEnv = props.toEnvs();
+
setupAppMasterEnv(appMasterEnv, conf);
+
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(256);
+ capability.setMemory(512);
capability.setVirtualCores(1);
// Finally, set-up ApplicationSubmissionContext for the application
- ApplicationSubmissionContext appContext =
- app.getApplicationSubmissionContext();
- appContext.setApplicationName("simple-yarn-app"); // application name
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+ appContext.setApplicationName("ignition"); // application name
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
appContext.setQueue("default"); // queue
@@ -106,44 +111,57 @@ public class IgniteYarnClient {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
+
while (appState != YarnApplicationState.FINISHED &&
appState != YarnApplicationState.KILLED &&
appState != YarnApplicationState.FAILED) {
Thread.sleep(100);
+
appReport = yarnClient.getApplicationReport(appId);
+
appState = appReport.getYarnApplicationState();
}
- System.out.println(
- "Application " + appId + " finished with" +
- " state " + appState +
- " at " + appReport.getFinishTime());
- }
+ yarnClient.killApplication(appId);
- private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
- throws Exception {
- FileSystem fileSystem = FileSystem.get(conf);
- jarPath = fileSystem.makeQualified(jarPath);
+ System.out.println("Application " + appId + " finished with state " + appState + " at "
+ + appReport.getFinishTime());
+ }
- FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+ /**
+ * Check input arguments.
+ *
+ * @param args Arguments.
+ */
+ private static void checkArguments(String[] args) {
+ if (args.length < 1)
+ throw new IllegalArgumentException();
+ }
- appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- appMasterJar.setSize(jarStat.getLen());
- appMasterJar.setTimestamp(jarStat.getModificationTime());
- appMasterJar.setType(LocalResourceType.ARCHIVE);
- appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
+ /**
+ * @param props Properties.
+ * @param fileSystem Hdfs file system.
+ * @return Hdfs path to ignite node.
+ * @throws Exception
+ */
+ private static Path getIgnite(ClusterProperties props, FileSystem fileSystem) throws Exception {
+ IgniteProvider provider = new IgniteProvider(props, fileSystem);
- System.out.println("Path :" + jarPath);
+ return provider.getIgnite();
}
- private static void setupAppMasterEnv(Map<String, String> appMasterEnv, YarnConfiguration conf) {
- for (String c : conf.getStrings(
- YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
- Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
+ /**
+ *
+ * @param envs Environment variables.
+ * @param conf Yarn configuration.
+ */
+ private static void setupAppMasterEnv(Map<String, String> envs, YarnConfiguration conf) {
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
+ Apps.addToEnvironment(envs, Environment.CLASSPATH.name(),
c.trim(), File.pathSeparator);
- Apps.addToEnvironment(appMasterEnv,
+ Apps.addToEnvironment(envs,
Environment.CLASSPATH.name(),
Environment.PWD.$() + File.separator + "*",
File.pathSeparator);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
new file mode 100644
index 0000000..1e6c414
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn.utils;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.util.*;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
+
+/**
+ * Shared constants and HDFS/YARN helper routines of the Ignite YARN module.
+ */
+public class IgniteYarnUtils {
+    /** Name of the default Ignite configuration file. */
+    public static final String DEFAULT_IGNITE_CONFIG = "ignite-default-config.xml";
+
+    /** Single space used to separate parts of a launch command. */
+    public static final String SPACE = " ";
+
+    /** Name of the application master jar. */
+    public static final String JAR_NAME = "ignite-yarn.jar";
+
+    /** Command suffix that redirects stdout and stderr into files under the YARN log directory. */
+    public static final String YARN_LOG_OUT =
+        " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" +
+        " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr";
+
+    /**
+     * Builds a YARN local resource descriptor for a file, using the size and modification time
+     * reported by the file system. The resource gets {@code APPLICATION} visibility.
+     *
+     * @param file Path.
+     * @param fs File system.
+     * @param type Local resource type.
+     * @return Local resource describing the file.
+     * @throws Exception If failed.
+     */
+    public static LocalResource setupFile(Path file, FileSystem fs, LocalResourceType type)
+        throws Exception {
+        LocalResource res = Records.newRecord(LocalResource.class);
+
+        Path qualified = fs.makeQualified(file);
+        FileStatus status = fs.getFileStatus(qualified);
+
+        res.setType(type);
+        res.setVisibility(LocalResourceVisibility.APPLICATION);
+        res.setResource(ConverterUtils.getYarnUrlFromPath(qualified));
+        res.setTimestamp(status.getModificationTime());
+        res.setSize(status.getLen());
+
+        return res;
+    }
+
+    /**
+     * Copies a local file to the given file system, overwriting the destination if it exists.
+     *
+     * @param fs File system.
+     * @param src Source path.
+     * @param dst Destination path.
+     * @return Path to the copied file on the target file system.
+     * @throws Exception If failed.
+     */
+    public static Path copyLocalToHdfs(FileSystem fs, String src, String dst) throws Exception {
+        Path target = new Path(dst);
+        Path source = new Path(src);
+
+        // delSrc = false keeps the local file; overwrite = true replaces an existing destination.
+        fs.copyFromLocalFile(false, true, source, target);
+
+        return target;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/resources/ignite-default-config.xml
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/resources/ignite-default-config.xml b/modules/yarn/src/main/resources/ignite-default-config.xml
new file mode 100644
index 0000000..96bb669
--- /dev/null
+++ b/modules/yarn/src/main/resources/ignite-default-config.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"/>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>