You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/09 17:25:00 UTC

[02/19] incubator-ignite git commit: #IGNITE-YARN

#IGNITE-YARN


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5691911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5691911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5691911

Branch: refs/heads/ignite-1.3
Commit: b5691911dc545db3264afcf23153e2f9ec914724
Parents: 50cfa27
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Tue Jun 2 21:17:35 2015 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Tue Jun 2 21:17:35 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/yarn/ApplicationMaster.java   | 120 ++++++++++---------
 .../apache/ignite/yarn/IgniteYarnClient.java    | 116 +++++++++++++++++-
 .../ignite/yarn/IgniteSchedulerSelfTest.java    |   2 +
 3 files changed, 179 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index f52a1de..9ab70d4 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.yarn;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.yarn.api.*;
 import org.apache.hadoop.yarn.api.protocolrecords.*;
@@ -32,56 +33,87 @@ import java.util.*;
  * TODO
  */
 public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
-    /** {@inheritDoc} */
-    @Override public void onContainersCompleted(List<ContainerStatus> statuses) {
-
+    Configuration configuration;
+    NMClient nmClient;
+    int numContainersToWaitFor = 5;
+
+    public ApplicationMaster() {
+        configuration = new YarnConfiguration();
+        nmClient = NMClient.createNMClient();
+        nmClient.init(configuration);
+        nmClient.start();
     }
 
-    /** {@inheritDoc} */
-    @Override public void onContainersAllocated(List<Container> containers) {
+    public void onContainersAllocated(List<Container> containers) {
+        for (Container container : containers) {
+            try {
+                // Launch container by create ContainerLaunchContext
+                // bin/hadoop fs -rm /user/ntikhonov/*.jar && bin/hadoop fs -copyFromLocal ./ignite-yarn.jar /user/ntikhonov
+                ContainerLaunchContext ctx =
+                        Records.newRecord(ContainerLaunchContext.class);
+                ctx.setCommands(
+                        Lists.newArrayList(
+                                "ls " +
+                                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+                                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
+                        ));
+                System.out.println("[AM] Launching container " + container.getId());
+                nmClient.startContainer(container, ctx);
+            } catch (Exception ex) {
+                System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
+            }
+        }
+    }
 
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+        for (ContainerStatus status : statuses) {
+            System.out.println("[AM] Completed container " + status.getContainerId());
+            synchronized (this) {
+                numContainersToWaitFor--;
+            }
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public void onShutdownRequest() {
+    public void onNodesUpdated(List<NodeReport> updated) {
+    }
 
+    public void onReboot() {
     }
 
-    /** {@inheritDoc} */
-    @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {
+    public void onShutdownRequest() {
+    }
 
+    public void onError(Throwable t) {
     }
 
-    /** {@inheritDoc} */
-    @Override public float getProgress() {
+    public float getProgress() {
         return 0;
     }
 
-    /** {@inheritDoc} */
-    @Override public void onError(Throwable e) {
+    public boolean doneWithContainers() {
+        return numContainersToWaitFor == 0;
+    }
 
+    public Configuration getConfiguration() {
+        return configuration;
     }
 
-    /**
-     * @param args Arguments.
-     */
     public static void main(String[] args) throws Exception {
-        final String command = args[0];
-        final int n = Integer.valueOf(args[1]);
+        ApplicationMaster master = new ApplicationMaster();
+        master.runMainLoop();
 
-        // Initialize clients to ResourceManager and NodeManagers
-        Configuration conf = new YarnConfiguration();
+    }
 
-        AMRMClient<AMRMClient.ContainerRequest> rmClient = AMRMClient.createAMRMClient();
-        rmClient.init(conf);
-        rmClient.start();
+    public void runMainLoop() throws Exception {
 
-        NMClient nmClient = NMClient.createNMClient();
-        nmClient.init(conf);
-        nmClient.start();
+        AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(100, this);
+        rmClient.init(getConfiguration());
+        rmClient.start();
 
         // Register with ResourceManager
+        System.out.println("[AM] registerApplicationMaster 0");
         rmClient.registerApplicationMaster("", 0, "");
+        System.out.println("[AM] registerApplicationMaster 1");
 
         // Priority for worker containers - priorities are intra-application
         Priority priority = Records.newRecord(Priority.class);
@@ -93,41 +125,21 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
         capability.setVirtualCores(1);
 
         // Make container requests to ResourceManager
-        for (int i = 0; i < n; ++i) {
-            AMRMClient.ContainerRequest containerAsk =
-                new AMRMClient.ContainerRequest(capability, null, null, priority);
-
+        for (int i = 0; i < numContainersToWaitFor; ++i) {
+            AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority);
+            System.out.println("[AM] Making res-req " + i);
             rmClient.addContainerRequest(containerAsk);
         }
 
-        // Obtain allocated containers, launch and check for responses
-        int responseId = 0;
-        int completedContainers = 0;
-        while (completedContainers < n) {
-            AllocateResponse response = rmClient.allocate(responseId++);
-            for (Container container : response.getAllocatedContainers()) {
-                // Launch container by create ContainerLaunchContext
-                ContainerLaunchContext ctx =
-                    Records.newRecord(ContainerLaunchContext.class);
-
-                ctx.setCommands(
-                    Collections.singletonList(
-                        command +
-                            " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-                            " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
-                    ));
-
-                nmClient.startContainer(container, ctx);
-            }
-            for (ContainerStatus status : response.getCompletedContainersStatuses()) {
-                ++completedContainers;
-                System.out.println("Completed container " + status.getContainerId());
-            }
+        System.out.println("[AM] waiting for containers to finish");
+        while (!doneWithContainers()) {
             Thread.sleep(100);
         }
 
+        System.out.println("[AM] unregisterApplicationMaster 0");
         // Un-register with ResourceManager
         rmClient.unregisterApplicationMaster(
-            FinalApplicationStatus.SUCCEEDED, "", "");
+                FinalApplicationStatus.SUCCEEDED, "", "");
+        System.out.println("[AM] unregisterApplicationMaster 1");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index 7cef50d..e020ef4 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -17,11 +17,24 @@
 
 package org.apache.ignite.yarn;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.*;
 import org.apache.hadoop.yarn.conf.*;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.logging.*;
 
+import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
+
 /**
  * Ignite yarn client.
  */
@@ -35,16 +48,109 @@ public class IgniteYarnClient {
      * @param args Args.
      */
     public static void main(String[] args) throws Exception {
-        ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null);
-
-        // Create yarnClient
         YarnConfiguration conf = new YarnConfiguration();
-
         YarnClient yarnClient = YarnClient.createYarnClient();
-
         yarnClient.init(conf);
         yarnClient.start();
 
+        // Create application via yarnClient
         YarnClientApplication app = yarnClient.createApplication();
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+        amContainer.setCommands(
+                Collections.singletonList(
+                        " $JAVA_HOME/bin/java -Xmx256M org.apache.ignite.yarn.ApplicationMaster" +
+                        " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" +
+                        " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr"
+                )
+        );
+
+        // Setup jar for ApplicationMaster
+        final LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+        setupAppMasterJar(new Path("/user/ntikhonov/ignite-yarn.jar"), appMasterJar, conf);
+
+        final LocalResource igniteZip = Records.newRecord(LocalResource.class);
+        setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf);
+
+        FileSystem fileSystem = FileSystem.get(conf);
+
+        Path path = fileSystem.makeQualified(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6/bin/ignite.sh"));
+
+        System.out.println("Path: " + path);
+        System.out.println("Path URI: " + path.toUri().toString());
+
+        amContainer.setLocalResources(new HashMap<String, LocalResource>(){{
+            put("ignite-yarn.jar", appMasterJar);
+            put("ignite", igniteZip);
+        }});
+
+        // Setup CLASSPATH for ApplicationMaster
+        Map<String, String> appMasterEnv = new HashMap<String, String>();
+        setupAppMasterEnv(appMasterEnv, conf);
+        amContainer.setEnvironment(appMasterEnv);
+
+        // Set up resource type requirements for ApplicationMaster
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(256);
+        capability.setVirtualCores(1);
+
+        // Finally, set-up ApplicationSubmissionContext for the application
+        ApplicationSubmissionContext appContext =
+                app.getApplicationSubmissionContext();
+        appContext.setApplicationName("simple-yarn-app"); // application name
+        appContext.setAMContainerSpec(amContainer);
+        appContext.setResource(capability);
+        appContext.setQueue("default"); // queue
+
+        // Submit application
+        ApplicationId appId = appContext.getApplicationId();
+        System.out.println("Submitting application " + appId);
+        yarnClient.submitApplication(appContext);
+
+        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+        YarnApplicationState appState = appReport.getYarnApplicationState();
+        while (appState != YarnApplicationState.FINISHED &&
+                appState != YarnApplicationState.KILLED &&
+                appState != YarnApplicationState.FAILED) {
+            Thread.sleep(100);
+            appReport = yarnClient.getApplicationReport(appId);
+            appState = appReport.getYarnApplicationState();
+        }
+
+        System.out.println(
+                "Application " + appId + " finished with" +
+                        " state " + appState +
+                        " at " + appReport.getFinishTime());
+    }
+
+    private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
+        throws Exception {
+        FileSystem fileSystem = FileSystem.get(conf);
+        jarPath = fileSystem.makeQualified(jarPath);
+
+        FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+
+        appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+        appMasterJar.setSize(jarStat.getLen());
+        appMasterJar.setTimestamp(jarStat.getModificationTime());
+        appMasterJar.setType(LocalResourceType.ARCHIVE);
+        appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
+
+        System.out.println("Path :" + jarPath);
+    }
+
+    private static void setupAppMasterEnv(Map<String, String> appMasterEnv, YarnConfiguration conf) {
+        for (String c : conf.getStrings(
+                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
+            Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
+                    c.trim(), File.pathSeparator);
+
+        Apps.addToEnvironment(appMasterEnv,
+                Environment.CLASSPATH.name(),
+                Environment.PWD.$() + File.separator + "*",
+                File.pathSeparator);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
index 1a03743..04d3492 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
@@ -23,5 +23,7 @@ import junit.framework.*;
  * Scheduler tests.
  */
 public class IgniteSchedulerSelfTest extends TestCase {
+    public void testName() throws Exception {
 
+    }
 }