You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 10:49:04 UTC
[06/50] [abbrv] incubator-ignite git commit: #YARN Code cleanup.
Added tests.
#YARN Code cleanup. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/960e19dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/960e19dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/960e19dd
Branch: refs/heads/ignite-901
Commit: 960e19dda15d58ddc403a8e6856d0eb19d7794c1
Parents: 858d2a3
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Tue Jun 9 16:38:07 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Tue Jun 9 16:38:07 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 89 +++--
.../apache/ignite/yarn/ClusterProperties.java | 53 +--
.../apache/ignite/yarn/IgniteYarnClient.java | 30 +-
.../org/apache/ignite/IgniteMesosTestSuite.java | 38 ---
.../org/apache/ignite/IgniteYarnTestSuite.java | 38 +++
.../yarn/IgniteApplicationMasterSelfTest.java | 324 +++++++++++++++++++
.../ignite/yarn/IgniteSchedulerSelfTest.java | 29 --
7 files changed, 460 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 3bf0521..c552ea0 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -47,27 +47,30 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
public static final String DELIM = ",";
/** */
+ private long schedulerTimeout = TimeUnit.SECONDS.toMillis(1);
+
+ /** Yarn configuration. */
private YarnConfiguration conf;
- /** */
+ /** Cluster properties. */
private ClusterProperties props;
- /** */
+ /** Node manager client. */
private NMClient nmClient;
- /** */
- AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
+ /** Resource manager. */
+ private AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
- /** */
+ /** Ignite path. */
private Path ignitePath;
- /** */
+ /** Config path. */
private Path cfgPath;
- /** */
+ /** Hadoop file system. */
private FileSystem fs;
- /** */
+ /** Running containers. */
private Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>();
/**
@@ -76,13 +79,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
public ApplicationMaster(String ignitePath, ClusterProperties props) throws Exception {
this.conf = new YarnConfiguration();
this.props = props;
- this.fs = FileSystem.get(conf);
this.ignitePath = new Path(ignitePath);
-
- nmClient = NMClient.createNMClient();
-
- nmClient.init(conf);
- nmClient.start();
}
/** {@inheritDoc} */
@@ -103,11 +100,16 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE));
resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE));
+ if (props.userLibs() != null)
+ resources.put("libs", IgniteYarnUtils.setupFile(new Path(props.userLibs()), fs,
+ LocalResourceType.FILE));
+
ctx.setLocalResources(resources);
ctx.setCommands(
Collections.singletonList(
- "./ignite/*/bin/ignite.sh "
+ "cp -r ./libs/* ./ignite/*/libs/ || true && "
+ + "./ignite/*/bin/ignite.sh "
+ "./ignite-config.xml"
+ " -J-Xmx" + c.getResource().getMemory() + "m"
+ " -J-Xms" + c.getResource().getMemory() + "m"
@@ -153,7 +155,9 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
// Check that slave satisfies min requirements.
if (cont.getResource().getVirtualCores() < props.cpusPerNode()
|| cont.getResource().getMemory() < props.memoryPerNode()) {
- //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+ log.log(Level.FINE, "Container resources not sufficient requirements. Host: {0}, cpu: {1}, mem: {2}",
+ new Object[]{cont.getNodeId().getHost(), cont.getResource().getVirtualCores(),
+ cont.getResource().getMemory()});
return false;
}
@@ -185,7 +189,8 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
for (ContainerStatus status : statuses) {
containers.remove(status.getContainerId());
- //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList());
+ log.log(Level.INFO, "Container stopped. Container id: {0}. State: {1}.",
+ new Object[]{status.getContainerId(), status.getState()});
}
}
@@ -243,9 +248,6 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws Exception If failed.
*/
public void run() throws Exception {
- rmClient.init(conf);
- rmClient.start();
-
// Register with ResourceManager
rmClient.registerApplicationMaster("", 0, "");
@@ -260,7 +262,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
while (!nmClient.isInState(Service.STATE.STOPPED)) {
int runningCnt = containers.size();
- if (runningCnt < props.instances() && checkAvailableResource(rmClient.getAvailableResources())) {
+ if (runningCnt < props.instances() && checkAvailableResource()) {
// Resource requirements for worker containers.
Resource capability = Records.newRecord(Resource.class);
@@ -279,7 +281,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
}
}
- TimeUnit.SECONDS.sleep(5);
+ TimeUnit.MILLISECONDS.sleep(schedulerTimeout);
}
}
catch (Exception e) {
@@ -294,10 +296,11 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
}
/**
- * @param availableRes Available resources.
* @return {@code True} if cluster contains available resources.
*/
- private boolean checkAvailableResource(Resource availableRes) {
+ private boolean checkAvailableResource() {
+ Resource availableRes = rmClient.getAvailableResources();
+
return availableRes == null || availableRes.getMemory() >= props.memoryPerNode()
&& availableRes.getVirtualCores() >= props.cpusPerNode();
}
@@ -306,10 +309,17 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
* @throws IOException
*/
public void init() throws IOException {
+ fs = FileSystem.get(conf);
+
+ nmClient = NMClient.createNMClient();
+
+ nmClient.init(conf);
+ nmClient.start();
+
// Create async application master.
rmClient = AMRMClientAsync.createAMRMClientAsync(300, this);
- if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) {
+ if (props.igniteCfg() == null || props.igniteCfg().isEmpty()) {
InputStream input = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG);
@@ -325,6 +335,33 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
IOUtils.closeQuietly(outputStream);
}
else
- cfgPath = new Path(props.igniteConfigUrl());
+ cfgPath = new Path(props.igniteCfg());
+ }
+
+ /**
+ * Sets NMClient.
+ *
+ * @param nmClient NMClient.
+ */
+ public void setNmClient(NMClient nmClient) {
+ this.nmClient = nmClient;
+ }
+
+ /**
+ * Sets RMClient.
+ *
+ * @param rmClient AMRMClientAsync.
+ */
+ public void setRmClient(AMRMClientAsync<AMRMClient.ContainerRequest> rmClient) {
+ this.rmClient = rmClient;
+ }
+
+ /**
+ * Sets scheduler timeout.
+ *
+ * @param schedulerTimeout Scheduler timeout.
+ */
+ public void setSchedulerTimeout(long schedulerTimeout) {
+ this.schedulerTimeout = schedulerTimeout;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index f9fdb59..d021d45 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -59,8 +59,11 @@ public class ClusterProperties {
/** */
public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
+ /** */
+ public static final int DEFAULT_IGNITE_NODE_COUNT = 3;
+
/** Node count limit. */
- private double nodeCnt = 3;
+ private double nodeCnt = DEFAULT_IGNITE_NODE_COUNT;
/** */
public static final String IGNITE_VERSION = "IGNITE_VERSION";
@@ -105,24 +108,12 @@ public class ClusterProperties {
private String userLibs = null;
/** */
- public static final String IGNITE_USERS_LIBS_URL = "IGNITE_USERS_LIBS_URL";
-
- /** URL to users libs. */
- private String userLibsUrl = null;
-
- /** */
public static final String IGNITE_CONFIG_XML = "IGNITE_XML_CONFIG";
/** Ignite config. */
private String igniteCfg = null;
/** */
- public static final String IGNITE_CONFIG_XML_URL = "IGNITE_CONFIG_XML_URL";
-
- /** Url to ignite config. */
- private String igniteCfgUrl = null;
-
- /** */
public static final String IGNITE_HOSTNAME_CONSTRAINT = "IGNITE_HOSTNAME_CONSTRAINT";
/** Url to ignite config. */
@@ -179,6 +170,13 @@ public class ClusterProperties {
}
/**
+ * Sets instance count limit.
+ */
+ public void instances(int nodeCnt) {
+ this.nodeCnt = nodeCnt;
+ }
+
+ /**
* Sets hostname constraint.
*
* @param pattern Hostname pattern.
@@ -230,20 +228,6 @@ public class ClusterProperties {
}
/**
- * @return Url to ignite configuration.
- */
- public String igniteConfigUrl() {
- return igniteCfgUrl;
- }
-
- /**
- * @return Url to users libs configuration.
- */
- public String usersLibsUrl() {
- return userLibsUrl;
- }
-
- /**
* @return Host name constraint.
*/
public Pattern hostnameConstraint() {
@@ -268,15 +252,14 @@ public class ClusterProperties {
prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
- prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null);
- prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null);
-
prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, 1.0);
prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, 2048.0);
prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, 2.0);
prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION);
prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+ prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR);
prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
@@ -306,15 +289,14 @@ public class ClusterProperties {
prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME);
- prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, null, null);
- prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, null, null);
-
prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, 1.0);
prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, 2048.0);
prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, 2.0);
prop.igniteVer = getStringProperty(IGNITE_VERSION, null, DEFAULT_IGNITE_VERSION);
prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, null, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+ prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, null, DEFAULT_IGNITE_RELEASES_DIR);
prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null);
prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null);
@@ -342,15 +324,14 @@ public class ClusterProperties {
envs.put(IGNITE_CLUSTER_NAME, toEnvVal(clusterName));
- envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl));
- envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl));
-
envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode));
envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode));
envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt));
envs.put(IGNITE_VERSION, toEnvVal(igniteVer));
envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir));
+ envs.put(IGNITE_LOCAL_WORK_DIR, toEnvVal(igniteLocalWorkDir));
+ envs.put(IGNITE_RELEASES_DIR, toEnvVal(igniteReleasesDir));
envs.put(IGNITE_CONFIG_XML, toEnvVal(igniteCfg));
envs.put(IGNITE_USERS_LIBS, toEnvVal(userLibs));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index f74890d..764e717 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -39,9 +39,9 @@ public class IgniteYarnClient {
public static final Logger log = Logger.getLogger(IgniteYarnClient.class.getSimpleName());
/**
- * Main methods has only one optional parameter - path to properties file.
+ * Main method has one mandatory parameter and one optional parameter.
*
- * @param args Args.
+ * @param args Path to the jar (mandatory) and path to the property file (optional).
*/
public static void main(String[] args) throws Exception {
checkArguments(args);
@@ -107,24 +107,27 @@ public class IgniteYarnClient {
yarnClient.submitApplication(appContext);
- log.log(Level.INFO, "Submitted application. Application id: [{0}]", appId);
+ log.log(Level.INFO, "Submitted application. Application id: {0}", appId);
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
- while (appState != YarnApplicationState.FINISHED &&
- appState != YarnApplicationState.KILLED &&
- appState != YarnApplicationState.FAILED) {
+ while (appState == YarnApplicationState.NEW ||
+ appState == YarnApplicationState.NEW_SAVING ||
+ appState == YarnApplicationState.SUBMITTED ||
+ appState == YarnApplicationState.ACCEPTED) {
TimeUnit.SECONDS.sleep(1L);
appReport = yarnClient.getApplicationReport(appId);
+ if (appState != YarnApplicationState.ACCEPTED
+ && appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED)
+ log.log(Level.INFO, "Application {0} is ACCEPTED.", appId);
+
appState = appReport.getYarnApplicationState();
}
- yarnClient.killApplication(appId);
-
- log.log(Level.INFO, "Application [{0}] finished with state [{1}]", new Object[]{appId, appState});
+ log.log(Level.INFO, "Application {0} is {1}.", new Object[]{appId, appState});
}
/**
@@ -134,7 +137,7 @@ public class IgniteYarnClient {
*/
private static void checkArguments(String[] args) {
if (args.length < 1)
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Invalid arguments.");
}
/**
@@ -146,11 +149,14 @@ public class IgniteYarnClient {
private static Path getIgnite(ClusterProperties props, FileSystem fileSystem) throws Exception {
IgniteProvider provider = new IgniteProvider(props, fileSystem);
- return provider.getIgnite();
+ if (props.igniteVer() == null
+ || props.igniteVer().equalsIgnoreCase(ClusterProperties.DEFAULT_IGNITE_VERSION))
+ return provider.getIgnite();
+ else
+ return provider.getIgnite(props.igniteVer());
}
/**
- *
* @param envs Environment variables.
* @param conf Yarn configuration.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
deleted file mode 100644
index e6920b3..0000000
--- a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite;
-
-import junit.framework.*;
-import org.apache.ignite.yarn.*;
-
-/**
- * Apache Mesos integration tests.
- */
-public class IgniteMesosTestSuite extends TestSuite {
- /**
- * @return Test suite.
- * @throws Exception Thrown in case of the failure.
- */
- public static TestSuite suite() throws Exception {
- TestSuite suite = new TestSuite("Apache Mesos Integration Test Suite");
-
- suite.addTest(new TestSuite(IgniteSchedulerSelfTest.class));
-
- return suite;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
new file mode 100644
index 0000000..aa31774
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import junit.framework.*;
+import org.apache.ignite.yarn.*;
+
+/**
+ * Apache Hadoop Yarn integration tests.
+ */
+public class IgniteYarnTestSuite extends TestSuite {
+ /**
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Apache Yarn Integration Test Suite");
+
+ suite.addTest(new TestSuite(IgniteApplicationMasterSelfTest.class));
+
+ return suite;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
new file mode 100644
index 0000000..d865659
--- /dev/null
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yarn;
+
+import junit.framework.*;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.*;
+import org.apache.hadoop.yarn.client.api.async.*;
+import org.apache.hadoop.yarn.exceptions.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Application master tests.
+ */
+public class IgniteApplicationMasterSelfTest extends TestCase {
+ /** */
+ private ApplicationMaster appMaster;
+
+ /** */
+ private ClusterProperties props;
+
+ /** */
+ private RMMock rmMock = new RMMock();
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Override protected void setUp() throws Exception {
+ super.setUp();
+
+ props = new ClusterProperties();
+ appMaster = new ApplicationMaster("test", props);
+
+ appMaster.setSchedulerTimeout(100000);
+
+ rmMock.clear();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testContainerAllocate() throws Exception {
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ props.cpusPerNode(2);
+ props.memoryPerNode(1024);
+ props.instances(3);
+
+ Thread thread = runAppMaster(appMaster);
+
+ List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 2, 1000);
+
+ interruptedThread(thread);
+
+ assertEquals(3, contRequests.size());
+
+ for (AMRMClient.ContainerRequest req : contRequests) {
+ assertEquals(2, req.getCapability().getVirtualCores());
+ assertEquals(1024, req.getCapability().getMemory());
+ }
+ }
+
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClusterResource() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ props.cpusPerNode(8);
+ props.memoryPerNode(10240);
+ props.instances(3);
+
+ Thread thread = runAppMaster(appMaster);
+
+ interruptedThread(thread);
+
+ List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+
+ assertEquals(0, contRequests.size());
+ }
+
+ /**
+ * @param rmMock RM mock.
+ * @param expectedCnt Expected cnt.
+ * @param timeOut Timeout.
+ * @return Requests.
+ */
+ private List<AMRMClient.ContainerRequest> collectRequests(RMMock rmMock, int expectedCnt, int timeOut) {
+ long startTime = System.currentTimeMillis();
+
+ List<AMRMClient.ContainerRequest> requests = rmMock.requests();
+
+ while (requests.size() < expectedCnt
+ && (System.currentTimeMillis() - startTime) < timeOut)
+ requests = rmMock.requests();
+
+ return requests;
+ }
+
+ /**
+ * Runs appMaster in another thread.
+ *
+ * @param appMaster Application master.
+ * @return Thread.
+ */
+ private static Thread runAppMaster(final ApplicationMaster appMaster) {
+ Thread thread = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ appMaster.run();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ thread.start();
+
+ return thread;
+ }
+
+ /**
+ * Interrupt thread and wait.
+ *
+ * @param thread Thread.
+ */
+ private static void interruptedThread(Thread thread) throws InterruptedException {
+ thread.interrupt();
+
+ thread.join();
+ }
+
+ /**
+ * Resource manager mock.
+ */
+ private static class RMMock extends AMRMClientAsync {
+ /** */
+ private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>();
+
+ /** */
+ private Resource availableRes;
+
+ /** */
+ public RMMock() {
+ super(0, null);
+ }
+
+ /**
+ * @return Requests.
+ */
+ public List<AMRMClient.ContainerRequest> requests() {
+ return contRequests;
+ }
+
+ /**
+ * Sets resource.
+ *
+ * @param availableRes Available resource.
+ */
+ public void availableRes(Resource availableRes) {
+ this.availableRes = availableRes;
+ }
+
+ /**
+ * Clear internal state.
+ */
+ public void clear() {
+ contRequests.clear();
+ availableRes = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<? extends Collection> getMatchingRequests(Priority priority, String resourceName,
+ Resource capability) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName,
+ int appHostPort, String appTrackingUrl) throws YarnException, IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage,
+ String appTrackingUrl) throws YarnException, IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addContainerRequest(AMRMClient.ContainerRequest req) {
+ contRequests.add(req);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void releaseAssignedContainer(ContainerId containerId) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Resource getAvailableResources() {
+ return availableRes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getClusterNodeCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateBlacklist(List blacklistAdditions, List blacklistRemovals) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Node manager mock.
+ */
+ public static class NMMock extends NMClient {
+ /** */
+ protected NMMock() {
+ super("name");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, ByteBuffer> startContainer(Container container,
+ ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stopContainer(ContainerId containerId, NodeId nodeId) throws YarnException, IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId)
+ throws YarnException, IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupRunningContainersOnStop(boolean enabled) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Resource.
+ */
+ public static class MockResource extends Resource {
+ /** Memory. */
+ private int mem;
+
+ /** CPU. */
+ private int cpu;
+
+ /**
+ * @param mem Memory.
+ * @param cpu CPU.
+ */
+ public MockResource(int mem, int cpu) {
+ this.mem = mem;
+ this.cpu = cpu;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getMemory() {
+ return mem;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMemory(int memory) {
+ this.mem = memory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getVirtualCores() {
+ return cpu;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setVirtualCores(int vCores) {
+ this.cpu = vCores;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(Resource resource) {
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
deleted file mode 100644
index 04d3492..0000000
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.yarn;
-
-import junit.framework.*;
-
-/**
- * Scheduler tests.
- */
-public class IgniteSchedulerSelfTest extends TestCase {
- public void testName() throws Exception {
-
- }
-}