You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 10:49:05 UTC
[07/50] [abbrv] incubator-ignite git commit: #YARN Code cleanup. Added tests.
#YARN Code cleanup. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7e072dc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7e072dc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7e072dc4
Branch: refs/heads/ignite-901
Commit: 7e072dc44eacd4ad088901ea7888aa8ccaf7d44a
Parents: 960e19d
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Tue Jun 9 19:02:56 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Tue Jun 9 19:02:56 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 31 ++-
.../yarn/IgniteApplicationMasterSelfTest.java | 226 ++++++++++++++++++-
2 files changed, 243 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index c552ea0..0ef1362 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -91,7 +91,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
Map<String, String> env = new HashMap<>(System.getenv());
- //env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
+ env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
ctx.setEnvironment(env);
@@ -284,15 +284,18 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
TimeUnit.MILLISECONDS.sleep(schedulerTimeout);
}
}
+ catch (InterruptedException e) {
+ // Un-register with ResourceManager
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
+
+ log.log(Level.WARNING, "Application master killed.");
+ }
catch (Exception e) {
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", "");
- System.exit(1);
+ log.log(Level.SEVERE, "Application master failed.", e);
}
-
- // Un-register with ResourceManager
- rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
}
/**
@@ -364,4 +367,22 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
public void setSchedulerTimeout(long schedulerTimeout) {
this.schedulerTimeout = schedulerTimeout;
}
+
+ /**
+ * Sets file system.
+ * @param fs File system.
+ */
+ public void setFs(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ /**
+ * For testing only.
+ *
+ * @return Running containers.
+ */
+ @Deprecated
+ public Map<ContainerId, IgniteContainer> getContainers() {
+ return containers;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
index d865659..abac58e 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -18,8 +18,9 @@
package org.apache.ignite.yarn;
import junit.framework.*;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.util.*;
import org.apache.hadoop.yarn.api.protocolrecords.*;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
@@ -27,8 +28,11 @@ import org.apache.hadoop.yarn.client.api.async.*;
import org.apache.hadoop.yarn.exceptions.*;
import java.io.*;
+import java.net.*;
import java.nio.*;
import java.util.*;
+import java.util.concurrent.*;
+import java.util.regex.*;
/**
* Application master tests.
@@ -52,7 +56,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
props = new ClusterProperties();
appMaster = new ApplicationMaster("test", props);
- appMaster.setSchedulerTimeout(100000);
+ appMaster.setSchedulerTimeout(10000);
rmMock.clear();
}
@@ -82,7 +86,6 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
}
}
-
/**
* @throws Exception If failed.
*/
@@ -98,14 +101,125 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
Thread thread = runAppMaster(appMaster);
- interruptedThread(thread);
-
List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+ interruptedThread(thread);
+
assertEquals(0, contRequests.size());
}
/**
+ * @throws Exception If failed.
+ */
+ public void testClusterAllocatedResource() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ appMaster.setFs(new MockFileSystem());
+
+ props.cpusPerNode(8);
+ props.memoryPerNode(5000);
+ props.instances(3);
+
+ // Check that only containers with enough CPU and memory are accepted.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000)));
+ assertEquals(0, appMaster.getContainers().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 2000)));
+ assertEquals(0, appMaster.getContainers().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 1, 7000)));
+ assertEquals(0, appMaster.getContainers().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+ assertEquals(1, appMaster.getContainers().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 7000)));
+ assertEquals(2, appMaster.getContainers().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartReleaseContainer() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ NMMock nmClient = new NMMock();
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(nmClient);
+
+ appMaster.setFs(new MockFileSystem());
+
+ props.cpusPerNode(8);
+ props.memoryPerNode(5000);
+ props.instances(3);
+
+ // Check that undersized containers are released and a matching one is started.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000)));
+ assertEquals(1, rmMock.releasedResources().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 7000)));
+ assertEquals(2, rmMock.releasedResources().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 9, 2000)));
+ assertEquals(3, rmMock.releasedResources().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+ assertEquals(3, rmMock.releasedResources().size());
+ assertEquals(1, nmClient.startedContainer().size());
+ }
+
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testHostnameConstraint() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+
+ NMMock nmClient = new NMMock();
+
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(nmClient);
+
+ appMaster.setFs(new MockFileSystem());
+
+ props.cpusPerNode(8);
+ props.memoryPerNode(5000);
+ props.instances(3);
+ props.hostnameConstraint(Pattern.compile("ignoreHost"));
+
+ // Check that containers on hosts matching the hostname constraint are released.
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+ assertEquals(0, rmMock.releasedResources().size());
+ assertEquals(1, nmClient.startedContainer().size());
+
+ appMaster.onContainersAllocated(Collections.singletonList(createContainer("ignoreHost", 8, 5000)));
+ assertEquals(1, rmMock.releasedResources().size());
+ assertEquals(1, nmClient.startedContainer().size());
+ }
+
+ /**
+ * @param host Host.
+ * @param cpu Cpu count.
+ * @param mem Memory.
+ * @return Container.
+ */
+ private Container createContainer(String host, int cpu, int mem) {
+ return Container.newInstance(
+ ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0),
+ ThreadLocalRandom.current().nextLong()),
+ NodeId.newInstance(host, 0),
+ "example.com",
+ new MockResource(mem, cpu),
+ Priority.newInstance(0),
+ null
+ );
+ }
+
+ /**
* @param rmMock RM mock.
* @param expectedCnt Expected cnt.
* @param timeOut Timeout.
@@ -147,7 +261,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
}
/**
- * Interrupt thread and wait.
+ * Interrupt thread and join.
*
* @param thread Thread.
*/
@@ -165,6 +279,9 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>();
/** */
+ private List<ContainerId> releasedConts = new ArrayList<>();
+
+ /** */
private Resource availableRes;
/** */
@@ -180,6 +297,13 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
}
/**
+ * @return Released resources.
+ */
+ public List<ContainerId> releasedResources() {
+ return releasedConts;
+ }
+
+ /**
* Sets resource.
*
* @param availableRes Available resource.
@@ -193,6 +317,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
*/
public void clear() {
contRequests.clear();
+ releasedConts.clear();
availableRes = null;
}
@@ -226,7 +351,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
/** {@inheritDoc} */
@Override public void releaseAssignedContainer(ContainerId containerId) {
- // No-op.
+ releasedConts.add(containerId);
}
/** {@inheritDoc} */
@@ -250,13 +375,26 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
*/
public static class NMMock extends NMClient {
/** */
- protected NMMock() {
+ private List<ContainerLaunchContext> startedContainer = new ArrayList<>();
+
+ /** */
+ public NMMock() {
super("name");
}
+ /**
+ * @return Started containers.
+ */
+ public List<ContainerLaunchContext> startedContainer() {
+ return startedContainer;
+ }
+
/** {@inheritDoc} */
@Override public Map<String, ByteBuffer> startContainer(Container container,
ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
+
+ startedContainer.add(containerLaunchContext);
+
return null;
}
@@ -321,4 +459,74 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
return 0;
}
}
+
+ /**
+ * Mock file system.
+ */
+ public static class MockFileSystem extends FileSystem {
+ /** */
+ public MockFileSystem() {
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path makeQualified(Path path) {
+ return new Path("/test/path");
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileStatus(Path f) throws IOException {
+ return new FileStatus();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getWorkingDirectory() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWorkingDirectory(Path new_dir) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ return new FileStatus[0];
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(Path f, boolean recursive) throws IOException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rename(Path src, Path dst) throws IOException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public URI getUri() {
+ return null;
+ }
+ }
}