You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 18:06:42 UTC
[1/8] incubator-ignite git commit: #YARN Added simple app
Repository: incubator-ignite
Updated Branches:
refs/heads/yarn [created] 7e072dc44
#YARN Added simple app
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50cfa27c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50cfa27c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50cfa27c
Branch: refs/heads/yarn
Commit: 50cfa27c273e3f82687c54738b52fc23a5e7bb33
Parents: c232631
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Jun 1 17:31:07 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon Jun 1 17:31:07 2015 +0300
----------------------------------------------------------------------
modules/yarn/README.txt | 28 +
modules/yarn/licenses/apache-2.0.txt | 202 ++++++++
modules/yarn/pom.xml | 94 ++++
.../apache/ignite/yarn/ApplicationMaster.java | 133 +++++
.../apache/ignite/yarn/ClusterProperties.java | 519 +++++++++++++++++++
.../java/org/apache/ignite/yarn/IgniteTask.java | 86 +++
.../apache/ignite/yarn/IgniteYarnClient.java | 50 ++
.../org/apache/ignite/yarn/package-info.java | 22 +
.../org/apache/ignite/IgniteMesosTestSuite.java | 38 ++
.../ignite/yarn/IgniteSchedulerSelfTest.java | 27 +
pom.xml | 1 +
11 files changed, 1200 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/README.txt
----------------------------------------------------------------------
diff --git a/modules/yarn/README.txt b/modules/yarn/README.txt
new file mode 100644
index 0000000..75a62f8
--- /dev/null
+++ b/modules/yarn/README.txt
@@ -0,0 +1,28 @@
+Apache Ignite YARN Module
+-------------------------
+
+Apache Ignite YARN module provides integration of Apache Ignite with Apache Hadoop YARN.
+
+Importing Apache Ignite YARN Module In Maven Project
+----------------------------------------------------
+
+If you are using Maven to manage dependencies of your project, you can add the YARN module
+dependency like this (replace '${ignite.version}' with the actual Ignite version you are
+interested in):
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ ...
+ <dependencies>
+ ...
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-yarn</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+ ...
+ </dependencies>
+ ...
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/licenses/apache-2.0.txt
----------------------------------------------------------------------
diff --git a/modules/yarn/licenses/apache-2.0.txt b/modules/yarn/licenses/apache-2.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/yarn/licenses/apache-2.0.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/modules/yarn/pom.xml b/modules/yarn/pom.xml
new file mode 100644
index 0000000..f51b0e8
--- /dev/null
+++ b/modules/yarn/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-yarn</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+
+ <properties>
+ <hadoop.version>2.7.0</hadoop.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.ignite.yarn.IgniteYarnClient</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
new file mode 100644
index 0000000..f52a1de
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.yarn.api.*;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.*;
+import org.apache.hadoop.yarn.client.api.async.*;
+import org.apache.hadoop.yarn.conf.*;
+import org.apache.hadoop.yarn.util.*;
+
+import java.util.*;
+
+/**
+ * Simple YARN application master.
+ * <p>
+ * {@link #main(String[])} requests a fixed number of containers from the ResourceManager, launches the given
+ * shell command in every allocated container and waits until all of them complete. The
+ * {@link AMRMClientAsync.CallbackHandler} callbacks are no-ops for now: {@code main} drives the allocation
+ * loop itself through the synchronous {@link AMRMClient}.
+ */
+public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
+    /** {@inheritDoc} */
+    @Override public void onContainersCompleted(List<ContainerStatus> statuses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onContainersAllocated(List<Container> containers) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onShutdownRequest() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getProgress() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError(Throwable e) {
+        // No-op.
+    }
+
+    /**
+     * Runs the application master.
+     *
+     * @param args Arguments: {@code args[0]} is the shell command to run in every container,
+     *      {@code args[1]} is the number of containers to request.
+     * @throws IllegalArgumentException If fewer than two arguments are passed.
+     * @throws NumberFormatException If the container count is not an integer.
+     * @throws Exception If communication with the ResourceManager or a NodeManager fails.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 2)
+            throw new IllegalArgumentException("Usage: ApplicationMaster <command> <container count>");
+
+        final String command = args[0];
+        final int n = Integer.parseInt(args[1]);
+
+        // Initialize clients to ResourceManager and NodeManagers
+        Configuration conf = new YarnConfiguration();
+
+        AMRMClient<AMRMClient.ContainerRequest> rmClient = AMRMClient.createAMRMClient();
+        NMClient nmClient = NMClient.createNMClient();
+
+        try {
+            rmClient.init(conf);
+            rmClient.start();
+
+            nmClient.init(conf);
+            nmClient.start();
+
+            // Register with ResourceManager
+            rmClient.registerApplicationMaster("", 0, "");
+
+            // Priority for worker containers - priorities are intra-application
+            Priority priority = Records.newRecord(Priority.class);
+            priority.setPriority(0);
+
+            // Resource requirements for worker containers
+            Resource capability = Records.newRecord(Resource.class);
+            capability.setMemory(128);
+            capability.setVirtualCores(1);
+
+            // Make container requests to ResourceManager
+            for (int i = 0; i < n; ++i) {
+                AMRMClient.ContainerRequest containerAsk =
+                    new AMRMClient.ContainerRequest(capability, null, null, priority);
+
+                rmClient.addContainerRequest(containerAsk);
+            }
+
+            // Obtain allocated containers, launch and check for responses
+            int responseId = 0;
+            int completedContainers = 0;
+
+            while (completedContainers < n) {
+                AllocateResponse response = rmClient.allocate(responseId++);
+
+                for (Container container : response.getAllocatedContainers()) {
+                    // Launch container by create ContainerLaunchContext
+                    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+                    // Redirect the command's output into the container log directory.
+                    ctx.setCommands(
+                        Collections.singletonList(
+                            command +
+                                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+                                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
+                        ));
+
+                    nmClient.startContainer(container, ctx);
+                }
+
+                for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+                    ++completedContainers;
+
+                    System.out.println("Completed container " + status.getContainerId());
+                }
+
+                // Pause between polls of the ResourceManager.
+                Thread.sleep(100);
+            }
+
+            // Un-register with ResourceManager
+            rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
+        }
+        finally {
+            // Always release client resources, even when allocation or launch fails.
+            nmClient.stop();
+            rmClient.stop();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
new file mode 100644
index 0000000..0c6c26d
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.logging.*;
+import java.util.regex.*;
+
+/**
+ * Cluster settings.
+ * <p>
+ * Settings are read by {@link #from(String)}. Every setting is looked up first in the properties file
+ * (if one is given), then in JVM system properties, then in environment variables; if it is found nowhere
+ * the default value is used.
+ */
+public class ClusterProperties {
+    /** Logger. */
+    private static final Logger log = Logger.getLogger(ClusterProperties.class.getSimpleName());
+
+    /** Unlimited. */
+    public static final double UNLIMITED = Double.MAX_VALUE;
+
+    /** Name of the property holding the Mesos master url. */
+    public static final String MESOS_MASTER_URL = "MESOS_MASTER_URL";
+
+    /** Default Mesos master url. */
+    public static final String DEFAULT_MESOS_MASTER_URL = "zk://localhost:2181/mesos";
+
+    /** Mesos master url. */
+    private String mesosUrl = DEFAULT_MESOS_MASTER_URL;
+
+    /** Name of the property holding the cluster name. */
+    public static final String IGNITE_CLUSTER_NAME = "IGNITE_CLUSTER_NAME";
+
+    /** Default cluster name. */
+    public static final String DEFAULT_CLUSTER_NAME = "ignite-cluster";
+
+    /** Cluster name. */
+    private String clusterName = DEFAULT_CLUSTER_NAME;
+
+    /** Name of the property holding the http server host. */
+    public static final String IGNITE_HTTP_SERVER_HOST = "IGNITE_HTTP_SERVER_HOST";
+
+    /** Http server host. */
+    private String httpServerHost = null;
+
+    /** Name of the property holding the http server port. */
+    public static final String IGNITE_HTTP_SERVER_PORT = "IGNITE_HTTP_SERVER_PORT";
+
+    /** Default http server port. */
+    public static final String DEFAULT_HTTP_SERVER_PORT = "48610";
+
+    /** Http server port. */
+    private int httpServerPort = Integer.parseInt(DEFAULT_HTTP_SERVER_PORT);
+
+    /** Name of the property holding the total CPU limit. */
+    public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU";
+
+    /** CPU limit. */
+    private double cpu = UNLIMITED;
+
+    /** Name of the property holding the CPU limit per node. */
+    public static final String IGNITE_RUN_CPU_PER_NODE = "IGNITE_RUN_CPU_PER_NODE";
+
+    /** CPU limit per node. */
+    private double cpuPerNode = UNLIMITED;
+
+    /** Name of the property holding the total memory limit. */
+    public static final String IGNITE_TOTAL_MEMORY = "IGNITE_TOTAL_MEMORY";
+
+    /** Memory limit. */
+    private double mem = UNLIMITED;
+
+    /** Name of the property holding the memory limit per node. */
+    public static final String IGNITE_MEMORY_PER_NODE = "IGNITE_MEMORY_PER_NODE";
+
+    /** Memory limit per node. */
+    private double memPerNode = UNLIMITED;
+
+    /** Name of the property holding the total disk space limit. */
+    public static final String IGNITE_TOTAL_DISK_SPACE = "IGNITE_TOTAL_DISK_SPACE";
+
+    /** Disk space limit. */
+    private double disk = UNLIMITED;
+
+    /** Name of the property holding the disk space limit per node. */
+    public static final String IGNITE_DISK_SPACE_PER_NODE = "IGNITE_DISK_SPACE_PER_NODE";
+
+    /**
+     * Disk space limit per node.
+     * NOTE(review): the field default is {@link #UNLIMITED} while {@link #from(String)} falls back to 1024.0.
+     */
+    private double diskPerNode = UNLIMITED;
+
+    /** Name of the property holding the node count limit. */
+    public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
+
+    /** Node count limit. */
+    private double nodeCnt = UNLIMITED;
+
+    /** Name of the property holding the min CPU count per node. */
+    public static final String IGNITE_MIN_CPU_PER_NODE = "IGNITE_MIN_CPU_PER_NODE";
+
+    /** Default min CPU count per node. */
+    public static final double DEFAULT_RESOURCE_MIN_CPU = 1;
+
+    /** Min CPU count per node. */
+    private double minCpu = DEFAULT_RESOURCE_MIN_CPU;
+
+    /** Name of the property holding the min memory per node. */
+    public static final String IGNITE_MIN_MEMORY_PER_NODE = "IGNITE_MIN_MEMORY_PER_NODE";
+
+    /** Default min memory per node. */
+    public static final double DEFAULT_RESOURCE_MIN_MEM = 256;
+
+    /** Min memory per node. */
+    private double minMemory = DEFAULT_RESOURCE_MIN_MEM;
+
+    /** Name of the property holding the Ignite version. */
+    public static final String IGNITE_VERSION = "IGNITE_VERSION";
+
+    /** Default Ignite version. */
+    public static final String DEFAULT_IGNITE_VERSION = "latest";
+
+    /** Ignite version. */
+    private String igniteVer = DEFAULT_IGNITE_VERSION;
+
+    /** Name of the property holding the Ignite package url. */
+    public static final String IGNITE_PACKAGE_URL = "IGNITE_PACKAGE_URL";
+
+    /** Ignite package url. */
+    private String ignitePackageUrl = null;
+
+    /** Name of the property holding the Ignite work directory. */
+    public static final String IGNITE_WORK_DIR = "IGNITE_WORK_DIR";
+
+    /** Default Ignite work directory. */
+    public static final String DEFAULT_IGNITE_WORK_DIR = "ignite-releases/";
+
+    /** Ignite work directory. */
+    private String igniteWorkDir = DEFAULT_IGNITE_WORK_DIR;
+
+    /** Name of the property holding the path to users libs. */
+    public static final String IGNITE_USERS_LIBS = "IGNITE_USERS_LIBS";
+
+    /** Path to users libs. */
+    private String userLibs = null;
+
+    /** Name of the property holding the URL to users libs. */
+    public static final String IGNITE_USERS_LIBS_URL = "IGNITE_USERS_LIBS_URL";
+
+    /** URL to users libs. */
+    private String userLibsUrl = null;
+
+    /**
+     * Name of the property holding the Ignite config.
+     * NOTE(review): the value is "IGNITE_XML_CONFIG" although the constant is named IGNITE_CONFIG_XML - confirm
+     * which spelling is intended before changing it, since it is the key users have to set.
+     */
+    public static final String IGNITE_CONFIG_XML = "IGNITE_XML_CONFIG";
+
+    /** Ignite config. */
+    private String igniteCfg = null;
+
+    /** Name of the property holding the url to Ignite config. */
+    public static final String IGNITE_CONFIG_XML_URL = "IGNITE_CONFIG_XML_URL";
+
+    /** Url to ignite config. */
+    private String igniteCfgUrl = null;
+
+    /** Name of the property holding the hostname constraint pattern. */
+    public static final String IGNITE_HOSTNAME_CONSTRAINT = "IGNITE_HOSTNAME_CONSTRAINT";
+
+    /** Hostname constraint. */
+    private Pattern hostnameConstraint = null;
+
+    /** Creates cluster properties with default values. */
+    public ClusterProperties() {
+        // No-op.
+    }
+
+    /**
+     * @return Cluster name.
+     */
+    public String clusterName() {
+        return clusterName;
+    }
+
+    /**
+     * @return CPU count limit.
+     */
+    public double cpus() {
+        return cpu;
+    }
+
+    /**
+     * Sets CPU count limit.
+     *
+     * @param cpu CPU count limit.
+     */
+    public void cpus(double cpu) {
+        this.cpu = cpu;
+    }
+
+    /**
+     * @return CPU count limit per node.
+     */
+    public double cpusPerNode() {
+        return cpuPerNode;
+    }
+
+    /**
+     * Sets CPU count limit per node.
+     *
+     * @param cpu CPU count limit per node.
+     */
+    public void cpusPerNode(double cpu) {
+        this.cpuPerNode = cpu;
+    }
+
+    /**
+     * @return mem limit.
+     */
+    public double memory() {
+        return mem;
+    }
+
+    /**
+     * Sets mem limit.
+     *
+     * @param mem Memory.
+     */
+    public void memory(double mem) {
+        this.mem = mem;
+    }
+
+    /**
+     * @return mem limit per node.
+     */
+    public double memoryPerNode() {
+        return memPerNode;
+    }
+
+    /**
+     * Sets mem limit per node.
+     *
+     * @param mem Memory.
+     */
+    public void memoryPerNode(double mem) {
+        this.memPerNode = mem;
+    }
+
+    /**
+     * @return disk limit.
+     */
+    public double disk() {
+        return disk;
+    }
+
+    /**
+     * @return disk limit per node.
+     */
+    public double diskPerNode() {
+        return diskPerNode;
+    }
+
+    /**
+     * @return instance count limit.
+     */
+    public double instances() {
+        return nodeCnt;
+    }
+
+    /**
+     * @return min memory per node.
+     */
+    public double minMemoryPerNode() {
+        return minMemory;
+    }
+
+    /**
+     * Sets min memory.
+     *
+     * @param minMemory Min memory.
+     */
+    public void minMemoryPerNode(double minMemory) {
+        this.minMemory = minMemory;
+    }
+
+    /**
+     * Sets hostname constraint.
+     *
+     * @param pattern Hostname pattern.
+     */
+    public void hostnameConstraint(Pattern pattern) {
+        this.hostnameConstraint = pattern;
+    }
+
+    /**
+     * @return min cpu count per node.
+     */
+    public double minCpuPerNode() {
+        return minCpu;
+    }
+
+    /**
+     * Sets min cpu count per node.
+     *
+     * @param minCpu min cpu count per node.
+     */
+    public void minCpuPerNode(double minCpu) {
+        this.minCpu = minCpu;
+    }
+
+    /**
+     * @return Ignite version.
+     */
+    public String igniteVer() {
+        return igniteVer;
+    }
+
+    /**
+     * @return Working directory.
+     */
+    public String igniteWorkDir() {
+        return igniteWorkDir;
+    }
+
+    /**
+     * @return User's libs.
+     */
+    public String userLibs() {
+        return userLibs;
+    }
+
+    /**
+     * @return Ignite configuration.
+     */
+    public String igniteCfg() {
+        return igniteCfg;
+    }
+
+    /**
+     * @return Master url.
+     */
+    public String masterUrl() {
+        return mesosUrl;
+    }
+
+    /**
+     * @return Http server host.
+     */
+    public String httpServerHost() {
+        return httpServerHost;
+    }
+
+    /**
+     * @return Http server port.
+     */
+    public int httpServerPort() {
+        return httpServerPort;
+    }
+
+    /**
+     * @return Url to ignite package.
+     */
+    public String ignitePackageUrl() {
+        return ignitePackageUrl;
+    }
+
+    /**
+     * @return Url to ignite configuration.
+     */
+    public String igniteConfigUrl() {
+        return igniteCfgUrl;
+    }
+
+    /**
+     * @return Url to users libs configuration.
+     */
+    public String usersLibsUrl() {
+        return userLibsUrl;
+    }
+
+    /**
+     * @return Host name constraint.
+     */
+    public Pattern hostnameConstraint() {
+        return hostnameConstraint;
+    }
+
+    /**
+     * Builds cluster properties from a properties file, JVM system properties and environment variables.
+     * <p>
+     * The http server port is taken from the {@code PORT0} system property when it is set and non-empty.
+     * An invalid hostname constraint pattern is logged and ignored.
+     *
+     * @param config path to config file, or {@code null} to use only system properties and environment.
+     * @return Cluster configuration.
+     * @throws RuntimeException If the config file cannot be read, or if no http server host is configured
+     *      and no non-loopback address can be determined.
+     * @throws NumberFormatException If a numeric setting cannot be parsed.
+     */
+    public static ClusterProperties from(String config) {
+        try {
+            Properties props = null;
+
+            if (config != null) {
+                props = new Properties();
+
+                // Close the stream once loaded so the file descriptor is not leaked.
+                try (InputStream in = new FileInputStream(config)) {
+                    props.load(in);
+                }
+            }
+
+            ClusterProperties prop = new ClusterProperties();
+
+            prop.mesosUrl = getStringProperty(MESOS_MASTER_URL, props, DEFAULT_MESOS_MASTER_URL);
+
+            String host = getStringProperty(IGNITE_HTTP_SERVER_HOST, props, null);
+
+            // Probe network interfaces only when no host is configured: the probe throws on machines
+            // without a non-loopback IPv4 address and must not break an explicit configuration.
+            prop.httpServerHost = host != null ? host : getNonLoopbackAddress();
+
+            String port = System.getProperty("PORT0");
+
+            if (port != null && !port.isEmpty())
+                prop.httpServerPort = Integer.parseInt(port);
+            else
+                prop.httpServerPort = Integer.parseInt(getStringProperty(IGNITE_HTTP_SERVER_PORT, props,
+                    DEFAULT_HTTP_SERVER_PORT));
+
+            prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
+
+            prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
+            prop.ignitePackageUrl = getStringProperty(IGNITE_PACKAGE_URL, props, null);
+            prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
+
+            prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED);
+            prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED);
+            prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED);
+            prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED);
+            prop.disk = getDoubleProperty(IGNITE_TOTAL_DISK_SPACE, props, UNLIMITED);
+            prop.diskPerNode = getDoubleProperty(IGNITE_DISK_SPACE_PER_NODE, props, 1024.0);
+            prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED);
+            prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU);
+            prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM);
+
+            prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
+            prop.igniteWorkDir = getStringProperty(IGNITE_WORK_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+            prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
+            prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
+
+            String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null);
+
+            if (pattern != null) {
+                try {
+                    prop.hostnameConstraint = Pattern.compile(pattern);
+                }
+                catch (PatternSyntaxException e) {
+                    log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
+                }
+            }
+
+            return prop;
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Looks a numeric setting up in the same order as {@link #getStringProperty(String, Properties, String)}.
+     *
+     * @param name Property name.
+     * @param fileProps Property file, may be {@code null}.
+     * @param defaultVal Value to return when the property is not set anywhere.
+     * @return Property value.
+     * @throws NumberFormatException If the property value is not a valid double.
+     */
+    private static double getDoubleProperty(String name, Properties fileProps, double defaultVal) {
+        String val = getStringProperty(name, fileProps, null);
+
+        return val == null ? defaultVal : Double.parseDouble(val);
+    }
+
+    /**
+     * Looks a setting up in the property file, then in system properties, then in environment variables.
+     *
+     * @param name Property name.
+     * @param fileProps Property file, may be {@code null}.
+     * @param defaultVal Value to return when the property is not set anywhere.
+     * @return Property value.
+     */
+    private static String getStringProperty(String name, Properties fileProps, String defaultVal) {
+        if (fileProps != null && fileProps.containsKey(name))
+            return fileProps.getProperty(name);
+
+        String property = System.getProperty(name);
+
+        if (property == null)
+            property = System.getenv(name);
+
+        return property == null ? defaultVal : property;
+    }
+
+    /**
+     * Finds a local, non-loopback, IPv4 address
+     *
+     * @return The first non-loopback IPv4 address found.
+     * @throws SocketException If there was a problem querying the network interfaces
+     * @throws RuntimeException If no non-loopback IPv4 address was found.
+     */
+    public static String getNonLoopbackAddress() throws SocketException {
+        Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
+
+        while (ifaces.hasMoreElements()) {
+            NetworkInterface iface = ifaces.nextElement();
+
+            Enumeration<InetAddress> addresses = iface.getInetAddresses();
+
+            while (addresses.hasMoreElements()) {
+                InetAddress addr = addresses.nextElement();
+
+                if (addr instanceof Inet4Address && !addr.isLoopbackAddress())
+                    return addr.getHostAddress();
+            }
+        }
+
+        throw new RuntimeException("Failed. Couldn't find non-loopback address");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
new file mode 100644
index 0000000..60275fd
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+/**
+ * Information about launched task.
+ * <p>
+ * Immutable value holder: every field is final and assigned once in the constructor. The fields are
+ * public and are additionally exposed through accessor methods of the same name.
+ */
+public class IgniteTask {
+ /** Host. */
+ public final String host;
+
+ /** Cpu cores count. */
+ public final double cpuCores;
+
+ /** Memory. */
+ public final double mem;
+
+ /** Disk. */
+ public final double disk;
+
+ /**
+ * Ignite launched task.
+ *
+ * @param host Host.
+ * @param cpuCores Cpu cores count.
+ * @param mem Memory.
+ * @param disk Disk.
+ */
+ public IgniteTask(String host, double cpuCores, double mem, double disk) {
+ this.host = host;
+ this.cpuCores = cpuCores;
+ this.mem = mem;
+ this.disk = disk;
+ }
+
+ /**
+ * @return Host.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return Cores count.
+ */
+ public double cpuCores() {
+ return cpuCores;
+ }
+
+ /**
+ * @return Memory.
+ */
+ public double mem() {
+ return mem;
+ }
+
+ /**
+ * @return Disk.
+ */
+ public double disk() {
+ return disk;
+ }
+
+ // NOTE(review): the string below contains host, cpuCores and mem only; disk is not included -
+ // confirm whether that omission is intentional.
+ @Override
+ public String toString() {
+ return "IgniteTask " +
+ "host: [" + host + ']' +
+ ", cpuCores: [" + cpuCores + "]" +
+ ", mem: [" + mem + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
new file mode 100644
index 0000000..7cef50d
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import org.apache.hadoop.yarn.client.api.*;
+import org.apache.hadoop.yarn.conf.*;
+
+import java.util.logging.*;
+
+/**
+ * Ignite yarn client: command-line entry point that starts a {@code YarnClient} and asks it to
+ * create a new application.
+ */
+public class IgniteYarnClient {
+ /** Logger, named after the simple class name. */
+ public static final Logger log = Logger.getLogger(IgniteYarnClient.class.getSimpleName());
+
+ /**
+ * The main method has only one optional parameter - the path to a properties file. When no
+ * argument is given, {@code null} is passed to {@code ClusterProperties.from}.
+ *
+ * @param args Args.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ // NOTE(review): clusterProps is not read again within this method.
+ ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null);
+
+ // Create yarnClient
+ YarnConfiguration conf = new YarnConfiguration();
+
+ YarnClient yarnClient = YarnClient.createYarnClient();
+
+ yarnClient.init(conf);
+ yarnClient.start();
+
+ // NOTE(review): the created application is not configured or submitted here, and the client
+ // is not stopped - presumably work in progress; confirm.
+ YarnClientApplication app = yarnClient.createApplication();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
new file mode 100644
index 0000000..c47f1e8
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains classes to support integration with Apache Hadoop YARN.
+ */
+package org.apache.ignite.yarn;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
new file mode 100644
index 0000000..e6920b3
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import junit.framework.*;
+import org.apache.ignite.yarn.*;
+
+/**
+ * Integration test suite that registers the tests of the {@code org.apache.ignite.yarn} package.
+ * <p>
+ * NOTE(review): the class name and the suite title still say "Mesos" although this suite lives in
+ * the yarn module - presumably carried over from the Mesos module; confirm and rename.
+ */
+public class IgniteMesosTestSuite extends TestSuite {
+ /**
+ * Builds the suite containing {@link IgniteSchedulerSelfTest}.
+ *
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Apache Mesos Integration Test Suite");
+
+ suite.addTest(new TestSuite(IgniteSchedulerSelfTest.class));
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
new file mode 100644
index 0000000..1a03743
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import junit.framework.*;
+
+/**
+ * Scheduler tests. Placeholder: the class declares no test methods yet.
+ * <p>
+ * NOTE(review): JUnit 3 usually reports a "No tests found" failure for a {@code TestCase} without
+ * test methods when it is wrapped in a {@code TestSuite} - confirm the suite still passes.
+ */
+public class IgniteSchedulerSelfTest extends TestCase {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50cfa27c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f8524f..9cea078 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
<module>modules/gce</module>
<module>modules/cloud</module>
<module>modules/mesos</module>
+ <module>modules/yarn</module>
</modules>
<profiles>
[8/8] incubator-ignite git commit: #YARN Code cleanup. Added tests.
Posted by sb...@apache.org.
#YARN Code cleanup. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7e072dc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7e072dc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7e072dc4
Branch: refs/heads/yarn
Commit: 7e072dc44eacd4ad088901ea7888aa8ccaf7d44a
Parents: 960e19d
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Tue Jun 9 19:02:56 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Tue Jun 9 19:02:56 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 31 ++-
.../yarn/IgniteApplicationMasterSelfTest.java | 226 ++++++++++++++++++-
2 files changed, 243 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index c552ea0..0ef1362 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -91,7 +91,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
Map<String, String> env = new HashMap<>(System.getenv());
- //env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
+ env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
ctx.setEnvironment(env);
@@ -284,15 +284,18 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
TimeUnit.MILLISECONDS.sleep(schedulerTimeout);
}
}
+ catch (InterruptedException e) {
+ // Un-register with ResourceManager
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
+
+ log.log(Level.WARNING, "Application master killed.");
+ }
catch (Exception e) {
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", "");
- System.exit(1);
+ log.log(Level.SEVERE, "Application master failed.", e);
}
-
- // Un-register with ResourceManager
- rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
}
/**
@@ -364,4 +367,22 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
public void setSchedulerTimeout(long schedulerTimeout) {
this.schedulerTimeout = schedulerTimeout;
}
+
+ /**
+ * Sets file system, replacing the instance currently held in the {@code fs} field.
+ *
+ * @param fs File system.
+ */
+ public void setFs(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ /**
+ * Intended for tests only; marked deprecated to discourage any other use.
+ * <p>
+ * Returns the {@code containers} reference itself, not a copy.
+ *
+ * @return Running containers.
+ */
+ @Deprecated
+ public Map<ContainerId, IgniteContainer> getContainers() {
+ return containers;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
index d865659..abac58e 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -18,8 +18,9 @@
package org.apache.ignite.yarn;
import junit.framework.*;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.util.*;
import org.apache.hadoop.yarn.api.protocolrecords.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
@@ -27,8 +28,11 @@ import org.apache.hadoop.yarn.client.api.async.*;
import org.apache.hadoop.yarn.exceptions.*;
import java.io.*;
+import java.net.*;
import java.nio.*;
import java.util.*;
+import java.util.concurrent.*;
+import java.util.regex.*;
/**
* Application master tests.
@@ -52,7 +56,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
props = new ClusterProperties();
appMaster = new ApplicationMaster("test", props);
- appMaster.setSchedulerTimeout(100000);
+ appMaster.setSchedulerTimeout(10000);
rmMock.clear();
}
@@ -82,7 +86,6 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
}
}
-
/**
* @throws Exception If failed.
*/
@@ -98,14 +101,125 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
Thread thread = runAppMaster(appMaster);
- interruptedThread(thread);
-
List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+ interruptedThread(thread);
+
assertEquals(0, contRequests.size());
}
/**
+ * @throws Exception If failed.
+ */
+ public void testClusterAllocatedResource() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ appMaster.setFs(new MockFileSystem());
+
+ // Each node is configured to require 8 cpus and 5000 memory.
+ props.cpusPerNode(8);
+ props.memoryPerNode(5000);
+ props.instances(3);
+
+ // Offers that fall short on cpu, memory or both are expected not to be tracked as containers.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000)));
+ assertEquals(0, appMaster.getContainers().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 2000)));
+ assertEquals(0, appMaster.getContainers().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 1, 7000)));
+ assertEquals(0, appMaster.getContainers().size());
+
+ // An offer that exactly matches the requirement is expected to be accepted.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+ assertEquals(1, appMaster.getContainers().size());
+
+ // An offer that exceeds both requirements is expected to be accepted as well.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 7000)));
+ assertEquals(2, appMaster.getContainers().size());
+ }
+
+ /**
+ * Checks that insufficient container offers are released back through the resource manager
+ * client and that a sufficient offer is started through the node manager client.
+ *
+ * @throws Exception If failed.
+ */
+ public void testStartReleaseContainer() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ NMMock nmClient = new NMMock();
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(nmClient);
+
+ appMaster.setFs(new MockFileSystem());
+
+ // Each node is configured to require 8 cpus and 5000 memory.
+ props.cpusPerNode(8);
+ props.memoryPerNode(5000);
+ props.instances(3);
+
+ // Every offer short on cpu and/or memory is expected to add one released container.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000)));
+ assertEquals(1, rmMock.releasedResources().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 7000)));
+ assertEquals(2, rmMock.releasedResources().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 9, 2000)));
+ assertEquals(3, rmMock.releasedResources().size());
+
+ // A matching offer is expected to be started rather than released.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+ assertEquals(3, rmMock.releasedResources().size());
+ assertEquals(1, nmClient.startedContainer().size());
+ }
+
+
+ /**
+ * Checks that a container offered on a host matching the configured hostname constraint is
+ * released, while a container on a non-matching host is started.
+ *
+ * @throws Exception If failed.
+ */
+ public void testHostnameConstraint() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ NMMock nmClient = new NMMock();
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(nmClient);
+
+ appMaster.setFs(new MockFileSystem());
+
+ props.cpusPerNode(8);
+ props.memoryPerNode(5000);
+ props.instances(3);
+ props.hostnameConstraint(Pattern.compile("ignoreHost"));
+
+ // Host "simple" does not match the constraint: expected to be started, nothing released.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+ assertEquals(0, rmMock.releasedResources().size());
+ assertEquals(1, nmClient.startedContainer().size());
+
+ // Host "ignoreHost" matches the constraint: expected to be released, started count unchanged.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("ignoreHost", 8, 5000)));
+ assertEquals(1, rmMock.releasedResources().size());
+ assertEquals(1, nmClient.startedContainer().size());
+ }
+
+ /**
+ * Creates a test container on the given host (node port 0) with a random container id,
+ * priority 0 and a {@code MockResource} built from the given memory and cpu values.
+ *
+ * @param host Host.
+ * @param cpu Cpu count.
+ * @param mem Memory.
+ * @return Container.
+ */
+ private Container createContainer(String host, int cpu, int mem) {
+ return Container.newInstance(
+ ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0),
+ ThreadLocalRandom.current().nextLong()),
+ NodeId.newInstance(host, 0),
+ "example.com",
+ new MockResource(mem, cpu),
+ Priority.newInstance(0),
+ null
+ );
+ }
+
+ /**
* @param rmMock RM mock.
* @param expectedCnt Expected cnt.
* @param timeOut Timeout.
@@ -147,7 +261,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
}
/**
- * Interrupt thread and wait.
+ * Interrupt thread and join.
*
* @param thread Thread.
*/
@@ -165,6 +279,9 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>();
/** */
+ private List<ContainerId> releasedConts = new ArrayList<>();
+
+ /** */
private Resource availableRes;
/** */
@@ -180,6 +297,13 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
}
/**
+ * @return Released resources.
+ */
+ public List<ContainerId> releasedResources() {
+ return releasedConts;
+ }
+
+ /**
* Sets resource.
*
* @param availableRes Available resource.
@@ -193,6 +317,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
*/
public void clear() {
contRequests.clear();
+ releasedConts.clear();
availableRes = null;
}
@@ -226,7 +351,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
/** {@inheritDoc} */
@Override public void releaseAssignedContainer(ContainerId containerId) {
- // No-op.
+ releasedConts.add(containerId);
}
/** {@inheritDoc} */
@@ -250,13 +375,26 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
*/
public static class NMMock extends NMClient {
/** */
- protected NMMock() {
+ private List<ContainerLaunchContext> startedContainer = new ArrayList<>();
+
+ /** */
+ public NMMock() {
super("name");
}
+ /**
+ * @return Started containers.
+ */
+ public List<ContainerLaunchContext> startedContainer() {
+ return startedContainer;
+ }
+
/** {@inheritDoc} */
@Override public Map<String, ByteBuffer> startContainer(Container container,
ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
+
+ startedContainer.add(containerLaunchContext);
+
return null;
}
@@ -321,4 +459,74 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
return 0;
}
}
+
+ /**
+ * Mock file system. Every operation is a stub returning a fixed value ({@code false},
+ * {@code null}, an empty array, a fixed path or an empty {@code FileStatus}); nothing is read
+ * or written.
+ */
+ public static class MockFileSystem extends FileSystem {
+ /** Default constructor. */
+ public MockFileSystem() {
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path makeQualified(Path path) {
+ // The argument is ignored; the same fixed path is always returned.
+ return new Path("/test/path");
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileStatus(Path f) throws IOException {
+ // Always returns a default-constructed status, whatever the path.
+ return new FileStatus();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getWorkingDirectory() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWorkingDirectory(Path new_dir) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ return new FileStatus[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(Path f, boolean recursive) throws IOException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rename(Path src, Path dst) throws IOException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public URI getUri() {
+ return null;
+ }
+ }
}
[5/8] incubator-ignite git commit: #YARN WIP
Posted by sb...@apache.org.
#YARN WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/12d9c02c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/12d9c02c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/12d9c02c
Branch: refs/heads/yarn
Commit: 12d9c02cf6443551cf903e6caac455201cfc0043
Parents: 85f4a89
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Fri Jun 5 19:37:23 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Fri Jun 5 19:37:23 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 35 ++++++++++++--------
1 file changed, 22 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12d9c02c/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 95197b7..fe065a3 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -19,6 +19,7 @@ package org.apache.ignite.yarn;
import org.apache.commons.io.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.client.api.async.*;
@@ -198,29 +199,37 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
Priority priority = Records.newRecord(Priority.class);
priority.setPriority(0);
- // Resource requirements for worker containers
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(1024);
- capability.setVirtualCores(2);
+ // Check ignite cluster.
+ while (!nmClient.isInState(Service.STATE.STOPPED)) {
+ Resource availableRes = rmClient.getAvailableResources();
- // Make container requests to ResourceManager
- for (int i = 0; i < 1; ++i) {
- AMRMClient.ContainerRequest containerAsk =
- new AMRMClient.ContainerRequest(capability, null, null, priority);
+ if (containers.size() < props.instances() || availableRes.getMemory() >= props.cpusPerNode()
+ || availableRes.getVirtualCores() >= props.cpus()) {
+ // Resource requirements for worker containers
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(1024);
+ capability.setVirtualCores(2);
- System.out.println("[AM] Making res-req " + i);
+ for (int i = 0; i < 1; ++i) {
+ // Make container requests to ResourceManager
+ AMRMClient.ContainerRequest containerAsk =
+ new AMRMClient.ContainerRequest(capability, null, null, priority);
- rmClient.addContainerRequest(containerAsk);
+ System.out.println("[AM] Making res-req " + i);
+
+ rmClient.addContainerRequest(containerAsk);
+ }
+ }
+
+ TimeUnit.SECONDS.sleep(5);
}
System.out.println("[AM] waiting for containers to finish");
- TimeUnit.MINUTES.sleep(10);
-
System.out.println("[AM] unregisterApplicationMaster 0");
// Un-register with ResourceManager
- rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
System.out.println("[AM] unregisterApplicationMaster 1");
}
[3/8] incubator-ignite git commit: #IGNITE-857 WIP
Posted by sb...@apache.org.
#IGNITE-857 WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/81cde9b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/81cde9b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/81cde9b7
Branch: refs/heads/yarn
Commit: 81cde9b71b3ab88023f68fa6f3bee86f20442412
Parents: b569191
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Wed Jun 3 18:37:02 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Wed Jun 3 18:37:02 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 51 +++++++++++++++-----
.../apache/ignite/yarn/IgniteYarnClient.java | 15 ++----
2 files changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81cde9b7/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 9ab70d4..532830c 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -19,6 +19,9 @@ package org.apache.ignite.yarn;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.*;
import org.apache.hadoop.yarn.api.protocolrecords.*;
import org.apache.hadoop.yarn.api.records.*;
@@ -33,27 +36,33 @@ import java.util.*;
* TODO
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
- Configuration configuration;
+ YarnConfiguration configuration;
NMClient nmClient;
- int numContainersToWaitFor = 5;
+ int numContainersToWaitFor = 1;
public ApplicationMaster() {
configuration = new YarnConfiguration();
+
nmClient = NMClient.createNMClient();
nmClient.init(configuration);
nmClient.start();
}
+ /** {@inheritDoc} */
public void onContainersAllocated(List<Container> containers) {
for (Container container : containers) {
try {
// Launch container by create ContainerLaunchContext
- // bin/hadoop fs -rm /user/ntikhonov/*.jar && bin/hadoop fs -copyFromLocal ./ignite-yarn.jar /user/ntikhonov
- ContainerLaunchContext ctx =
- Records.newRecord(ContainerLaunchContext.class);
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+ final LocalResource igniteZip = Records.newRecord(LocalResource.class);
+ setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip,
+ configuration);
+
+ ctx.setLocalResources(Collections.singletonMap("ignite", igniteZip));
ctx.setCommands(
Lists.newArrayList(
- "ls " +
+ "$LOCAL_DIRS/ignite/*/bin/ignite.sh" +
" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
));
@@ -65,6 +74,24 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
}
}
+ /**
+ * Fills the given local resource with the location and metadata of a file: the path is qualified
+ * against the file system obtained from the configuration, then its URL, length and modification
+ * time are copied into the resource. The resource is marked as an archive with application
+ * visibility, and the qualified path is printed to standard output.
+ *
+ * @param jarPath Path of the file to register.
+ * @param appMasterJar Local resource to populate.
+ * @param conf Yarn configuration used to obtain the file system.
+ * @throws Exception If failed.
+ */
+ private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
+ throws Exception {
+ FileSystem fileSystem = FileSystem.get(conf);
+ jarPath = fileSystem.makeQualified(jarPath);
+
+ FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+
+ appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ appMasterJar.setSize(jarStat.getLen());
+ appMasterJar.setTimestamp(jarStat.getModificationTime());
+ appMasterJar.setType(LocalResourceType.ARCHIVE);
+ appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ System.out.println("Path :" + jarPath);
+ }
+
+ /** {@inheritDoc} */
public void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
System.out.println("[AM] Completed container " + status.getContainerId());
@@ -74,20 +101,21 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
}
}
+ /** {@inheritDoc} */
public void onNodesUpdated(List<NodeReport> updated) {
}
- public void onReboot() {
- }
-
+ /** {@inheritDoc} */
public void onShutdownRequest() {
}
+ /** {@inheritDoc} */
public void onError(Throwable t) {
}
+ /** {@inheritDoc} */
public float getProgress() {
- return 0;
+ return 50;
}
public boolean doneWithContainers() {
@@ -101,7 +129,6 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
public static void main(String[] args) throws Exception {
ApplicationMaster master = new ApplicationMaster();
master.runMainLoop();
-
}
public void runMainLoop() throws Exception {
@@ -136,6 +163,8 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
Thread.sleep(100);
}
+
+
System.out.println("[AM] unregisterApplicationMaster 0");
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81cde9b7/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index e020ef4..092aaa9 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -74,20 +74,15 @@ public class IgniteYarnClient {
final LocalResource igniteZip = Records.newRecord(LocalResource.class);
setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf);
- FileSystem fileSystem = FileSystem.get(conf);
-
- Path path = fileSystem.makeQualified(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6/bin/ignite.sh"));
-
- System.out.println("Path: " + path);
- System.out.println("Path URI: " + path.toUri().toString());
-
- amContainer.setLocalResources(new HashMap<String, LocalResource>(){{
+ amContainer.setLocalResources(new HashMap<String, LocalResource>() {{
put("ignite-yarn.jar", appMasterJar);
- put("ignite", igniteZip);
+ put("gridgain-community-fabric-1.0.6.zip", igniteZip);
}});
+
+
// Setup CLASSPATH for ApplicationMaster
- Map<String, String> appMasterEnv = new HashMap<String, String>();
+ Map<String, String> appMasterEnv = new HashMap<>();
setupAppMasterEnv(appMasterEnv, conf);
amContainer.setEnvironment(appMasterEnv);
[6/8] incubator-ignite git commit: #YARN WIP
Posted by sb...@apache.org.
#YARN WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/858d2a3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/858d2a3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/858d2a3f
Branch: refs/heads/yarn
Commit: 858d2a3f757fea2b88ffcb907e0f221699e32420
Parents: 12d9c02
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Mon Jun 8 20:03:31 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Mon Jun 8 20:03:31 2015 +0300
----------------------------------------------------------------------
modules/yarn/README.txt | 8 +-
.../apache/ignite/yarn/ApplicationMaster.java | 187 +++++++++++++------
.../apache/ignite/yarn/ClusterProperties.java | 145 ++++----------
.../org/apache/ignite/yarn/IgniteContainer.java | 33 +++-
.../org/apache/ignite/yarn/IgniteProvider.java | 4 +-
.../apache/ignite/yarn/IgniteYarnClient.java | 13 +-
.../org/apache/ignite/yarn/package-info.java | 2 +-
.../ignite/yarn/utils/IgniteYarnUtils.java | 4 +-
8 files changed, 207 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/README.txt
----------------------------------------------------------------------
diff --git a/modules/yarn/README.txt b/modules/yarn/README.txt
index 75a62f8..5cdd4a2 100644
--- a/modules/yarn/README.txt
+++ b/modules/yarn/README.txt
@@ -1,9 +1,9 @@
-Apache Ignite Mesos Module
+Apache Ignite Yarn Module
------------------------
-Apache Ignite Mesos module provides integration Apache Ignite with Apache Mesos.
+Apache Ignite Yarn module provides integration of Apache Ignite with Apache Hadoop Yarn.
-Importing Apache Ignite Mesos Module In Maven Project
+Importing Apache Ignite Yarn Module In Maven Project
-------------------------------------
If you are using Maven to manage dependencies of your project, you can add Cloud module
@@ -19,7 +19,7 @@ interested in):
...
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-mesos</artifactId>
+ <artifactId>ignite-yarn</artifactId>
<version>${ignite.version}</version>
</dependency>
...
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index fe065a3..3bf0521 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -19,7 +19,7 @@ package org.apache.ignite.yarn;
import org.apache.commons.io.*;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.client.api.async.*;
@@ -30,11 +30,16 @@ import org.apache.ignite.yarn.utils.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
- * TODO
+ * Application master requests containers from Yarn and decides how many resources will be occupied.
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
+ /** */
+ public static final Logger log = Logger.getLogger(ApplicationMaster.class.getSimpleName());
+
/** Default port range. */
public static final String DEFAULT_PORT = ":47500..47510";
@@ -51,6 +56,9 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
private NMClient nmClient;
/** */
+ AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
+
+ /** */
private Path ignitePath;
/** */
@@ -60,7 +68,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
private FileSystem fs;
/** */
- private Map<String, IgniteContainer> containers = new HashMap<>();
+ private Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>();
/**
* Constructor.
@@ -79,47 +87,81 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
/** {@inheritDoc} */
public synchronized void onContainersAllocated(List<Container> conts) {
- for (Container container : conts) {
- try {
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ for (Container c : conts) {
+ if (checkContainer(c)) {
+ try {
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- Map<String, String> env = new HashMap<>(System.getenv());
+ Map<String, String> env = new HashMap<>(System.getenv());
- env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(container.getNodeId().getHost()));
+ //env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
- ctx.setEnvironment(env);
+ ctx.setEnvironment(env);
- Map<String, LocalResource> resources = new HashMap<>();
+ Map<String, LocalResource> resources = new HashMap<>();
- resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
- resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
+ resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
+ resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
- ctx.setLocalResources(resources);
+ ctx.setLocalResources(resources);
- ctx.setCommands(
- Collections.singletonList(
- "./ignite/*/bin/ignite.sh "
- + "./ignite-config.xml"
- + " -J-Xmx" + container.getResource().getMemory() + "m"
- + " -J-Xms" + container.getResource().getMemory() + "m"
- + IgniteYarnUtils.YARN_LOG_OUT
- ));
+ ctx.setCommands(
+ Collections.singletonList(
+ "./ignite/*/bin/ignite.sh "
+ + "./ignite-config.xml"
+ + " -J-Xmx" + c.getResource().getMemory() + "m"
+ + " -J-Xms" + c.getResource().getMemory() + "m"
+ + IgniteYarnUtils.YARN_LOG_OUT
+ ));
- System.out.println("[AM] Launching container " + container.getId());
+ log.log(Level.INFO, "Launching container: {0}.", c.getId());
- nmClient.startContainer(container, ctx);
+ nmClient.startContainer(c, ctx);
- containers.put(container.getNodeId().getHost(),
- new IgniteContainer(container.getNodeId().getHost(), container.getResource().getVirtualCores(),
- container.getResource().getMemory()));
- }
- catch (Exception ex) {
- System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
+ containers.put(c.getId(),
+ new IgniteContainer(
+ c.getId(),
+ c.getNodeId(),
+ c.getResource().getVirtualCores(),
+ c.getResource().getMemory()));
+ }
+ catch (Exception ex) {
+ System.err.println("[AM] Error launching container " + c.getId() + " " + ex);
+ }
}
+ else
+ rmClient.releaseAssignedContainer(c.getId());
}
}
/**
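+ * Checks that container can be used: node limit is not reached, host is not matched by the hostname constraint and cpu/memory are sufficient.
+ *
+ * @param cont Container.
+ * @return {@code True} if container passes all checks, {@code false} otherwise.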
+ */
+ private boolean checkContainer(Container cont) {
+ // Check limit on running nodes.
+ if (props.instances() <= containers.size())
+ return false;
+
+ // Check host name
+ if (props.hostnameConstraint() != null
+ && props.hostnameConstraint().matcher(cont.getNodeId().getHost()).matches())
+ return false;
+
+ // Check that slave satisfies min requirements.
+ if (cont.getResource().getVirtualCores() < props.cpusPerNode()
+ || cont.getResource().getMemory() < props.memoryPerNode()) {
+ //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @return Address running nodes.
*/
private String getAddress(String address) {
@@ -133,7 +175,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
StringBuilder sb = new StringBuilder();
for (IgniteContainer cont : containers.values())
- sb.append(cont.host()).append(DEFAULT_PORT).append(DELIM);
+ sb.append(cont.nodeId.getHost()).append(DEFAULT_PORT).append(DELIM);
return sb.substring(0, sb.length() - 1);
}
@@ -141,13 +183,30 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
/** {@inheritDoc} */
public synchronized void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
- synchronized (this) {
- }
+ containers.remove(status.getContainerId());
+
+ //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
}
}
/** {@inheritDoc} */
- public void onNodesUpdated(List<NodeReport> updated) {
+ public synchronized void onNodesUpdated(List<NodeReport> updated) {
+ for (NodeReport node : updated) {
+ // If node unusable.
+ if (node.getNodeState().isUnusable()) {
+ for (IgniteContainer cont : containers.values()) {
+ if (cont.nodeId().equals(node.getNodeId())) {
+ containers.remove(cont.id());
+
+ log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.",
+ new Object[]{node.getNodeId().getHost(), node.getNodeState()});
+ }
+ }
+
+ log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.",
+ new Object[]{node.getNodeId().getHost(), node.getNodeState()});
+ }
+ }
}
/** {@inheritDoc} */
@@ -169,7 +228,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws Exception If failed.
*/
public static void main(String[] args) throws Exception {
- ClusterProperties props = ClusterProperties.from(null);
+ ClusterProperties props = ClusterProperties.from();
ApplicationMaster master = new ApplicationMaster(args[0], props);
@@ -184,60 +243,72 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws Exception If failed.
*/
public void run() throws Exception {
- // Create asyn application master.
- AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
-
rmClient.init(conf);
rmClient.start();
// Register with ResourceManager
rmClient.registerApplicationMaster("", 0, "");
- System.out.println("[AM] registerApplicationMaster 1");
+ log.log(Level.INFO, "Application master registered.");
// Priority for worker containers - priorities are intra-application
Priority priority = Records.newRecord(Priority.class);
priority.setPriority(0);
- // Check ignite cluster.
- while (!nmClient.isInState(Service.STATE.STOPPED)) {
- Resource availableRes = rmClient.getAvailableResources();
+ try {
+ // Check ignite cluster.
+ while (!nmClient.isInState(Service.STATE.STOPPED)) {
+ int runningCnt = containers.size();
- if (containers.size() < props.instances() || availableRes.getMemory() >= props.cpusPerNode()
- || availableRes.getVirtualCores() >= props.cpus()) {
- // Resource requirements for worker containers
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(1024);
- capability.setVirtualCores(2);
+ if (runningCnt < props.instances() && checkAvailableResource(rmClient.getAvailableResources())) {
+ // Resource requirements for worker containers.
+ Resource capability = Records.newRecord(Resource.class);
- for (int i = 0; i < 1; ++i) {
- // Make container requests to ResourceManager
- AMRMClient.ContainerRequest containerAsk =
+ capability.setMemory((int)props.memoryPerNode());
+ capability.setVirtualCores((int)props.cpusPerNode());
+
+ for (int i = 0; i < props.instances() - runningCnt; ++i) {
+ // Make container requests to ResourceManager
+ AMRMClient.ContainerRequest containerAsk =
new AMRMClient.ContainerRequest(capability, null, null, priority);
- System.out.println("[AM] Making res-req " + i);
+ rmClient.addContainerRequest(containerAsk);
- rmClient.addContainerRequest(containerAsk);
+ log.log(Level.INFO, "Making request. Memory: {0}, cpu {1}.",
+ new Object[]{props.memoryPerNode(), props.cpusPerNode()});
+ }
}
- }
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.SECONDS.sleep(5);
+ }
}
+ catch (Exception e) {
+ // Un-register with ResourceManager
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", "");
- System.out.println("[AM] waiting for containers to finish");
-
- System.out.println("[AM] unregisterApplicationMaster 0");
+ System.exit(1);
+ }
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
+ }
- System.out.println("[AM] unregisterApplicationMaster 1");
+ /**
+ * @param availableRes Available resources.
+ * @return {@code True} if cluster contains available resources.
+ */
+ private boolean checkAvailableResource(Resource availableRes) {
+ return availableRes == null || availableRes.getMemory() >= props.memoryPerNode()
+ && availableRes.getVirtualCores() >= props.cpusPerNode();
}
/**
* @throws IOException
*/
public void init() throws IOException {
+ // Create async application master.
+ rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
+
if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) {
InputStream input = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index adddd51..f9fdb59 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -41,28 +41,16 @@ public class ClusterProperties {
/** */
public static final String DEFAULT_CLUSTER_NAME = "ignite-cluster";
- /** Mesos master url. */
+ /** Cluster name. */
private String clusterName = DEFAULT_CLUSTER_NAME;
/** */
- public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU";
-
- /** CPU limit. */
- private double cpu = UNLIMITED;
-
- /** */
public static final String IGNITE_RUN_CPU_PER_NODE = "IGNITE_RUN_CPU_PER_NODE";
/** CPU limit. */
private double cpuPerNode = UNLIMITED;
/** */
- public static final String IGNITE_TOTAL_MEMORY = "IGNITE_TOTAL_MEMORY";
-
- /** Memory limit. */
- private double mem = UNLIMITED;
-
- /** */
public static final String IGNITE_MEMORY_PER_NODE = "IGNITE_MEMORY_PER_NODE";
/** Memory limit. */
@@ -72,25 +60,7 @@ public class ClusterProperties {
public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
/** Node count limit. */
- private double nodeCnt = UNLIMITED;
-
- /** */
- public static final String IGNITE_MIN_CPU_PER_NODE = "IGNITE_MIN_CPU_PER_NODE";
-
- /** */
- public static final double DEFAULT_RESOURCE_MIN_CPU = 1;
-
- /** Min memory per node. */
- private double minCpu = DEFAULT_RESOURCE_MIN_CPU;
-
- /** */
- public static final String IGNITE_MIN_MEMORY_PER_NODE = "IGNITE_MIN_MEMORY_PER_NODE";
-
- /** */
- public static final double DEFAULT_RESOURCE_MIN_MEM = 256;
-
- /** Min memory per node. */
- private double minMemory = DEFAULT_RESOURCE_MIN_MEM;
+ private double nodeCnt = 3;
/** */
public static final String IGNITE_VERSION = "IGNITE_VERSION";
@@ -170,19 +140,6 @@ public class ClusterProperties {
return clusterName;
}
- /**
- * @return CPU count limit.
- */
- public double cpus() {
- return cpu;
- }
-
- /**
- * Sets CPU count limit.
- */
- public void cpus(double cpu) {
- this.cpu = cpu;
- }
/**
* @return CPU count limit.
@@ -201,22 +158,6 @@ public class ClusterProperties {
/**
* @return mem limit.
*/
- public double memory() {
- return mem;
- }
-
- /**
- * Sets mem limit.
- *
- * @param mem Memory.
- */
- public void memory(double mem) {
- this.mem = mem;
- }
-
- /**
- * @return mem limit.
- */
public double memoryPerNode() {
return memPerNode;
}
@@ -238,22 +179,6 @@ public class ClusterProperties {
}
/**
- * @return min memory per node.
- */
- public double minMemoryPerNode() {
- return minMemory;
- }
-
- /**
- * Sets min memory.
- *
- * @param minMemory Min memory.
- */
- public void minMemoryPerNode(double minMemory) {
- this.minMemory = minMemory;
- }
-
- /**
* Sets hostname constraint.
*
* @param pattern Hostname pattern.
@@ -263,22 +188,6 @@ public class ClusterProperties {
}
/**
- * @return min cpu count per node.
- */
- public double minCpuPerNode() {
- return minCpu;
- }
-
- /**
- * Sets min cpu count per node.
- *
- * @param minCpu min cpu count per node.
- */
- public void minCpuPerNode(double minCpu) {
- this.minCpu = minCpu;
- }
-
- /**
* @return Ignite version.
*/
public String igniteVer() {
@@ -362,13 +271,9 @@ public class ClusterProperties {
prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
- prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED);
- prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED);
- prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED);
- prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED);
- prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED);
- prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU);
- prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM);
+ prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, 1.0);
+ prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, 2048.0);
+ prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, 2.0);
prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
@@ -394,6 +299,40 @@ public class ClusterProperties {
}
/**
+ * @return Cluster configuration.
+ */
+ public static ClusterProperties from() {
+ ClusterProperties prop = new ClusterProperties();
+
+ prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME);
+
+ prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, null, null);
+ prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, null, null);
+
+ prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, 1.0);
+ prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, 2048.0);
+ prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, 2.0);
+
+ prop.igniteVer = getStringProperty(IGNITE_VERSION, null, DEFAULT_IGNITE_VERSION);
+ prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null);
+ prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null);
+
+ String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, null, null);
+
+ if (pattern != null) {
+ try {
+ prop.hostnameConstraint = Pattern.compile(pattern);
+ }
+ catch (PatternSyntaxException e) {
+ log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
+ }
+ }
+
+ return prop;
+ }
+
+ /**
* Convert to properties to map.
*
* @return Key-value map.
@@ -406,13 +345,9 @@ public class ClusterProperties {
envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl));
envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl));
- envs.put(IGNITE_TOTAL_CPU, toEnvVal(cpu));
envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode));
- envs.put(IGNITE_TOTAL_MEMORY, toEnvVal(mem));
envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode));
envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt));
- envs.put(IGNITE_MIN_CPU_PER_NODE, toEnvVal(minCpu));
- envs.put(IGNITE_MIN_MEMORY_PER_NODE, toEnvVal(minMemory));
envs.put(IGNITE_VERSION, toEnvVal(igniteVer));
envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir));
@@ -461,7 +396,7 @@ public class ClusterProperties {
/**
* @param val Value.
- * @return If val is null {@link EMPTY_STRING} else to string.
+ * @return If val is null {@code EMPTY_STRING} else to string.
*/
private String toEnvVal(Object val) {
return val == null ? EMPTY_STRING : val.toString();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
index 4e3c285..a8b0342 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
@@ -17,12 +17,17 @@
package org.apache.ignite.yarn;
+import org.apache.hadoop.yarn.api.records.*;
+
/**
* Information about launched task.
*/
public class IgniteContainer {
/** */
- public final String host;
+ public final ContainerId id;
+
+ /** */
+ public final NodeId nodeId;
/** */
public final double cpuCores;
@@ -33,21 +38,29 @@ public class IgniteContainer {
/**
* Ignite launched task.
*
- * @param host Host.
+ * @param nodeId Node id.
* @param cpuCores Cpu cores count.
* @param mem Memory
*/
- public IgniteContainer(String host, double cpuCores, double mem) {
- this.host = host;
+ public IgniteContainer(ContainerId id, NodeId nodeId, double cpuCores, double mem) {
+ this.id = id;
+ this.nodeId = nodeId;
this.cpuCores = cpuCores;
this.mem = mem;
}
/**
+ * @return Id.
+ */
+ public ContainerId id() {
+ return id;
+ }
+
+ /**
* @return Host.
*/
- public String host() {
- return host;
+ public NodeId nodeId() {
+ return nodeId;
}
/**
@@ -64,10 +77,12 @@ public class IgniteContainer {
return mem;
}
- @Override
- public String toString() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override public String toString() {
return "IgniteTask " +
- "host: [" + host + ']' +
+ "host: [" + nodeId.getHost() + ']' +
", cpuCores: [" + cpuCores + "]" +
", mem: [" + mem + "]";
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
index c6e07cb..1ac2974 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
@@ -18,7 +18,7 @@
package org.apache.ignite.yarn;
import org.apache.hadoop.fs.*;
-import org.apache.ignite.yarn.utils.IgniteYarnUtils;
+import org.apache.ignite.yarn.utils.*;
import java.io.*;
import java.net.*;
@@ -26,7 +26,7 @@ import java.nio.channels.*;
import java.util.*;
/**
- * Class downloads and stores Ignite.
+ * Downloads and stores Ignite.
*/
public class IgniteProvider {
/** */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index 0ab9e91..f74890d 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -26,6 +26,7 @@ import org.apache.ignite.yarn.utils.*;
import java.io.*;
import java.util.*;
+import java.util.concurrent.*;
import java.util.logging.*;
import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
@@ -69,9 +70,6 @@ public class IgniteYarnClient {
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- System.out.println(Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
- + IgniteYarnUtils.SPACE + ignite.toUri());
-
amContainer.setCommands(
Collections.singletonList(
Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
@@ -106,16 +104,18 @@ public class IgniteYarnClient {
// Submit application
ApplicationId appId = appContext.getApplicationId();
- System.out.println("Submitting application " + appId);
+
yarnClient.submitApplication(appContext);
+ log.log(Level.INFO, "Submitted application. Application id: [{0}]", appId);
+
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
while (appState != YarnApplicationState.FINISHED &&
appState != YarnApplicationState.KILLED &&
appState != YarnApplicationState.FAILED) {
- Thread.sleep(100);
+ TimeUnit.SECONDS.sleep(1L);
appReport = yarnClient.getApplicationReport(appId);
@@ -124,8 +124,7 @@ public class IgniteYarnClient {
yarnClient.killApplication(appId);
- System.out.println("Application " + appId + " finished with state " + appState + " at "
- + appReport.getFinishTime());
+ log.log(Level.INFO, "Application [{0}] finished with state [{1}]", new Object[]{appId, appState});
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
index c47f1e8..6734307 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java
@@ -17,6 +17,6 @@
/**
* <!-- Package description. -->
- * Contains classes to support integration with Apache Mesos.
+ * Contains classes to support integration with Apache Hadoop Yarn.
*/
package org.apache.ignite.yarn;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
index 1e6c414..3b62411 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
@@ -21,12 +21,10 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.util.*;
-import java.io.IOException;
-
import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
/**
- *
+ * Utils.
*/
public class IgniteYarnUtils {
/** */
[7/8] incubator-ignite git commit: #YARN Code cleanup. Added tests.
Posted by sb...@apache.org.
#YARN Code cleanup. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/960e19dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/960e19dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/960e19dd
Branch: refs/heads/yarn
Commit: 960e19dda15d58ddc403a8e6856d0eb19d7794c1
Parents: 858d2a3
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Tue Jun 9 16:38:07 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Tue Jun 9 16:38:07 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 89 +++--
.../apache/ignite/yarn/ClusterProperties.java | 53 +--
.../apache/ignite/yarn/IgniteYarnClient.java | 30 +-
.../org/apache/ignite/IgniteMesosTestSuite.java | 38 ---
.../org/apache/ignite/IgniteYarnTestSuite.java | 38 +++
.../yarn/IgniteApplicationMasterSelfTest.java | 324 +++++++++++++++++++
.../ignite/yarn/IgniteSchedulerSelfTest.java | 29 --
7 files changed, 460 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 3bf0521..c552ea0 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -47,27 +47,30 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
public static final String DELIM = ",";
/** */
+ private long schedulerTimeout = TimeUnit.SECONDS.toMillis(1);
+
+ /** Yarn configuration. */
private YarnConfiguration conf;
- /** */
+ /** Cluster properties. */
private ClusterProperties props;
- /** */
+ /** Node manager client. */
private NMClient nmClient;
- /** */
- AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
+ /** Resource manager. */
+ private AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
- /** */
+ /** Ignite path. */
private Path ignitePath;
- /** */
+ /** Config path. */
private Path cfgPath;
- /** */
+ /** Hadoop file system. */
private FileSystem fs;
- /** */
+ /** Running containers. */
private Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>();
/**
@@ -76,13 +79,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
public ApplicationMaster(String ignitePath, ClusterProperties props) throws Exception {
this.conf = new YarnConfiguration();
this.props = props;
- this.fs = FileSystem.get(conf);
this.ignitePath = new Path(ignitePath);
-
- nmClient = NMClient.createNMClient();
-
- nmClient.init(conf);
- nmClient.start();
}
/** {@inheritDoc} */
@@ -103,11 +100,16 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
+ if (props.userLibs() != null)
+ resources.put("libs", IgniteYarnUtils.setupFile(new Path(props.userLibs()), fs,
+ LocalResourceType.FILE));
+
ctx.setLocalResources(resources);
ctx.setCommands(
Collections.singletonList(
- "./ignite/*/bin/ignite.sh "
+ "cp -r ./libs/* ./ignite/*/libs/ || true && "
+ + "./ignite/*/bin/ignite.sh "
+ "./ignite-config.xml"
+ " -J-Xmx" + c.getResource().getMemory() + "m"
+ " -J-Xms" + c.getResource().getMemory() + "m"
@@ -153,7 +155,9 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
// Check that slave satisfies min requirements.
if (cont.getResource().getVirtualCores() < props.cpusPerNode()
|| cont.getResource().getMemory() < props.memoryPerNode()) {
- //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+ log.log(Level.FINE, "Container resources not sufficient requirements. Host: {0}, cpu: {1}, mem: {2}",
+ new Object[]{cont.getNodeId().getHost(), cont.getResource().getVirtualCores(),
+ cont.getResource().getMemory()});
return false;
}
@@ -185,7 +189,8 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
for (ContainerStatus status : statuses) {
containers.remove(status.getContainerId());
- //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+ log.log(Level.INFO, "Container stopped. Container id: {0}. State: {1}.",
+ new Object[]{status.getContainerId(), status.getState()});
}
}
@@ -243,9 +248,6 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws Exception If failed.
*/
public void run() throws Exception {
- rmClient.init(conf);
- rmClient.start();
-
// Register with ResourceManager
rmClient.registerApplicationMaster("", 0, "");
@@ -260,7 +262,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
while (!nmClient.isInState(Service.STATE.STOPPED)) {
int runningCnt = containers.size();
- if (runningCnt < props.instances() && checkAvailableResource(rmClient.getAvailableResources())) {
+ if (runningCnt < props.instances() && checkAvailableResource()) {
// Resource requirements for worker containers.
Resource capability = Records.newRecord(Resource.class);
@@ -279,7 +281,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
}
}
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.MILLISECONDS.sleep(schedulerTimeout);
}
}
catch (Exception e) {
@@ -294,10 +296,11 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
}
/**
- * @param availableRes Available resources.
* @return {@code True} if cluster contains available resources.
*/
- private boolean checkAvailableResource(Resource availableRes) {
+ private boolean checkAvailableResource() {
+ Resource availableRes = rmClient.getAvailableResources();
+
return availableRes == null || availableRes.getMemory() >= props.memoryPerNode()
&& availableRes.getVirtualCores() >= props.cpusPerNode();
}
@@ -306,10 +309,17 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws IOException
*/
public void init() throws IOException {
+ fs = FileSystem.get(conf);
+
+ nmClient = NMClient.createNMClient();
+
+ nmClient.init(conf);
+ nmClient.start();
+
// Create async application master.
rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
- if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) {
+ if (props.igniteCfg() == null || props.igniteCfg().isEmpty()) {
InputStream input = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
@@ -325,6 +335,33 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
IOUtils.closeQuietly(outputStream);
}
else
- cfgPath = new Path(props.igniteConfigUrl());
+ cfgPath = new Path(props.igniteCfg());
+ }
+
+ /**
+ * Sets NMClient.
+ *
+ * @param nmClient NMClient.
+ */
+ public void setNmClient(NMClient nmClient) {
+ this.nmClient = nmClient;
+ }
+
+ /**
+ * Sets RMClient
+ *
+ * @param rmClient AMRMClientAsync.
+ */
+ public void setRmClient(AMRMClientAsync<AMRMClient.ContainerRequest> rmClient) {
+ this.rmClient = rmClient;
+ }
+
+ /**
+ * Sets scheduler timeout.
+ *
+ * @param schedulerTimeout Scheduler timeout.
+ */
+ public void setSchedulerTimeout(long schedulerTimeout) {
+ this.schedulerTimeout = schedulerTimeout;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index f9fdb59..d021d45 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -59,8 +59,11 @@ public class ClusterProperties {
/** */
public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
+ /** */
+ public static final int DEFAULT_IGNITE_NODE_COUNT = 3;
+
/** Node count limit. */
- private double nodeCnt = 3;
+ private double nodeCnt = DEFAULT_IGNITE_NODE_COUNT;
/** */
public static final String IGNITE_VERSION = "IGNITE_VERSION";
@@ -105,24 +108,12 @@ public class ClusterProperties {
private String userLibs = null;
/** */
- public static final String IGNITE_USERS_LIBS_URL = "IGNITE_USERS_LIBS_URL";
-
- /** URL to users libs. */
- private String userLibsUrl = null;
-
- /** */
public static final String IGNITE_CONFIG_XML = "IGNITE_XML_CONFIG";
/** Ignite config. */
private String igniteCfg = null;
/** */
- public static final String IGNITE_CONFIG_XML_URL = "IGNITE_CONFIG_XML_URL";
-
- /** Url to ignite config. */
- private String igniteCfgUrl = null;
-
- /** */
public static final String IGNITE_HOSTNAME_CONSTRAINT = "IGNITE_HOSTNAME_CONSTRAINT";
/** Url to ignite config. */
@@ -179,6 +170,13 @@ public class ClusterProperties {
}
/**
+ * Sets instance count limit.
+ */
+ public void instances(int nodeCnt) {
+ this.nodeCnt = nodeCnt;
+ }
+
+ /**
* Sets hostname constraint.
*
* @param pattern Hostname pattern.
@@ -230,20 +228,6 @@ public class ClusterProperties {
}
/**
- * @return Url to ignite configuration.
- */
- public String igniteConfigUrl() {
- return igniteCfgUrl;
- }
-
- /**
- * @return Url to users libs configuration.
- */
- public String usersLibsUrl() {
- return userLibsUrl;
- }
-
- /**
* @return Host name constraint.
*/
public Pattern hostnameConstraint() {
@@ -268,15 +252,14 @@ public class ClusterProperties {
prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
- prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
- prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
-
prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, 1.0);
prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, 2048.0);
prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, 2.0);
prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+ prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR);
prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
@@ -306,15 +289,14 @@ public class ClusterProperties {
prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME);
- prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, null, null);
- prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, null, null);
-
prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, 1.0);
prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, 2048.0);
prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, 2.0);
prop.igniteVer = getStringProperty(IGNITE_VERSION, null, DEFAULT_IGNITE_VERSION);
prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, null, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+ prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, null, DEFAULT_IGNITE_RELEASES_DIR);
prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null);
prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null);
@@ -342,15 +324,14 @@ public class ClusterProperties {
envs.put(IGNITE_CLUSTER_NAME, toEnvVal(clusterName));
- envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl));
- envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl));
-
envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode));
envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode));
envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt));
envs.put(IGNITE_VERSION, toEnvVal(igniteVer));
envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir));
+ envs.put(IGNITE_LOCAL_WORK_DIR, toEnvVal(igniteLocalWorkDir));
+ envs.put(IGNITE_RELEASES_DIR, toEnvVal(igniteReleasesDir));
envs.put(IGNITE_CONFIG_XML, toEnvVal(igniteCfg));
envs.put(IGNITE_USERS_LIBS, toEnvVal(userLibs));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index f74890d..764e717 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -39,9 +39,9 @@ public class IgniteYarnClient {
public static final Logger log = Logger.getLogger(IgniteYarnClient.class.getSimpleName());
/**
- * Main methods has only one optional parameter - path to properties file.
+ * Main method has one mandatory parameter and one optional parameter.
*
- * @param args Args.
+ * @param args Path to the jar (mandatory) and path to the properties file (optional).
*/
public static void main(String[] args) throws Exception {
checkArguments(args);
@@ -107,24 +107,27 @@ public class IgniteYarnClient {
yarnClient.submitApplication(appContext);
- log.log(Level.INFO, "Submitted application. Application id: [{0}]", appId);
+ log.log(Level.INFO, "Submitted application. Application id: {0}", appId);
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
- while (appState != YarnApplicationState.FINISHED &&
- appState != YarnApplicationState.KILLED &&
- appState != YarnApplicationState.FAILED) {
+ while (appState == YarnApplicationState.NEW ||
+ appState == YarnApplicationState.NEW_SAVING ||
+ appState == YarnApplicationState.SUBMITTED ||
+ appState == YarnApplicationState.ACCEPTED) {
TimeUnit.SECONDS.sleep(1L);
appReport = yarnClient.getApplicationReport(appId);
+ if (appState != YarnApplicationState.ACCEPTED
+ && appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED)
+ log.log(Level.INFO, "Application {0} is ACCEPTED.", appId);
+
appState = appReport.getYarnApplicationState();
}
- yarnClient.killApplication(appId);
-
- log.log(Level.INFO, "Application [{0}] finished with state [{1}]", new Object[]{appId, appState});
+ log.log(Level.INFO, "Application {0} is {1}.", new Object[]{appId, appState});
}
/**
@@ -134,7 +137,7 @@ public class IgniteYarnClient {
*/
private static void checkArguments(String[] args) {
if (args.length < 1)
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Invalid arguments.");
}
/**
@@ -146,11 +149,14 @@ public class IgniteYarnClient {
private static Path getIgnite(ClusterProperties props, FileSystem fileSystem) throws Exception {
IgniteProvider provider = new IgniteProvider(props, fileSystem);
- return provider.getIgnite();
+ if (props.igniteVer() == null
+ || props.igniteVer().equalsIgnoreCase(ClusterProperties.DEFAULT_IGNITE_VERSION))
+ return provider.getIgnite();
+ else
+ return provider.getIgnite(props.igniteVer());
}
/**
- *
* @param envs Environment variables.
* @param conf Yarn configuration.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
deleted file mode 100644
index e6920b3..0000000
--- a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite;
-
-import junit.framework.*;
-import org.apache.ignite.yarn.*;
-
-/**
- * Apache Mesos integration tests.
- */
-public class IgniteMesosTestSuite extends TestSuite {
- /**
- * @return Test suite.
- * @throws Exception Thrown in case of the failure.
- */
- public static TestSuite suite() throws Exception {
- TestSuite suite = new TestSuite("Apache Mesos Integration Test Suite");
-
- suite.addTest(new TestSuite(IgniteSchedulerSelfTest.class));
-
- return suite;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
new file mode 100644
index 0000000..aa31774
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import junit.framework.*;
+import org.apache.ignite.yarn.*;
+
+/**
+ * Apache Hadoop Yarn integration tests.
+ */
+public class IgniteYarnTestSuite extends TestSuite {
+ /**
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Apache Yarn Integration Test Suite");
+
+ suite.addTest(new TestSuite(IgniteApplicationMasterSelfTest.class));
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
new file mode 100644
index 0000000..d865659
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import junit.framework.*;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.*;
+import org.apache.hadoop.yarn.client.api.async.*;
+import org.apache.hadoop.yarn.exceptions.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Application master tests.
+ */
+public class IgniteApplicationMasterSelfTest extends TestCase {
+ /** */
+ private ApplicationMaster appMaster;
+
+ /** */
+ private ClusterProperties props;
+
+ /** */
+ private RMMock rmMock = new RMMock();
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Override protected void setUp() throws Exception {
+ super.setUp();
+
+ props = new ClusterProperties();
+ appMaster = new ApplicationMaster("test", props);
+
+ appMaster.setSchedulerTimeout(100000);
+
+ rmMock.clear();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testContainerAllocate() throws Exception {
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ props.cpusPerNode(2);
+ props.memoryPerNode(1024);
+ props.instances(3);
+
+ Thread thread = runAppMaster(appMaster);
+
+ List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 2, 1000);
+
+ interruptedThread(thread);
+
+ assertEquals(3, contRequests.size());
+
+ for (AMRMClient.ContainerRequest req : contRequests) {
+ assertEquals(2, req.getCapability().getVirtualCores());
+ assertEquals(1024, req.getCapability().getMemory());
+ }
+ }
+
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClusterResource() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ props.cpusPerNode(8);
+ props.memoryPerNode(10240);
+ props.instances(3);
+
+ Thread thread = runAppMaster(appMaster);
+
+ interruptedThread(thread);
+
+ List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+
+ assertEquals(0, contRequests.size());
+ }
+
+ /**
+ * @param rmMock RM mock.
+ * @param expectedCnt Expected cnt.
+ * @param timeOut Timeout.
+ * @return Requests.
+ */
+ private List<AMRMClient.ContainerRequest> collectRequests(RMMock rmMock, int expectedCnt, int timeOut) {
+ long startTime = System.currentTimeMillis();
+
+ List<AMRMClient.ContainerRequest> requests = rmMock.requests();
+
+ while (requests.size() < expectedCnt
+ && (System.currentTimeMillis() - startTime) < timeOut)
+ requests = rmMock.requests();
+
+ return requests;
+ }
+
+ /**
+ * Runs appMaster in another thread.
+ *
+ * @param appMaster Application master.
+ * @return Thread.
+ */
+ private static Thread runAppMaster(final ApplicationMaster appMaster) {
+ Thread thread = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ appMaster.run();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ thread.start();
+
+ return thread;
+ }
+
+ /**
+ * Interrupt thread and wait.
+ *
+ * @param thread Thread.
+ */
+ private static void interruptedThread(Thread thread) throws InterruptedException {
+ thread.interrupt();
+
+ thread.join();
+ }
+
+ /**
+ * Resource manager mock.
+ */
+ private static class RMMock extends AMRMClientAsync {
+ /** */
+ private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>();
+
+ /** */
+ private Resource availableRes;
+
+ /** */
+ public RMMock() {
+ super(0, null);
+ }
+
+ /**
+ * @return Requests.
+ */
+ public List<AMRMClient.ContainerRequest> requests() {
+ return contRequests;
+ }
+
+ /**
+ * Sets resource.
+ *
+ * @param availableRes Available resource.
+ */
+ public void availableRes(Resource availableRes) {
+ this.availableRes = availableRes;
+ }
+
+ /**
+ * Clear internal state.
+ */
+ public void clear() {
+ contRequests.clear();
+ availableRes = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<? extends Collection> getMatchingRequests(Priority priority, String resourceName,
+ Resource capability) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName,
+ int appHostPort, String appTrackingUrl) throws YarnException, IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage,
+ String appTrackingUrl) throws YarnException, IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addContainerRequest(AMRMClient.ContainerRequest req) {
+ contRequests.add(req);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void releaseAssignedContainer(ContainerId containerId) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Resource getAvailableResources() {
+ return availableRes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getClusterNodeCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateBlacklist(List blacklistAdditions, List blacklistRemovals) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Node manager mock.
+ */
+ public static class NMMock extends NMClient {
+ /** */
+ protected NMMock() {
+ super("name");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, ByteBuffer> startContainer(Container container,
+ ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stopContainer(ContainerId containerId, NodeId nodeId) throws YarnException, IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupRunningContainersOnStop(boolean enabled) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Resource.
+ */
+ public static class MockResource extends Resource {
+ /** Memory. */
+ private int mem;
+
+ /** CPU. */
+ private int cpu;
+
+ /**
+ * @param mem Memory.
+ * @param cpu CPU.
+ */
+ public MockResource(int mem, int cpu) {
+ this.mem = mem;
+ this.cpu = cpu;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMemory() {
+ return mem;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMemory(int memory) {
+ this.mem = memory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getVirtualCores() {
+ return cpu;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setVirtualCores(int vCores) {
+ this.cpu = vCores;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(Resource resource) {
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
deleted file mode 100644
index 04d3492..0000000
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.yarn;
-
-import junit.framework.*;
-
-/**
- * Scheduler tests.
- */
-public class IgniteSchedulerSelfTest extends TestCase {
- public void testName() throws Exception {
-
- }
-}
[4/8] incubator-ignite git commit: #YARN WIP
Posted by sb...@apache.org.
#YARN WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/85f4a891
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/85f4a891
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/85f4a891
Branch: refs/heads/yarn
Commit: 85f4a8915fec6e9332a492728c4a37e639916f3f
Parents: 81cde9b
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Fri Jun 5 18:32:28 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Fri Jun 5 18:32:28 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 204 +++++++----
.../apache/ignite/yarn/ClusterProperties.java | 196 ++++------
.../org/apache/ignite/yarn/IgniteContainer.java | 74 ++++
.../org/apache/ignite/yarn/IgniteProvider.java | 362 +++++++++++++++++++
.../java/org/apache/ignite/yarn/IgniteTask.java | 86 -----
.../apache/ignite/yarn/IgniteYarnClient.java | 128 ++++---
.../ignite/yarn/utils/IgniteYarnUtils.java | 83 +++++
.../main/resources/ignite-default-config.xml | 33 ++
8 files changed, 838 insertions(+), 328 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 532830c..95197b7 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -17,86 +17,130 @@
package org.apache.ignite.yarn;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.*;
-import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.commons.io.*;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.client.api.async.*;
import org.apache.hadoop.yarn.conf.*;
import org.apache.hadoop.yarn.util.*;
+import org.apache.ignite.yarn.utils.*;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* TODO
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
- YarnConfiguration configuration;
- NMClient nmClient;
- int numContainersToWaitFor = 1;
+ /** Default port range. */
+ public static final String DEFAULT_PORT = ":47500..47510";
- public ApplicationMaster() {
- configuration = new YarnConfiguration();
+ /** Delimiter char. */
+ public static final String DELIM = ",";
+
+ /** */
+ private YarnConfiguration conf;
+
+ /** */
+ private ClusterProperties props;
+
+ /** */
+ private NMClient nmClient;
+
+ /** */
+ private Path ignitePath;
+
+ /** */
+ private Path cfgPath;
+
+ /** */
+ private FileSystem fs;
+
+ /** */
+ private Map<String, IgniteContainer> containers = new HashMap<>();
+
+ /**
+ * Constructor.
+ */
+ public ApplicationMaster(String ignitePath, ClusterProperties props) throws Exception {
+ this.conf = new YarnConfiguration();
+ this.props = props;
+ this.fs = FileSystem.get(conf);
+ this.ignitePath = new Path(ignitePath);
nmClient = NMClient.createNMClient();
- nmClient.init(configuration);
+
+ nmClient.init(conf);
nmClient.start();
}
/** {@inheritDoc} */
- public void onContainersAllocated(List<Container> containers) {
- for (Container container : containers) {
+ public synchronized void onContainersAllocated(List<Container> conts) {
+ for (Container container : conts) {
try {
- // Launch container by create ContainerLaunchContext
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- final LocalResource igniteZip = Records.newRecord(LocalResource.class);
- setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip,
- configuration);
+ Map<String, String> env = new HashMap<>(System.getenv());
+
+ env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(container.getNodeId().getHost()));
+
+ ctx.setEnvironment(env);
+
+ Map<String, LocalResource> resources = new HashMap<>();
+
+ resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
+ resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
+
+ ctx.setLocalResources(resources);
- ctx.setLocalResources(Collections.singletonMap("ignite", igniteZip));
ctx.setCommands(
- Lists.newArrayList(
- "$LOCAL_DIRS/ignite/*/bin/ignite.sh" +
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
- ));
+ Collections.singletonList(
+ "./ignite/*/bin/ignite.sh "
+ + "./ignite-config.xml"
+ + " -J-Xmx" + container.getResource().getMemory() + "m"
+ + " -J-Xms" + container.getResource().getMemory() + "m"
+ + IgniteYarnUtils.YARN_LOG_OUT
+ ));
+
System.out.println("[AM] Launching container " + container.getId());
+
nmClient.startContainer(container, ctx);
- } catch (Exception ex) {
+
+ containers.put(container.getNodeId().getHost(),
+ new IgniteContainer(container.getNodeId().getHost(), container.getResource().getVirtualCores(),
+ container.getResource().getMemory()));
+ }
+ catch (Exception ex) {
System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
}
}
}
- /** {@inheritDoc} */
- private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
- throws Exception {
- FileSystem fileSystem = FileSystem.get(conf);
- jarPath = fileSystem.makeQualified(jarPath);
+ /**
+ * @return Addresses of running nodes.
+ */
+ private String getAddress(String address) {
+ if (containers.isEmpty()) {
+ if (address != null && !address.isEmpty())
+ return address + DEFAULT_PORT;
+
+ return "";
+ }
- FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+ StringBuilder sb = new StringBuilder();
- appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- appMasterJar.setSize(jarStat.getLen());
- appMasterJar.setTimestamp(jarStat.getModificationTime());
- appMasterJar.setType(LocalResourceType.ARCHIVE);
- appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
+ for (IgniteContainer cont : containers.values())
+ sb.append(cont.host()).append(DEFAULT_PORT).append(DELIM);
- System.out.println("Path :" + jarPath);
+ return sb.substring(0, sb.length() - 1);
}
/** {@inheritDoc} */
- public void onContainersCompleted(List<ContainerStatus> statuses) {
+ public synchronized void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
- System.out.println("[AM] Completed container " + status.getContainerId());
synchronized (this) {
- numContainersToWaitFor--;
}
}
}
@@ -111,6 +155,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
/** {@inheritDoc} */
public void onError(Throwable t) {
+ nmClient.stop();
}
/** {@inheritDoc} */
@@ -118,28 +163,35 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
return 50;
}
- public boolean doneWithContainers() {
- return numContainersToWaitFor == 0;
- }
+ /**
+ * @param args Args.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ ClusterProperties props = ClusterProperties.from(null);
- public Configuration getConfiguration() {
- return configuration;
- }
+ ApplicationMaster master = new ApplicationMaster(args[0], props);
- public static void main(String[] args) throws Exception {
- ApplicationMaster master = new ApplicationMaster();
- master.runMainLoop();
+ master.init();
+
+ master.run();
}
- public void runMainLoop() throws Exception {
+ /**
+ * Runs application master.
+ *
+ * @throws Exception If failed.
+ */
+ public void run() throws Exception {
+ // Create async application master client.
+ AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
- AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(100, this);
- rmClient.init(getConfiguration());
+ rmClient.init(conf);
rmClient.start();
// Register with ResourceManager
- System.out.println("[AM] registerApplicationMaster 0");
rmClient.registerApplicationMaster("", 0, "");
+
System.out.println("[AM] registerApplicationMaster 1");
// Priority for worker containers - priorities are intra-application
@@ -148,27 +200,51 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
// Resource requirements for worker containers
Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(128);
- capability.setVirtualCores(1);
+ capability.setMemory(1024);
+ capability.setVirtualCores(2);
// Make container requests to ResourceManager
- for (int i = 0; i < numContainersToWaitFor; ++i) {
- AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority);
+ for (int i = 0; i < 1; ++i) {
+ AMRMClient.ContainerRequest containerAsk =
+ new AMRMClient.ContainerRequest(capability, null, null, priority);
+
System.out.println("[AM] Making res-req " + i);
+
rmClient.addContainerRequest(containerAsk);
}
System.out.println("[AM] waiting for containers to finish");
- while (!doneWithContainers()) {
- Thread.sleep(100);
- }
-
+ TimeUnit.MINUTES.sleep(10);
System.out.println("[AM] unregisterApplicationMaster 0");
+
// Un-register with ResourceManager
- rmClient.unregisterApplicationMaster(
- FinalApplicationStatus.SUCCEEDED, "", "");
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
+
System.out.println("[AM] unregisterApplicationMaster 1");
}
+
+ /**
+ * Resolves the path to the Ignite configuration file and stores it in {@code cfgPath}.
+ * <p>
+ * When a configuration URL is set in the cluster properties it is used as is. Otherwise the default
+ * configuration bundled on the classpath is copied into the Ignite work directory, overwriting
+ * any file already present there.
+ *
+ * @throws IOException If the default configuration is missing from the classpath or cannot be written.
+ */
+ public void init() throws IOException {
+ String cfgUrl = props.igniteConfigUrl();
+
+ if (cfgUrl != null && !cfgUrl.isEmpty()) {
+ cfgPath = new Path(cfgUrl);
+
+ return;
+ }
+
+ InputStream input = Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
+
+ // getResourceAsStream() returns null for a missing resource; fail with a clear message instead of an NPE.
+ if (input == null)
+ throw new IOException("Default Ignite configuration is not found in classpath: "
+ + IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
+
+ FSDataOutputStream outputStream = null;
+
+ try {
+ cfgPath = new Path(props.igniteWorkDir() + File.separator + IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
+
+ // Create file. Override by default.
+ outputStream = fs.create(cfgPath, true);
+
+ IOUtils.copy(input, outputStream);
+ }
+ finally {
+ // Release both streams even if creating the file or copying fails.
+ IOUtils.closeQuietly(input);
+
+ IOUtils.closeQuietly(outputStream);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index 0c6c26d..adddd51 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -18,7 +18,6 @@
package org.apache.ignite.yarn;
import java.io.*;
-import java.net.*;
import java.util.*;
import java.util.logging.*;
import java.util.regex.*;
@@ -30,17 +29,11 @@ public class ClusterProperties {
/** */
private static final Logger log = Logger.getLogger(ClusterProperties.class.getSimpleName());
- /** Unlimited. */
- public static final double UNLIMITED = Double.MAX_VALUE;
-
/** */
- public static final String MESOS_MASTER_URL = "MESOS_MASTER_URL";
+ public static final String EMPTY_STRING = "";
- /** */
- public static final String DEFAULT_MESOS_MASTER_URL = "zk://localhost:2181/mesos";
-
- /** Mesos master url. */
- private String mesosUrl = DEFAULT_MESOS_MASTER_URL;
+ /** Unlimited. */
+ public static final double UNLIMITED = Double.MAX_VALUE;
/** */
public static final String IGNITE_CLUSTER_NAME = "IGNITE_CLUSTER_NAME";
@@ -52,21 +45,6 @@ public class ClusterProperties {
private String clusterName = DEFAULT_CLUSTER_NAME;
/** */
- public static final String IGNITE_HTTP_SERVER_HOST = "IGNITE_HTTP_SERVER_HOST";
-
- /** Http server host. */
- private String httpServerHost = null;
-
- /** */
- public static final String IGNITE_HTTP_SERVER_PORT = "IGNITE_HTTP_SERVER_PORT";
-
- /** */
- public static final String DEFAULT_HTTP_SERVER_PORT = "48610";
-
- /** Http server host. */
- private int httpServerPort = Integer.valueOf(DEFAULT_HTTP_SERVER_PORT);
-
- /** */
public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU";
/** CPU limit. */
@@ -91,18 +69,6 @@ public class ClusterProperties {
private double memPerNode = UNLIMITED;
/** */
- public static final String IGNITE_TOTAL_DISK_SPACE = "IGNITE_TOTAL_DISK_SPACE";
-
- /** Disk space limit. */
- private double disk = UNLIMITED;
-
- /** */
- public static final String IGNITE_DISK_SPACE_PER_NODE = "IGNITE_DISK_SPACE_PER_NODE";
-
- /** Disk space limit. */
- private double diskPerNode = UNLIMITED;
-
- /** */
public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
/** Node count limit. */
@@ -136,19 +102,31 @@ public class ClusterProperties {
private String igniteVer = DEFAULT_IGNITE_VERSION;
/** */
- public static final String IGNITE_PACKAGE_URL = "IGNITE_PACKAGE_URL";
+ public static final String IGNITE_WORKING_DIR = "IGNITE_WORKING_DIR";
+
+ /** */
+ public static final String DEFAULT_IGNITE_WORK_DIR = "/ignite/workdir/";
- /** Ignite package url. */
- private String ignitePackageUrl = null;
+ /** Ignite work directory. */
+ private String igniteWorkDir = DEFAULT_IGNITE_WORK_DIR;
/** */
- public static final String IGNITE_WORK_DIR = "IGNITE_WORK_DIR";
+ public static final String IGNITE_LOCAL_WORK_DIR = "IGNITE_LOCAL_WORK_DIR";
/** */
- public static final String DEFAULT_IGNITE_WORK_DIR = "ignite-releases/";
+ public static final String DEFAULT_IGNITE_LOCAL_WORK_DIR = "./ignite-releases/";
- /** Ignite version. */
- private String igniteWorkDir = DEFAULT_IGNITE_WORK_DIR;
+ /** Ignite local work directory. */
+ private String igniteLocalWorkDir = DEFAULT_IGNITE_LOCAL_WORK_DIR;
+
+ /** */
+ public static final String IGNITE_RELEASES_DIR = "IGNITE_RELEASES_DIR";
+
+ /** */
+ public static final String DEFAULT_IGNITE_RELEASES_DIR = "/ignite/releases/";
+
+ /** Ignite local work directory. */
+ private String igniteReleasesDir = DEFAULT_IGNITE_RELEASES_DIR;
/** */
public static final String IGNITE_USERS_LIBS = "IGNITE_USERS_LIBS";
@@ -253,20 +231,6 @@ public class ClusterProperties {
}
/**
- * @return disk limit.
- */
- public double disk() {
- return disk;
- }
-
- /**
- * @return disk limit per node.
- */
- public double diskPerNode() {
- return diskPerNode;
- }
-
- /**
* @return instance count limit.
*/
public double instances() {
@@ -329,45 +293,31 @@ public class ClusterProperties {
}
/**
- * @return User's libs.
+ * @return Local working directory.
*/
- public String userLibs() {
- return userLibs;
+ public String igniteLocalWorkDir() {
+ return igniteLocalWorkDir;
}
/**
- * @return Ignite configuration.
+ * @return Ignite releases dir.
*/
- public String igniteCfg() {
- return igniteCfg;
+ public String igniteReleasesDir() {
+ return igniteReleasesDir;
}
/**
- * @return Master url.
- */
- public String masterUrl() {
- return mesosUrl;
- }
-
- /**
- * @return Http server host.
- */
- public String httpServerHost() {
- return httpServerHost;
- }
-
- /**
- * @return Http server port.
+ * @return User's libs.
*/
- public int httpServerPort() {
- return httpServerPort;
+ public String userLibs() {
+ return userLibs;
}
/**
- * @return Url to ignite package.
+ * @return Ignite configuration.
*/
- public String ignitePackageUrl() {
- return ignitePackageUrl;
+ public String igniteCfg() {
+ return igniteCfg;
}
/**
@@ -407,36 +357,21 @@ public class ClusterProperties {
ClusterProperties prop = new ClusterProperties();
- prop.mesosUrl = getStringProperty(MESOS_MASTER_URL, props, DEFAULT_MESOS_MASTER_URL);
-
- prop.httpServerHost = getStringProperty(IGNITE_HTTP_SERVER_HOST, props, getNonLoopbackAddress());
-
- String port = System.getProperty("PORT0");
-
- if (port != null && !port.isEmpty())
- prop.httpServerPort = Integer.valueOf(port);
- else
- prop.httpServerPort = Integer.valueOf(getStringProperty(IGNITE_HTTP_SERVER_PORT, props,
- DEFAULT_HTTP_SERVER_PORT));
-
prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
- prop.ignitePackageUrl = getStringProperty(IGNITE_PACKAGE_URL, props, null);
prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED);
prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED);
prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED);
prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED);
- prop.disk = getDoubleProperty(IGNITE_TOTAL_DISK_SPACE, props, UNLIMITED);
- prop.diskPerNode = getDoubleProperty(IGNITE_DISK_SPACE_PER_NODE, props, 1024.0);
prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED);
prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU);
prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM);
prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
- prop.igniteWorkDir = getStringProperty(IGNITE_WORK_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
@@ -459,6 +394,38 @@ public class ClusterProperties {
}
/**
+ * Converts the properties to a map of environment variables.
+ *
+ * @return Key-value map.
+ */
+ public Map<String, String> toEnvs() {
+ Map<String, String> envs = new HashMap<>();
+
+ envs.put(IGNITE_CLUSTER_NAME, toEnvVal(clusterName));
+
+ envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl));
+ envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl));
+
+ envs.put(IGNITE_TOTAL_CPU, toEnvVal(cpu));
+ envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode));
+ envs.put(IGNITE_TOTAL_MEMORY, toEnvVal(mem));
+ envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode));
+ envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt));
+ envs.put(IGNITE_MIN_CPU_PER_NODE, toEnvVal(minCpu));
+ envs.put(IGNITE_MIN_MEMORY_PER_NODE, toEnvVal(minMemory));
+
+ envs.put(IGNITE_VERSION, toEnvVal(igniteVer));
+ envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir));
+ envs.put(IGNITE_CONFIG_XML, toEnvVal(igniteCfg));
+ envs.put(IGNITE_USERS_LIBS, toEnvVal(userLibs));
+
+ if (hostnameConstraint != null)
+ envs.put(IGNITE_HOSTNAME_CONSTRAINT, toEnvVal(hostnameConstraint.pattern()));
+
+ return envs;
+ }
+
+ /**
* @param name Property name.
* @param fileProps Property file.
* @return Property value.
@@ -472,7 +439,7 @@ public class ClusterProperties {
if (property == null)
property = System.getenv(name);
- return property == null ? defaultVal : Double.valueOf(property);
+ return property == null || property.isEmpty() ? defaultVal : Double.valueOf(property);
}
/**
@@ -489,31 +456,14 @@ public class ClusterProperties {
if (property == null)
property = System.getenv(name);
- return property == null ? defaultVal : property;
+ return property == null || property.isEmpty() ? defaultVal : property;
}
/**
- * Finds a local, non-loopback, IPv4 address
- *
- * @return The first non-loopback IPv4 address found, or <code>null</code> if no such addresses found
- * @throws SocketException If there was a problem querying the network interfaces
+ * @param val Value.
+ * @return {@link #EMPTY_STRING} if val is {@code null}, otherwise its string representation.
*/
- public static String getNonLoopbackAddress() throws SocketException {
- Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
-
- while (ifaces.hasMoreElements()) {
- NetworkInterface iface = ifaces.nextElement();
-
- Enumeration<InetAddress> addresses = iface.getInetAddresses();
-
- while (addresses.hasMoreElements()) {
- InetAddress addr = addresses.nextElement();
-
- if (addr instanceof Inet4Address && !addr.isLoopbackAddress())
- return addr.getHostAddress();
- }
- }
-
- throw new RuntimeException("Failed. Couldn't find non-loopback address");
+ private String toEnvVal(Object val) {
+ return val == null ? EMPTY_STRING : val.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
new file mode 100644
index 0000000..4e3c285
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+/**
+ * Information about launched container.
+ */
+public class IgniteContainer {
+ /** Host the container was launched on. */
+ public final String host;
+
+ /** Number of CPU cores allocated to the container. */
+ public final double cpuCores;
+
+ /** Amount of memory allocated to the container. */
+ public final double mem;
+
+ /**
+ * Creates a descriptor of a launched Ignite container.
+ *
+ * @param host Host.
+ * @param cpuCores Cpu cores count.
+ * @param mem Memory.
+ */
+ public IgniteContainer(String host, double cpuCores, double mem) {
+ this.host = host;
+ this.cpuCores = cpuCores;
+ this.mem = mem;
+ }
+
+ /**
+ * @return Host.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return Cores count.
+ */
+ public double cpuCores() {
+ return cpuCores;
+ }
+
+ /**
+ * @return Memory.
+ */
+ public double mem() {
+ return mem;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ // NOTE(review): the label still reads "IgniteTask", presumably left over from the IgniteTask class
+ // this one replaces - confirm nothing parses this output before renaming it.
+ return "IgniteTask " +
+ "host: [" + host + ']' +
+ ", cpuCores: [" + cpuCores + "]" +
+ ", mem: [" + mem + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
new file mode 100644
index 0000000..c6e07cb
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.yarn.utils.IgniteYarnUtils;
+
+import java.io.*;
+import java.net.*;
+import java.nio.channels.*;
+import java.util.*;
+
+/**
+ * Class downloads and stores Ignite.
+ */
+public class IgniteProvider {
+ /** URL queried for the latest release; the currently known version may be passed as a query parameter. */
+ public static final String DOWNLOAD_LINK = "http://tiny.cc/updater/download_community.php";
+
+ /** Prefix of the direct download URL; the version and {@code .zip} are appended to it. */
+ public static final String DIRECT_DOWNLOAD_LINK = "http://www.gridgain.com/media/gridgain-community-fabric-";
+
+ /** Cluster properties. */
+ private ClusterProperties props;
+
+ /** Latest Ignite version found so far; {@code null} until the first lookup. */
+ private String latestVersion = null;
+
+ /** Whether the latest version was found in the HDFS releases directory. */
+ private boolean hdfs = false;
+
+ /** Hadoop file system. */
+ private FileSystem fs;
+
+ /**
+ * Creates a provider that keeps Ignite archives in the directories named by the cluster properties.
+ *
+ * @param props Cluster properties.
+ * @param fs Hadoop file system.
+ */
+ public IgniteProvider(ClusterProperties props, FileSystem fs) {
+ this.props = props;
+ this.fs = fs;
+ }
+
+ /**
+ * Provides the latest Ignite release as an archive in the HDFS releases directory.
+ * <p>
+ * On the first call the newest archive among the local work directory and the HDFS releases
+ * directory is looked up. The update service is then asked for a newer release; anything that is
+ * not already in HDFS is copied there from the local work directory.
+ *
+ * @return Path to the latest Ignite archive in HDFS.
+ * @throws Exception If the lookup, the download or the copy to HDFS failed.
+ */
+ public Path getIgnite() throws Exception {
+ File folder = checkDownloadFolder();
+
+ if (latestVersion == null) {
+ String localLatestVersion = findLatestVersion(findIgnites(folder));
+ String hdfsLatestVersion = findLatestVersion(findIgnites(fs, props.igniteReleasesDir()));
+
+ // Prefer the HDFS copy when it is at least as new as the local one: no upload is needed then.
+ if (hdfsLatestVersion != null && (localLatestVersion == null
+ || VersionComparator.INSTANCE.compare(hdfsLatestVersion, localLatestVersion) >= 0)) {
+ latestVersion = hdfsLatestVersion;
+
+ hdfs = true;
+ }
+ else
+ // The local archive is newer or the only one (may be null when nothing was found).
+ // Without this branch a newer local archive left the version unset.
+ latestVersion = localLatestVersion;
+ }
+
+ String newVersion = updateIgnite(latestVersion);
+
+ // Up to date and already in HDFS: nothing to copy.
+ if (hdfs && newVersion.equals(latestVersion))
+ return new Path(formatPath(props.igniteReleasesDir(), latestVersion));
+
+ latestVersion = newVersion;
+
+ return IgniteYarnUtils.copyLocalToHdfs(fs, formatPath(props.igniteLocalWorkDir(), latestVersion),
+ formatPath(props.igniteReleasesDir(), latestVersion));
+ }
+
+ /**
+ * Lists Ignite archives located in a local folder.
+ *
+ * @param folder Local folder to scan.
+ * @return Names of the Ignite archives found; empty if there are none or the folder listing is {@code null}.
+ */
+ private List<String> findIgnites(File folder) {
+ List<String> archives = new ArrayList<>();
+
+ String[] names = folder.list();
+
+ if (names == null)
+ return archives;
+
+ for (String name : names) {
+ if (name.contains("gridgain-community-fabric-") && name.endsWith(".zip"))
+ archives.add(name);
+ }
+
+ return archives;
+ }
+
+ /**
+ * Picks the newest version among the given archive names.
+ *
+ * @param files Archive names.
+ * @return Latest Ignite version, or {@code null} if the list is empty.
+ */
+ private String findLatestVersion(List<String> files) {
+ if (files.isEmpty())
+ return null;
+
+ // A single element is handled by the same call: max() of one element is that element.
+ String newest = Collections.max(files, VersionComparator.INSTANCE);
+
+ return parseVersion(newest);
+ }
+
+ /**
+ * Lists Ignite archives located in a folder of the given file system.
+ *
+ * @param fs File system.
+ * @param folder Folder.
+ * @return Names of the Ignite archives found; empty if the folder does not exist.
+ */
+ private List<String> findIgnites(FileSystem fs, String folder) {
+ List<String> archives = new ArrayList<>();
+
+ FileStatus[] statuses;
+
+ try {
+ statuses = fs.listStatus(new Path(folder));
+ }
+ catch (FileNotFoundException e) {
+ // Folder doesn't exist, so there is nothing to list.
+ return archives;
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Couldnt get list files from hdfs.", e);
+ }
+
+ if (statuses == null)
+ return archives;
+
+ for (FileStatus status : statuses) {
+ String name = status.getPath().getName();
+
+ if (name.contains("gridgain-community-fabric-") && name.endsWith(".zip"))
+ archives.add(name);
+ }
+
+ return archives;
+ }
+
+ /**
+ * Provides the requested Ignite release as an archive in the HDFS releases directory.
+ * <p>
+ * The HDFS releases directory is checked first, then the local work directory; if neither has the
+ * archive it is downloaded. A local or freshly downloaded archive is copied to HDFS.
+ *
+ * @param version Ignite version.
+ * @return Path to the Ignite archive in HDFS.
+ * @throws Exception If the download or the copy to HDFS failed.
+ */
+ public Path getIgnite(String version) throws Exception {
+ File folder = checkDownloadFolder();
+
+ String archiveName = "gridgain-community-fabric-" + version + ".zip";
+
+ Path dst = new Path(formatPath(props.igniteReleasesDir(), version));
+
+ // Check whether HDFS already contains the required Ignite version.
+ if (findIgnites(fs, props.igniteReleasesDir()).contains(archiveName))
+ return dst;
+
+ // Not available in the local work directory either: download it there.
+ if (!findIgnites(folder).contains(archiveName))
+ downloadIgnite(version);
+
+ // The local source must be the requested version; 'latestVersion' may be unset or name another release.
+ fs.copyFromLocalFile(new Path(formatPath(props.igniteLocalWorkDir(), version)), dst);
+
+ return dst;
+ }
+
+
+ /**
+ * Builds the path of the Ignite archive of the given version inside a folder.
+ *
+ * @param folder Folder.
+ * @param version Ignite version.
+ * @return Path to the archive.
+ */
+ private static String formatPath(String folder, String version) {
+ String archiveName = "gridgain-community-fabric-" + version + ".zip";
+
+ return folder + File.separator + archiveName;
+ }
+
+ /**
+ * Asks the update service for a release newer than the given one and, if there is one, downloads
+ * it into the local work directory.
+ *
+ * @param currentVersion The current latest version; {@code null} if none is known.
+ * @return Current version if the current version is latest; new ignite version otherwise.
+ * @throws RuntimeException If the service answered with an unexpected code or the download failed.
+ */
+ private String updateIgnite(String currentVersion) {
+ try {
+ URL url = new URL(currentVersion == null ? DOWNLOAD_LINK : DOWNLOAD_LINK + "?version=" + currentVersion);
+
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+
+ int code = conn.getResponseCode();
+
+ if (code == 304)
+ // This version is latest.
+ return currentVersion;
+
+ if (code != 200)
+ throw new RuntimeException("Got unexpected response code. Response code: " + code);
+
+ // URL of the connection once the response is received; the archive name is taken from it.
+ String redirectUrl = conn.getURL().toString();
+
+ checkDownloadFolder();
+
+ // try-with-resources closes both streams even if the transfer fails.
+ try (InputStream in = conn.getInputStream();
+ FileOutputStream outFile = new FileOutputStream(props.igniteLocalWorkDir() + File.separator
+ + fileName(redirectUrl))) {
+ outFile.getChannel().transferFrom(Channels.newChannel(in), 0, Long.MAX_VALUE);
+ }
+
+ return parseVersion(redirectUrl);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed update ignite.", e);
+ }
+ }
+
+ /**
+ * Downloads the archive of the given Ignite version into the local work directory.
+ *
+ * @param version Ignite version to download.
+ * @return Name of the downloaded archive file.
+ * @throws RuntimeException If the server answered with an unexpected code or the download failed.
+ */
+ private String downloadIgnite(String version) {
+ try {
+ URL url = new URL(DIRECT_DOWNLOAD_LINK + version + ".zip");
+
+ HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+
+ int code = conn.getResponseCode();
+
+ if (code != 200)
+ throw new RuntimeException("Got unexpected response code. Response code: " + code);
+
+ checkDownloadFolder();
+
+ String fileName = fileName(url.toString());
+
+ // try-with-resources closes both streams even if the transfer fails.
+ try (InputStream in = conn.getInputStream();
+ FileOutputStream outFile = new FileOutputStream(props.igniteLocalWorkDir() + File.separator + fileName)) {
+ outFile.getChannel().transferFrom(Channels.newChannel(in), 0, Long.MAX_VALUE);
+ }
+
+ return fileName;
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed update ignite.", e);
+ }
+ }
+
+ /**
+ * Returns the local work directory, trying to create it first when it does not exist.
+ *
+ * @return Download folder.
+ */
+ private File checkDownloadFolder() {
+ File dir = new File(props.igniteLocalWorkDir());
+
+ if (dir.exists())
+ return dir;
+
+ dir.mkdirs();
+
+ return dir;
+ }
+
+ /**
+ * Extracts the Ignite version from an archive name or URL such as
+ * {@code gridgain-community-fabric-1.0.6.zip}: the part after the last {@code '-'} with
+ * {@code .zip} removed.
+ *
+ * @param url Archive name or URL.
+ * @return Ignite version.
+ */
+ private static String parseVersion(String url) {
+ String[] split = url.split("-");
+
+ // String.replace() treats ".zip" literally; replaceAll() took it as a regex where '.' matches any character.
+ return split[split.length - 1].replace(".zip", "");
+ }
+
+ /**
+ * Takes the last {@code '/'}-separated segment of a URL.
+ *
+ * @param url URL.
+ * @return File name.
+ */
+ private static String fileName(String url) {
+ String[] segments = url.split("/");
+
+ int last = segments.length - 1;
+
+ return segments[last];
+ }
+
+ /**
+ * Ignite version comparator. Orders archive names (or bare version strings) by the dot-separated
+ * numeric version they contain, most significant component first.
+ */
+ public static final class VersionComparator implements Comparator<String> {
+ /** Singleton instance. */
+ public static final VersionComparator INSTANCE = new VersionComparator();
+
+ /** Use {@link #INSTANCE}. */
+ private VersionComparator() {
+ // No-op.
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws NumberFormatException If a version component is not an integer.
+ */
+ @Override public int compare(String f1, String f2) {
+ if (f1.equals(f2))
+ return 0;
+
+ String[] ver1 = parseVersion(f1).split("\\.");
+ String[] ver2 = parseVersion(f2).split("\\.");
+
+ // Compare component by component; the first difference decides. Requiring every component
+ // to be >= ranked 1.2.0 below 1.1.5. A missing component counts as 0, so "1.2" equals "1.2.0".
+ for (int i = 0; i < Math.max(ver1.length, ver2.length); i++) {
+ int part1 = i < ver1.length ? Integer.parseInt(ver1[i]) : 0;
+ int part2 = i < ver2.length ? Integer.parseInt(ver2[i]) : 0;
+
+ if (part1 != part2)
+ return part1 < part2 ? -1 : 1;
+ }
+
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
deleted file mode 100644
index 60275fd..0000000
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteTask.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.yarn;
-
-/**
- * Information about launched task.
- */
-public class IgniteTask {
- /** */
- public final String host;
-
- /** */
- public final double cpuCores;
-
- /** */
- public final double mem;
-
- /** */
- public final double disk;
-
- /**
- * Ignite launched task.
- *
- * @param host Host.
- * @param cpuCores Cpu cores count.
- * @param mem Memory.
- * @param disk Disk.
- */
- public IgniteTask(String host, double cpuCores, double mem, double disk) {
- this.host = host;
- this.cpuCores = cpuCores;
- this.mem = mem;
- this.disk = disk;
- }
-
- /**
- * @return Host.
- */
- public String host() {
- return host;
- }
-
- /**
- * @return Cores count.
- */
- public double cpuCores() {
- return cpuCores;
- }
-
- /**
- * @return Memory.
- */
- public double mem() {
- return mem;
- }
-
- /**
- * @return Disk.
- */
- public double disk() {
- return disk;
- }
-
- @Override
- public String toString() {
- return "IgniteTask " +
- "host: [" + host + ']' +
- ", cpuCores: [" + cpuCores + "]" +
- ", mem: [" + mem + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index 092aaa9..0ab9e91 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -17,20 +17,15 @@
package org.apache.ignite.yarn;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.conf.*;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.hadoop.yarn.util.*;
+import org.apache.ignite.yarn.utils.*;
+
+import java.io.*;
+import java.util.*;
import java.util.logging.*;
import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
@@ -43,11 +38,18 @@ public class IgniteYarnClient {
public static final Logger log = Logger.getLogger(IgniteYarnClient.class.getSimpleName());
/**
- * Main methods has only one optional parameter - path to properties files.
+ * Main method takes the path to the app master jar and, optionally, the path to a properties file.
*
* @param args Args.
*/
public static void main(String[] args) throws Exception {
+ checkArguments(args);
+
+ // Set path to app master jar.
+ String pathAppMasterJar = args[0];
+
+ ClusterProperties props = ClusterProperties.from(args.length == 2 ? args[1] : null);
+
YarnConfiguration conf = new YarnConfiguration();
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
@@ -56,45 +58,48 @@ public class IgniteYarnClient {
// Create application via yarnClient
YarnClientApplication app = yarnClient.createApplication();
+ FileSystem fs = FileSystem.get(conf);
+
+ // Load ignite and jar
+ Path ignite = getIgnite(props, fs);
+
+ Path appJar = IgniteYarnUtils.copyLocalToHdfs(fs, pathAppMasterJar,
+ props.igniteWorkDir() + File.separator + IgniteYarnUtils.JAR_NAME);
+
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+ System.out.println(Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
+ + IgniteYarnUtils.SPACE + ignite.toUri());
+
amContainer.setCommands(
- Collections.singletonList(
- " $JAVA_HOME/bin/java -Xmx256M org.apache.ignite.yarn.ApplicationMaster" +
- " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr"
- )
+ Collections.singletonList(
+ Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName()
+ + IgniteYarnUtils.SPACE + ignite.toUri()
+ + IgniteYarnUtils.YARN_LOG_OUT
+ )
);
// Setup jar for ApplicationMaster
- final LocalResource appMasterJar = Records.newRecord(LocalResource.class);
- setupAppMasterJar(new Path("/user/ntikhonov/ignite-yarn.jar"), appMasterJar, conf);
-
- final LocalResource igniteZip = Records.newRecord(LocalResource.class);
- setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf);
-
- amContainer.setLocalResources(new HashMap<String, LocalResource>() {{
- put("ignite-yarn.jar", appMasterJar);
- put("gridgain-community-fabric-1.0.6.zip", igniteZip);
- }});
-
+ LocalResource appMasterJar = IgniteYarnUtils.setupFile(appJar, fs, LocalResourceType.FILE);
+ amContainer.setLocalResources(Collections.singletonMap(IgniteYarnUtils.JAR_NAME, appMasterJar));
// Setup CLASSPATH for ApplicationMaster
- Map<String, String> appMasterEnv = new HashMap<>();
+ Map<String, String> appMasterEnv = props.toEnvs();
+
setupAppMasterEnv(appMasterEnv, conf);
+
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(256);
+ capability.setMemory(512);
capability.setVirtualCores(1);
// Finally, set-up ApplicationSubmissionContext for the application
- ApplicationSubmissionContext appContext =
- app.getApplicationSubmissionContext();
- appContext.setApplicationName("simple-yarn-app"); // application name
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+ appContext.setApplicationName("ignition"); // application name
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
appContext.setQueue("default"); // queue
@@ -106,44 +111,57 @@ public class IgniteYarnClient {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
+
while (appState != YarnApplicationState.FINISHED &&
appState != YarnApplicationState.KILLED &&
appState != YarnApplicationState.FAILED) {
Thread.sleep(100);
+
appReport = yarnClient.getApplicationReport(appId);
+
appState = appReport.getYarnApplicationState();
}
- System.out.println(
- "Application " + appId + " finished with" +
- " state " + appState +
- " at " + appReport.getFinishTime());
- }
+ yarnClient.killApplication(appId);
- private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
- throws Exception {
- FileSystem fileSystem = FileSystem.get(conf);
- jarPath = fileSystem.makeQualified(jarPath);
+ System.out.println("Application " + appId + " finished with state " + appState + " at "
+ + appReport.getFinishTime());
+ }
- FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+ /**
+ * Checks the input arguments: at least one argument must be supplied.
+ *
+ * @param args Arguments.
+ * @throws IllegalArgumentException If {@code args} is empty (thrown without a message).
+ */
+ private static void checkArguments(String[] args) {
+ if (args.length < 1)
+ throw new IllegalArgumentException();
+ }
- appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
- appMasterJar.setSize(jarStat.getLen());
- appMasterJar.setTimestamp(jarStat.getModificationTime());
- appMasterJar.setType(LocalResourceType.ARCHIVE);
- appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
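+ /**
+ * Obtains the path to Ignite by delegating to an {@code IgniteProvider} created from the given
+ * properties and file system.
+ *
+ * @param props Properties.
+ * @param fileSystem HDFS file system.
+ * @return HDFS path to Ignite node.
+ * @throws Exception If failed.
+ */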
+ private static Path getIgnite(ClusterProperties props, FileSystem fileSystem) throws Exception {
+ IgniteProvider provider = new IgniteProvider(props, fileSystem);
- System.out.println("Path :" + jarPath);
+ return provider.getIgnite();
}
- private static void setupAppMasterEnv(Map<String, String> appMasterEnv, YarnConfiguration conf) {
- for (String c : conf.getStrings(
- YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
- Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
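+ /**
+ * Appends the configured YARN application classpath entries, followed by a {@code PWD/*} wildcard
+ * entry, to the {@code CLASSPATH} environment variable.
+ *
+ * @param envs Environment variables.
+ * @param conf Yarn configuration.
+ */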
+ private static void setupAppMasterEnv(Map<String, String> envs, YarnConfiguration conf) {
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
+ Apps.addToEnvironment(envs, Environment.CLASSPATH.name(),
c.trim(), File.pathSeparator);
- Apps.addToEnvironment(appMasterEnv,
+ Apps.addToEnvironment(envs,
Environment.CLASSPATH.name(),
Environment.PWD.$() + File.separator + "*",
File.pathSeparator);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
new file mode 100644
index 0000000..1e6c414
--- /dev/null
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn.utils;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.util.*;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
+
+/**
+ * Static constants and helper methods for the Ignite YARN module: building YARN local resource
+ * records and copying local files to a Hadoop file system.
+ */
+public class IgniteYarnUtils {
+ /** File name of the default Ignite configuration. */
+ public static final String DEFAULT_IGNITE_CONFIG = "ignite-default-config.xml";
+
+ /** Single-space separator used when concatenating parts of a command line. */
+ public static final String SPACE = " ";
+
+ /** File name of the Ignite YARN jar. */
+ public static final String JAR_NAME = "ignite-yarn.jar";
+
+ /**
+ * Command-line suffix that redirects standard output and standard error to the {@code stdout}
+ * and {@code stderr} files under {@code LOG_DIR_EXPANSION_VAR}.
+ */
+ public static final String YARN_LOG_OUT =
+ " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr";
+
+ /**
+ * Creates a {@code LocalResource} record describing the given file. The path is qualified
+ * against the file system, and the record is filled with the file's URL, length and modification
+ * time, the requested resource type, and {@code APPLICATION} visibility.
+ *
+ * @param file Path.
+ * @param fs File system.
+ * @param type Local resource type.
+ * @return Local resource record for the file.
+ * @throws Exception If failed.
+ */
+ public static LocalResource setupFile(Path file, FileSystem fs, LocalResourceType type)
+ throws Exception {
+ LocalResource resource = Records.newRecord(LocalResource.class);
+
+ file = fs.makeQualified(file);
+
+ FileStatus stat = fs.getFileStatus(file);
+
+ resource.setResource(ConverterUtils.getYarnUrlFromPath(file));
+ resource.setSize(stat.getLen());
+ resource.setTimestamp(stat.getModificationTime());
+ resource.setType(type);
+ resource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ return resource;
+ }
+
+ /**
+ * Copies a local file to the given file system and returns the destination path.
+ *
+ * @param fs File system.
+ * @param src Source path.
+ * @param dst Destination path.
+ * @return Path to the copied file in the target file system.
+ * @throws Exception If failed.
+ */
+ public static Path copyLocalToHdfs(FileSystem fs, String src, String dst) throws Exception {
+ Path dstPath = new Path(dst);
+
+ // The local source file is kept (first flag false); an existing destination file is overwritten (second flag true).
+ fs.copyFromLocalFile(false, true, new Path(src), dstPath);
+
+ return dstPath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/85f4a891/modules/yarn/src/main/resources/ignite-default-config.xml
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/resources/ignite-default-config.xml b/modules/yarn/src/main/resources/ignite-default-config.xml
new file mode 100644
index 0000000..96bb669
--- /dev/null
+++ b/modules/yarn/src/main/resources/ignite-default-config.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"/>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
[2/8] incubator-ignite git commit: #IGNITE-YARN
Posted by sb...@apache.org.
#IGNITE-YARN
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5691911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5691911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5691911
Branch: refs/heads/yarn
Commit: b5691911dc545db3264afcf23153e2f9ec914724
Parents: 50cfa27
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Tue Jun 2 21:17:35 2015 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Tue Jun 2 21:17:35 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 120 ++++++++++---------
.../apache/ignite/yarn/IgniteYarnClient.java | 116 +++++++++++++++++-
.../ignite/yarn/IgniteSchedulerSelfTest.java | 2 +
3 files changed, 179 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index f52a1de..9ab70d4 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -17,6 +17,7 @@
package org.apache.ignite.yarn;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.yarn.api.*;
import org.apache.hadoop.yarn.api.protocolrecords.*;
@@ -32,56 +33,87 @@ import java.util.*;
* TODO
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
- /** {@inheritDoc} */
- @Override public void onContainersCompleted(List<ContainerStatus> statuses) {
-
+ Configuration configuration;
+ NMClient nmClient;
+ int numContainersToWaitFor = 5;
+
+ public ApplicationMaster() {
+ configuration = new YarnConfiguration();
+ nmClient = NMClient.createNMClient();
+ nmClient.init(configuration);
+ nmClient.start();
}
- /** {@inheritDoc} */
- @Override public void onContainersAllocated(List<Container> containers) {
+ public void onContainersAllocated(List<Container> containers) {
+ for (Container container : containers) {
+ try {
+ // Launch the container by creating a ContainerLaunchContext.
+ // bin/hadoop fs -rm /user/ntikhonov/*.jar && bin/hadoop fs -copyFromLocal ./ignite-yarn.jar /user/ntikhonov
+ ContainerLaunchContext ctx =
+ Records.newRecord(ContainerLaunchContext.class);
+ ctx.setCommands(
+ Lists.newArrayList(
+ "ls " +
+ " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
+ ));
+ System.out.println("[AM] Launching container " + container.getId());
+ nmClient.startContainer(container, ctx);
+ } catch (Exception ex) {
+ System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
+ }
+ }
+ }
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ for (ContainerStatus status : statuses) {
+ System.out.println("[AM] Completed container " + status.getContainerId());
+ synchronized (this) {
+ numContainersToWaitFor--;
+ }
+ }
}
- /** {@inheritDoc} */
- @Override public void onShutdownRequest() {
+ public void onNodesUpdated(List<NodeReport> updated) {
+ }
+ public void onReboot() {
}
- /** {@inheritDoc} */
- @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {
+ public void onShutdownRequest() {
+ }
+ public void onError(Throwable t) {
}
- /** {@inheritDoc} */
- @Override public float getProgress() {
+ public float getProgress() {
return 0;
}
- /** {@inheritDoc} */
- @Override public void onError(Throwable e) {
+ public boolean doneWithContainers() {
+ return numContainersToWaitFor == 0;
+ }
+ public Configuration getConfiguration() {
+ return configuration;
}
- /**
- * @param args Arguments.
- */
public static void main(String[] args) throws Exception {
- final String command = args[0];
- final int n = Integer.valueOf(args[1]);
+ ApplicationMaster master = new ApplicationMaster();
+ master.runMainLoop();
- // Initialize clients to ResourceManager and NodeManagers
- Configuration conf = new YarnConfiguration();
+ }
- AMRMClient<AMRMClient.ContainerRequest> rmClient = AMRMClient.createAMRMClient();
- rmClient.init(conf);
- rmClient.start();
+ public void runMainLoop() throws Exception {
- NMClient nmClient = NMClient.createNMClient();
- nmClient.init(conf);
- nmClient.start();
+ AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(100, this);
+ rmClient.init(getConfiguration());
+ rmClient.start();
// Register with ResourceManager
+ System.out.println("[AM] registerApplicationMaster 0");
rmClient.registerApplicationMaster("", 0, "");
+ System.out.println("[AM] registerApplicationMaster 1");
// Priority for worker containers - priorities are intra-application
Priority priority = Records.newRecord(Priority.class);
@@ -93,41 +125,21 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
capability.setVirtualCores(1);
// Make container requests to ResourceManager
- for (int i = 0; i < n; ++i) {
- AMRMClient.ContainerRequest containerAsk =
- new AMRMClient.ContainerRequest(capability, null, null, priority);
-
+ for (int i = 0; i < numContainersToWaitFor; ++i) {
+ AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority);
+ System.out.println("[AM] Making res-req " + i);
rmClient.addContainerRequest(containerAsk);
}
- // Obtain allocated containers, launch and check for responses
- int responseId = 0;
- int completedContainers = 0;
- while (completedContainers < n) {
- AllocateResponse response = rmClient.allocate(responseId++);
- for (Container container : response.getAllocatedContainers()) {
- // Launch container by create ContainerLaunchContext
- ContainerLaunchContext ctx =
- Records.newRecord(ContainerLaunchContext.class);
-
- ctx.setCommands(
- Collections.singletonList(
- command +
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
- ));
-
- nmClient.startContainer(container, ctx);
- }
- for (ContainerStatus status : response.getCompletedContainersStatuses()) {
- ++completedContainers;
- System.out.println("Completed container " + status.getContainerId());
- }
+ System.out.println("[AM] waiting for containers to finish");
+ while (!doneWithContainers()) {
Thread.sleep(100);
}
+ System.out.println("[AM] unregisterApplicationMaster 0");
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(
- FinalApplicationStatus.SUCCEEDED, "", "");
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ System.out.println("[AM] unregisterApplicationMaster 1");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index 7cef50d..e020ef4 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -17,11 +17,24 @@
package org.apache.ignite.yarn;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.conf.*;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.*;
+import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
+
/**
* Ignite yarn client.
*/
@@ -35,16 +48,109 @@ public class IgniteYarnClient {
* @param args Args.
*/
public static void main(String[] args) throws Exception {
- ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null);
-
- // Create yarnClient
YarnConfiguration conf = new YarnConfiguration();
-
YarnClient yarnClient = YarnClient.createYarnClient();
-
yarnClient.init(conf);
yarnClient.start();
+ // Create application via yarnClient
YarnClientApplication app = yarnClient.createApplication();
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ amContainer.setCommands(
+ Collections.singletonList(
+ " $JAVA_HOME/bin/java -Xmx256M org.apache.ignite.yarn.ApplicationMaster" +
+ " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr"
+ )
+ );
+
+ // Setup jar for ApplicationMaster
+ final LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+ setupAppMasterJar(new Path("/user/ntikhonov/ignite-yarn.jar"), appMasterJar, conf);
+
+ final LocalResource igniteZip = Records.newRecord(LocalResource.class);
+ setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf);
+
+ FileSystem fileSystem = FileSystem.get(conf);
+
+ Path path = fileSystem.makeQualified(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6/bin/ignite.sh"));
+
+ System.out.println("Path: " + path);
+ System.out.println("Path URI: " + path.toUri().toString());
+
+ amContainer.setLocalResources(new HashMap<String, LocalResource>(){{
+ put("ignite-yarn.jar", appMasterJar);
+ put("ignite", igniteZip);
+ }});
+
+ // Setup CLASSPATH for ApplicationMaster
+ Map<String, String> appMasterEnv = new HashMap<String, String>();
+ setupAppMasterEnv(appMasterEnv, conf);
+ amContainer.setEnvironment(appMasterEnv);
+
+ // Set up resource type requirements for ApplicationMaster
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(256);
+ capability.setVirtualCores(1);
+
+ // Finally, set-up ApplicationSubmissionContext for the application
+ ApplicationSubmissionContext appContext =
+ app.getApplicationSubmissionContext();
+ appContext.setApplicationName("simple-yarn-app"); // application name
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(capability);
+ appContext.setQueue("default"); // queue
+
+ // Submit application
+ ApplicationId appId = appContext.getApplicationId();
+ System.out.println("Submitting application " + appId);
+ yarnClient.submitApplication(appContext);
+
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ while (appState != YarnApplicationState.FINISHED &&
+ appState != YarnApplicationState.KILLED &&
+ appState != YarnApplicationState.FAILED) {
+ Thread.sleep(100);
+ appReport = yarnClient.getApplicationReport(appId);
+ appState = appReport.getYarnApplicationState();
+ }
+
+ System.out.println(
+ "Application " + appId + " finished with" +
+ " state " + appState +
+ " at " + appReport.getFinishTime());
+ }
+
+ private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
+ throws Exception {
+ FileSystem fileSystem = FileSystem.get(conf);
+ jarPath = fileSystem.makeQualified(jarPath);
+
+ FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+
+ appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ appMasterJar.setSize(jarStat.getLen());
+ appMasterJar.setTimestamp(jarStat.getModificationTime());
+ appMasterJar.setType(LocalResourceType.ARCHIVE);
+ appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ System.out.println("Path :" + jarPath);
+ }
+
+ private static void setupAppMasterEnv(Map<String, String> appMasterEnv, YarnConfiguration conf) {
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
+ Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
+ c.trim(), File.pathSeparator);
+
+ Apps.addToEnvironment(appMasterEnv,
+ Environment.CLASSPATH.name(),
+ Environment.PWD.$() + File.separator + "*",
+ File.pathSeparator);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
index 1a03743..04d3492 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
@@ -23,5 +23,7 @@ import junit.framework.*;
* Scheduler tests.
*/
public class IgniteSchedulerSelfTest extends TestCase {
+ public void testName() throws Exception {
+ }
}