You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/16 09:15:34 UTC

[08/50] incubator-ignite git commit: #YARN Code cleanup. Added tests.

#YARN Code cleanup. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7e072dc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7e072dc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7e072dc4

Branch: refs/heads/master
Commit: 7e072dc44eacd4ad088901ea7888aa8ccaf7d44a
Parents: 960e19d
Author: nikolay tikhonov <nt...@gridgain.com>
Authored: Tue Jun 9 19:02:56 2015 +0300
Committer: nikolay tikhonov <nt...@gridgain.com>
Committed: Tue Jun 9 19:02:56 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/yarn/ApplicationMaster.java   |  31 ++-
 .../yarn/IgniteApplicationMasterSelfTest.java   | 226 ++++++++++++++++++-
 2 files changed, 243 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index c552ea0..0ef1362 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -91,7 +91,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
 
                     Map<String, String> env = new HashMap<>(System.getenv());
 
-                    //env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
+                    env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost()));
 
                     ctx.setEnvironment(env);
 
@@ -284,15 +284,18 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
                 TimeUnit.MILLISECONDS.sleep(schedulerTimeout);
             }
         }
+        catch (InterruptedException e) {
+            // Un-register with ResourceManager
+            rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
+
+            log.log(Level.WARNING, "Application master killed.");
+        }
         catch (Exception e) {
             // Un-register with ResourceManager
             rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", "");
 
-            System.exit(1);
+            log.log(Level.SEVERE, "Application master failed.", e);
         }
-
-        // Un-register with ResourceManager
-        rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", "");
     }
 
     /**
@@ -364,4 +367,22 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
     public void setSchedulerTimeout(long schedulerTimeout) {
         this.schedulerTimeout = schedulerTimeout;
     }
+
+    /**
+     * Sets file system.
+     * @param fs File system.
+     */
+    public void setFs(FileSystem fs) {
+        this.fs = fs;
+    }
+
+    /**
+     * JUST FOR TESTING!!!
+     *
+     * @return Running containers.
+     */
+    @Deprecated
+    public Map<ContainerId, IgniteContainer> getContainers() {
+        return containers;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
index d865659..abac58e 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -18,8 +18,9 @@
 package org.apache.ignite.yarn;
 
 import junit.framework.*;
-import org.apache.curator.utils.ThreadUtils;
-import org.apache.hadoop.util.ThreadUtil;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.yarn.api.protocolrecords.*;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.*;
@@ -27,8 +28,11 @@ import org.apache.hadoop.yarn.client.api.async.*;
 import org.apache.hadoop.yarn.exceptions.*;
 
 import java.io.*;
+import java.net.*;
 import java.nio.*;
 import java.util.*;
+import java.util.concurrent.*;
+import java.util.regex.*;
 
 /**
  * Application master tests.
@@ -52,7 +56,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
         props = new ClusterProperties();
         appMaster = new ApplicationMaster("test", props);
 
-        appMaster.setSchedulerTimeout(100000);
+        appMaster.setSchedulerTimeout(10000);
 
         rmMock.clear();
     }
@@ -82,7 +86,6 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
         }
     }
 
-
     /**
      * @throws Exception If failed.
      */
@@ -98,14 +101,125 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
 
         Thread thread = runAppMaster(appMaster);
 
-        interruptedThread(thread);
-
         List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
 
+        interruptedThread(thread);
+
         assertEquals(0, contRequests.size());
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testClusterAllocatedResource() throws Exception {
+        rmMock.availableRes(new MockResource(1024, 2));
+
+        appMaster.setRmClient(rmMock);
+        appMaster.setNmClient(new NMMock());
+
+        appMaster.setFs(new MockFileSystem());
+
+        props.cpusPerNode(8);
+        props.memoryPerNode(5000);
+        props.instances(3);
+
+        // Check that container resources
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000)));
+        assertEquals(0, appMaster.getContainers().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 2000)));
+        assertEquals(0, appMaster.getContainers().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 1, 7000)));
+        assertEquals(0, appMaster.getContainers().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+        assertEquals(1, appMaster.getContainers().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 7000)));
+        assertEquals(2, appMaster.getContainers().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartReleaseContainer() throws Exception {
+        rmMock.availableRes(new MockResource(1024, 2));
+
+        NMMock nmClient = new NMMock();
+
+        appMaster.setRmClient(rmMock);
+        appMaster.setNmClient(nmClient);
+
+        appMaster.setFs(new MockFileSystem());
+
+        props.cpusPerNode(8);
+        props.memoryPerNode(5000);
+        props.instances(3);
+
+        // Check that container resources
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000)));
+        assertEquals(1, rmMock.releasedResources().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 7000)));
+        assertEquals(2, rmMock.releasedResources().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 9, 2000)));
+        assertEquals(3, rmMock.releasedResources().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+        assertEquals(3, rmMock.releasedResources().size());
+        assertEquals(1, nmClient.startedContainer().size());
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testHostnameConstraint() throws Exception {
+        rmMock.availableRes(new MockResource(1024, 2));
+
+        NMMock nmClient = new NMMock();
+
+        appMaster.setRmClient(rmMock);
+        appMaster.setNmClient(nmClient);
+
+        appMaster.setFs(new MockFileSystem());
+
+        props.cpusPerNode(8);
+        props.memoryPerNode(5000);
+        props.instances(3);
+        props.hostnameConstraint(Pattern.compile("ignoreHost"));
+
+        // Check that container resources
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000)));
+        assertEquals(0, rmMock.releasedResources().size());
+        assertEquals(1, nmClient.startedContainer().size());
+
+        appMaster.onContainersAllocated(Collections.singletonList(createContainer("ignoreHost", 8, 5000)));
+        assertEquals(1, rmMock.releasedResources().size());
+        assertEquals(1, nmClient.startedContainer().size());
+    }
+
+    /**
+     * @param host Host.
+     * @param cpu Cpu count.
+     * @param mem Memory.
+     * @return Container.
+     */
+    private Container createContainer(String host, int cpu, int mem) {
+        return Container.newInstance(
+            ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0),
+                ThreadLocalRandom.current().nextLong()),
+            NodeId.newInstance(host, 0),
+            "example.com",
+            new MockResource(mem, cpu),
+            Priority.newInstance(0),
+            null
+        );
+    }
+
+    /**
      * @param rmMock RM mock.
      * @param expectedCnt Expected cnt.
      * @param timeOut Timeout.
@@ -147,7 +261,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
     }
 
     /**
-     * Interrupt thread and wait.
+     * Interrupt thread and join.
      *
      * @param thread Thread.
      */
@@ -165,6 +279,9 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
         private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>();
 
         /** */
+        private List<ContainerId> releasedConts = new ArrayList<>();
+
+        /** */
         private Resource availableRes;
 
         /** */
@@ -180,6 +297,13 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
         }
 
         /**
+         * @return Released resources.
+         */
+        public List<ContainerId> releasedResources() {
+            return releasedConts;
+        }
+
+        /**
          * Sets resource.
          *
          * @param availableRes Available resource.
@@ -193,6 +317,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
          */
         public void clear() {
             contRequests.clear();
+            releasedConts.clear();
             availableRes = null;
         }
 
@@ -226,7 +351,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
 
         /** {@inheritDoc} */
         @Override public void releaseAssignedContainer(ContainerId containerId) {
-            // No-op.
+            releasedConts.add(containerId);
         }
 
         /** {@inheritDoc} */
@@ -250,13 +375,26 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
      */
     public static class NMMock extends NMClient {
         /** */
-        protected NMMock() {
+        private List<ContainerLaunchContext> startedContainer = new ArrayList<>();
+
+        /** */
+        public NMMock() {
             super("name");
         }
 
+        /**
+         * @return Started containers.
+         */
+        public List<ContainerLaunchContext> startedContainer() {
+            return startedContainer;
+        }
+
         /** {@inheritDoc} */
         @Override public Map<String, ByteBuffer> startContainer(Container container,
             ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
+
+            startedContainer.add(containerLaunchContext);
+
             return null;
         }
 
@@ -321,4 +459,74 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
             return 0;
         }
     }
+
+    /**
+     * Mock file system.
+     */
+    public static class MockFileSystem extends FileSystem {
+        /** */
+        public MockFileSystem() {
+        }
+
+        /** {@inheritDoc} */
+        @Override public Path makeQualified(Path path) {
+            return new Path("/test/path");
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileStatus getFileStatus(Path f) throws IOException {
+            return new FileStatus();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Path getWorkingDirectory() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setWorkingDirectory(Path new_dir) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+            return new FileStatus[0];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean delete(Path f, boolean recursive) throws IOException {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean rename(Path src, Path dst) throws IOException {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+            short replication, long blockSize, Progressable progress) throws IOException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public URI getUri() {
+            return null;
+        }
+    }
 }