You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/07/09 17:25:00 UTC
[02/19] incubator-ignite git commit: #IGNITE-YARN
#IGNITE-YARN
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5691911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5691911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5691911
Branch: refs/heads/ignite-1.3
Commit: b5691911dc545db3264afcf23153e2f9ec914724
Parents: 50cfa27
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Tue Jun 2 21:17:35 2015 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Tue Jun 2 21:17:35 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 120 ++++++++++---------
.../apache/ignite/yarn/IgniteYarnClient.java | 116 +++++++++++++++++-
.../ignite/yarn/IgniteSchedulerSelfTest.java | 2 +
3 files changed, 179 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index f52a1de..9ab70d4 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -17,6 +17,7 @@
package org.apache.ignite.yarn;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.yarn.api.*;
import org.apache.hadoop.yarn.api.protocolrecords.*;
@@ -32,56 +33,87 @@ import java.util.*;
* TODO
*/
public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
- /** {@inheritDoc} */
- @Override public void onContainersCompleted(List<ContainerStatus> statuses) {
-
+ Configuration configuration;
+ NMClient nmClient;
+ int numContainersToWaitFor = 5;
+
+ public ApplicationMaster() {
+ configuration = new YarnConfiguration();
+ nmClient = NMClient.createNMClient();
+ nmClient.init(configuration);
+ nmClient.start();
}
- /** {@inheritDoc} */
- @Override public void onContainersAllocated(List<Container> containers) {
+ public void onContainersAllocated(List<Container> containers) {
+ for (Container container : containers) {
+ try {
+ // Launch the container by creating a ContainerLaunchContext
+ // bin/hadoop fs -rm /user/ntikhonov/*.jar && bin/hadoop fs -copyFromLocal ./ignite-yarn.jar /user/ntikhonov
+ ContainerLaunchContext ctx =
+ Records.newRecord(ContainerLaunchContext.class);
+ ctx.setCommands(
+ Lists.newArrayList(
+ "ls " +
+ " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
+ ));
+ System.out.println("[AM] Launching container " + container.getId());
+ nmClient.startContainer(container, ctx);
+ } catch (Exception ex) {
+ System.err.println("[AM] Error launching container " + container.getId() + " " + ex);
+ }
+ }
+ }
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ for (ContainerStatus status : statuses) {
+ System.out.println("[AM] Completed container " + status.getContainerId());
+ synchronized (this) {
+ numContainersToWaitFor--;
+ }
+ }
}
- /** {@inheritDoc} */
- @Override public void onShutdownRequest() {
+ public void onNodesUpdated(List<NodeReport> updated) {
+ }
+ public void onReboot() {
}
- /** {@inheritDoc} */
- @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {
+ public void onShutdownRequest() {
+ }
+ public void onError(Throwable t) {
}
- /** {@inheritDoc} */
- @Override public float getProgress() {
+ public float getProgress() {
return 0;
}
- /** {@inheritDoc} */
- @Override public void onError(Throwable e) {
+ public boolean doneWithContainers() {
+ return numContainersToWaitFor == 0;
+ }
+ public Configuration getConfiguration() {
+ return configuration;
}
- /**
- * @param args Arguments.
- */
public static void main(String[] args) throws Exception {
- final String command = args[0];
- final int n = Integer.valueOf(args[1]);
+ ApplicationMaster master = new ApplicationMaster();
+ master.runMainLoop();
- // Initialize clients to ResourceManager and NodeManagers
- Configuration conf = new YarnConfiguration();
+ }
- AMRMClient<AMRMClient.ContainerRequest> rmClient = AMRMClient.createAMRMClient();
- rmClient.init(conf);
- rmClient.start();
+ public void runMainLoop() throws Exception {
- NMClient nmClient = NMClient.createNMClient();
- nmClient.init(conf);
- nmClient.start();
+ AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(100, this);
+ rmClient.init(getConfiguration());
+ rmClient.start();
// Register with ResourceManager
+ System.out.println("[AM] registerApplicationMaster 0");
rmClient.registerApplicationMaster("", 0, "");
+ System.out.println("[AM] registerApplicationMaster 1");
// Priority for worker containers - priorities are intra-application
Priority priority = Records.newRecord(Priority.class);
@@ -93,41 +125,21 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
capability.setVirtualCores(1);
// Make container requests to ResourceManager
- for (int i = 0; i < n; ++i) {
- AMRMClient.ContainerRequest containerAsk =
- new AMRMClient.ContainerRequest(capability, null, null, priority);
-
+ for (int i = 0; i < numContainersToWaitFor; ++i) {
+ AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority);
+ System.out.println("[AM] Making res-req " + i);
rmClient.addContainerRequest(containerAsk);
}
- // Obtain allocated containers, launch and check for responses
- int responseId = 0;
- int completedContainers = 0;
- while (completedContainers < n) {
- AllocateResponse response = rmClient.allocate(responseId++);
- for (Container container : response.getAllocatedContainers()) {
- // Launch container by create ContainerLaunchContext
- ContainerLaunchContext ctx =
- Records.newRecord(ContainerLaunchContext.class);
-
- ctx.setCommands(
- Collections.singletonList(
- command +
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
- ));
-
- nmClient.startContainer(container, ctx);
- }
- for (ContainerStatus status : response.getCompletedContainersStatuses()) {
- ++completedContainers;
- System.out.println("Completed container " + status.getContainerId());
- }
+ System.out.println("[AM] waiting for containers to finish");
+ while (!doneWithContainers()) {
Thread.sleep(100);
}
+ System.out.println("[AM] unregisterApplicationMaster 0");
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(
- FinalApplicationStatus.SUCCEEDED, "", "");
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ System.out.println("[AM] unregisterApplicationMaster 1");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
index 7cef50d..e020ef4 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java
@@ -17,11 +17,24 @@
package org.apache.ignite.yarn;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.*;
import org.apache.hadoop.yarn.conf.*;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.*;
+import static org.apache.hadoop.yarn.api.ApplicationConstants.*;
+
/**
* Ignite yarn client.
*/
@@ -35,16 +48,109 @@ public class IgniteYarnClient {
* @param args Args.
*/
public static void main(String[] args) throws Exception {
- ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null);
-
- // Create yarnClient
YarnConfiguration conf = new YarnConfiguration();
-
YarnClient yarnClient = YarnClient.createYarnClient();
-
yarnClient.init(conf);
yarnClient.start();
+ // Create application via yarnClient
YarnClientApplication app = yarnClient.createApplication();
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ amContainer.setCommands(
+ Collections.singletonList(
+ " $JAVA_HOME/bin/java -Xmx256M org.apache.ignite.yarn.ApplicationMaster" +
+ " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr"
+ )
+ );
+
+ // Setup jar for ApplicationMaster
+ final LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+ setupAppMasterJar(new Path("/user/ntikhonov/ignite-yarn.jar"), appMasterJar, conf);
+
+ final LocalResource igniteZip = Records.newRecord(LocalResource.class);
+ setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf);
+
+ FileSystem fileSystem = FileSystem.get(conf);
+
+ Path path = fileSystem.makeQualified(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6/bin/ignite.sh"));
+
+ System.out.println("Path: " + path);
+ System.out.println("Path URI: " + path.toUri().toString());
+
+ amContainer.setLocalResources(new HashMap<String, LocalResource>(){{
+ put("ignite-yarn.jar", appMasterJar);
+ put("ignite", igniteZip);
+ }});
+
+ // Setup CLASSPATH for ApplicationMaster
+ Map<String, String> appMasterEnv = new HashMap<String, String>();
+ setupAppMasterEnv(appMasterEnv, conf);
+ amContainer.setEnvironment(appMasterEnv);
+
+ // Set up resource type requirements for ApplicationMaster
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(256);
+ capability.setVirtualCores(1);
+
+ // Finally, set-up ApplicationSubmissionContext for the application
+ ApplicationSubmissionContext appContext =
+ app.getApplicationSubmissionContext();
+ appContext.setApplicationName("simple-yarn-app"); // application name
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(capability);
+ appContext.setQueue("default"); // queue
+
+ // Submit application
+ ApplicationId appId = appContext.getApplicationId();
+ System.out.println("Submitting application " + appId);
+ yarnClient.submitApplication(appContext);
+
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ while (appState != YarnApplicationState.FINISHED &&
+ appState != YarnApplicationState.KILLED &&
+ appState != YarnApplicationState.FAILED) {
+ Thread.sleep(100);
+ appReport = yarnClient.getApplicationReport(appId);
+ appState = appReport.getYarnApplicationState();
+ }
+
+ System.out.println(
+ "Application " + appId + " finished with" +
+ " state " + appState +
+ " at " + appReport.getFinishTime());
+ }
+
+ private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf)
+ throws Exception {
+ FileSystem fileSystem = FileSystem.get(conf);
+ jarPath = fileSystem.makeQualified(jarPath);
+
+ FileStatus jarStat = fileSystem.getFileStatus(jarPath);
+
+ appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ appMasterJar.setSize(jarStat.getLen());
+ appMasterJar.setTimestamp(jarStat.getModificationTime());
+ appMasterJar.setType(LocalResourceType.ARCHIVE);
+ appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION);
+
+ System.out.println("Path :" + jarPath);
+ }
+
+ private static void setupAppMasterEnv(Map<String, String> appMasterEnv, YarnConfiguration conf) {
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
+ Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
+ c.trim(), File.pathSeparator);
+
+ Apps.addToEnvironment(appMasterEnv,
+ Environment.CLASSPATH.name(),
+ Environment.PWD.$() + File.separator + "*",
+ File.pathSeparator);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
index 1a03743..04d3492 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java
@@ -23,5 +23,7 @@ import junit.framework.*;
* Scheduler tests.
*/
public class IgniteSchedulerSelfTest extends TestCase {
+ public void testName() throws Exception {
+ }
}