You are viewing a plain-text version of this content. The canonical link to it ("here") was not preserved in this copy.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/10/25 08:27:25 UTC
[1/2] git commit: Added support for YARN - includes regression test + "s4 yarn" command - also added pluggable mechanism for S4R fetchers - added HDFS S4R fetcher
Updated Branches:
refs/heads/S4-25 [created] 4e78c3e46
Added support for YARN
- includes a regression test and an "s4 yarn" command
- also added a pluggable mechanism for S4R fetchers
- added an HDFS S4R fetcher
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/4e78c3e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/4e78c3e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/4e78c3e4
Branch: refs/heads/S4-25
Commit: 4e78c3e46ce7114f65d493297ce28314b46f4fb9
Parents: a7f86ac
Author: Matthieu Morel <mm...@apache.org>
Authored: Wed Oct 24 17:38:53 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Oct 25 10:09:55 2012 +0200
----------------------------------------------------------------------
build.gradle | 10 +-
lib/hadoop-common-tests-2.0.2-alpha.jar | Bin 0 -> 1175614 bytes
lib/hadoop-hdfs-tests-2.0.2-alpha.jar | Bin 0 -> 1520018 bytes
lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar | Bin 0 -> 39572 bytes
s4 | 24 +-
settings.gradle | 3 +-
subprojects/s4-core/s4-core.gradle | 13 +
.../java/org/apache/s4/core/DefaultCoreModule.java | 9 +
.../s4/deploy/DistributedDeploymentManager.java | 18 +-
.../org/apache/s4/deploy/FileSystemS4RFetcher.java | 12 +
.../java/org/apache/s4/deploy/HttpS4RFetcher.java | 6 +
.../main/java/org/apache/s4/deploy/S4RFetcher.java | 7 +
.../apache/s4/deploy/TestAutomaticDeployment.java | 8 +-
.../src/main/java/org/apache/s4/tools/Deploy.java | 40 +-
subprojects/s4-yarn/s4-yarn.gradle | 102 +++
.../org/apache/s4/deploy/HdfsFetcherModule.java | 18 +
.../java/org/apache/s4/deploy/HdfsS4RFetcher.java | 64 ++
.../apache/s4/tools/yarn/S4ApplicationMaster.java | 706 +++++++++++++++
.../org/apache/s4/tools/yarn/S4YarnClient.java | 480 ++++++++++
.../java/org/apache/s4/tools/yarn/YarnArgs.java | 78 ++
.../org/apache/s4/tools/yarn/package-info.java | 11 +
.../apache/s4/tools/yarn/TestYarnDeployment.java | 197 ++++
22 files changed, 1772 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 60577de..27c38ed 100644
--- a/build.gradle
+++ b/build.gradle
@@ -40,6 +40,8 @@ allprojects {
maven { url 'http://google-gson.googlecode.com/svn/mavenrepo' }
maven { url 'https://repo.springsource.org/libs-release' }
maven { url 'http://repo.gradle.org/gradle/libs-releases-local' }
+
+
/* Add lib dir as a repo. Some jar files that are not available
in a public repo are distributed in the lib dir. */
@@ -58,6 +60,7 @@ project.ext["libraries"] = [
guice: 'com.google.inject:guice:3.0',
aop_alliance: 'aopalliance:aopalliance:1.0',
guice_assist: 'com.google.inject.extensions:guice-assistedinject:3.0',
+ guice_multibindings:'com.google.inject.extensions:guice-multibindings:3.0',
kryo: 'com.googlecode:kryo:1.04',
minlog: 'com.googlecode:minlog:1.2',
reflectasm: 'com.googlecode:reflectasm:1.01',
@@ -86,7 +89,8 @@ project.ext["libraries"] = [
gradle_base_services: 'org.gradle:gradle-base-services:1.0',
gradle_core: 'org.gradle:gradle-core:1.0',
gradle_tooling_api: 'org.gradle:gradle-tooling-api:1.0',
- gradle_wrapper: 'gradle-wrapper:gradle-wrapper:1.0'
+ gradle_wrapper: 'gradle-wrapper:gradle-wrapper:1.0',
+
]
subprojects {
@@ -114,12 +118,13 @@ subprojects {
compile ( libraries.guava )
compile ( libraries.guice )
compile ( libraries.guice_assist)
+ compile ( libraries.guice_multibindings)
/* Logging. */
compile( libraries.slf4j )
compile( libraries.logback_core )
compile( libraries.logback_classic )
- runtime( libraries.commons_logging)
+ compile( libraries.commons_logging)
/* Commons. */
compile( libraries.commons_config )
@@ -146,6 +151,7 @@ subprojects {
runtime(libraries.asm)
runtime(libraries.javax_inject)
runtime(libraries.commons_codec)
+
}
jar {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/lib/hadoop-common-tests-2.0.2-alpha.jar
----------------------------------------------------------------------
diff --git a/lib/hadoop-common-tests-2.0.2-alpha.jar b/lib/hadoop-common-tests-2.0.2-alpha.jar
new file mode 100644
index 0000000..224b855
Binary files /dev/null and b/lib/hadoop-common-tests-2.0.2-alpha.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/lib/hadoop-hdfs-tests-2.0.2-alpha.jar
----------------------------------------------------------------------
diff --git a/lib/hadoop-hdfs-tests-2.0.2-alpha.jar b/lib/hadoop-hdfs-tests-2.0.2-alpha.jar
new file mode 100644
index 0000000..ff760bb
Binary files /dev/null and b/lib/hadoop-hdfs-tests-2.0.2-alpha.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar
----------------------------------------------------------------------
diff --git a/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar b/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar
new file mode 100644
index 0000000..cdd1dc5
Binary files /dev/null and b/lib/hadoop-yarn-server-tests-2.0.2-alpha-tests.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/s4
----------------------------------------------------------------------
diff --git a/s4 b/s4
index 6471486..6dd51af 100755
--- a/s4
+++ b/s4
@@ -1,14 +1,30 @@
#!/bin/bash
-# NOTE: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
+# NOTE1: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
+# NOTE2: "./gradlew s4-yarn:installApp" will prepare/update the yarn deployment tools if needed
S4_DIR="$( cd -P "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
S4_SCRIPT_PATH="$S4_DIR/s4"
-# JVM options for starting nodes and for other s4 tools can be configured here
-# export JAVA_OPTS=-Xmx1G
+case "$1" in
+"yarn")
+ if [ ! -f subprojects/s4-yarn/build/install/s4-yarn/bin/s4-yarn ]
+ then
+ echo "Cannot find s4 yarn script (subprojects/s4-yarn/build/install/s4-yarn/bin/s4-yarn). Please install the script with './gradlew s4-yarn:installApp' "
+ exit 1
+ fi
+ shift 1
+ subprojects/s4-yarn/build/install/s4-yarn/bin/s4-yarn -s4Dir=$S4_DIR $@
+;;
-subprojects/s4-tools/build/install/s4-tools/bin/s4-tools -s4ScriptPath=$S4_SCRIPT_PATH $@
+*)
+ # JVM options for starting nodes and for other s4 tools can be configured here
+ # export JAVA_OPTS=-Xmx1G
+
+ subprojects/s4-tools/build/install/s4-tools/bin/s4-tools -s4ScriptPath=$S4_SCRIPT_PATH $@
+;;
+
+esac
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index c2b9b67..9e652bb 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -21,7 +21,8 @@ include 's4-core'
include 's4-comm'
include 's4-edsl'
include 's4-example'
-include 's4-tools'
+include 's4-tools'
+include 's4-yarn'
//include 's4-example'
//include ':test-apps:simple-adapter-1'
include ':test-apps:simple-deployable-app-1'
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index 0905ce7..f66ac21 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -34,6 +34,19 @@ dependencies {
testCompile libraries.gradle_wrapper
testCompile libraries.mockito_core
}
+
+configurations {
+ tests
+}
+
+task testJar(type: Jar) {
+ baseName = "test-${project.archivesBaseName}"
+ from sourceSets.test.output
+}
+
+artifacts {
+ tests testJar
+}
test {
forkEvery=1;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..4c5bb86 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -33,11 +33,15 @@ import org.apache.s4.core.ft.CheckpointingFramework;
import org.apache.s4.core.ft.NoOpCheckpointingFramework;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.deploy.FileSystemS4RFetcher;
+import org.apache.s4.deploy.HttpS4RFetcher;
+import org.apache.s4.deploy.S4RFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
+import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
/**
@@ -78,6 +82,11 @@ public class DefaultCoreModule extends AbstractModule {
bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+ // allow for pluggable s4r fetching strategies
+ Multibinder<S4RFetcher> s4rFetcherMultibinder = Multibinder.newSetBinder(binder(), S4RFetcher.class);
+ s4rFetcherMultibinder.addBinding().to(FileSystemS4RFetcher.class);
+ s4rFetcherMultibinder.addBinding().to(HttpS4RFetcher.class);
+
bind(S4RLoaderFactory.class);
// For enabling checkpointing, one needs to use a custom module, such as
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 9fc5c53..2951c76 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Set;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
@@ -78,14 +79,17 @@ public class DistributedDeploymentManager implements DeploymentManager {
private final Server server;
boolean deployed = false;
+ private final Set<S4RFetcher> s4rFetchers;
+
@Inject
public DistributedDeploymentManager(@Named("s4.cluster.name") String clusterName,
@Named("s4.cluster.zk_address") String zookeeperAddress,
@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server) {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, Set<S4RFetcher> s4rFetchers) {
this.clusterName = clusterName;
this.server = server;
+ this.s4rFetchers = s4rFetchers;
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -146,13 +150,13 @@ public class DistributedDeploymentManager implements DeploymentManager {
// NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
// but that's probably not that useful, and we can simply provide whichever protocol is needed
public InputStream fetchS4App(URI uri) throws DeploymentFailedException {
- String scheme = uri.getScheme();
- if ("file".equalsIgnoreCase(scheme)) {
- return new FileSystemS4RFetcher().fetch(uri);
- }
- if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
- return new HttpS4RFetcher().fetch(uri);
+ // Delegate to the first injected fetcher whose handlesProtocol() accepts the URI; fail if none does.
+ for (S4RFetcher s4rFetcher : s4rFetchers) {
+ if (s4rFetcher.handlesProtocol(uri)) {
+ return s4rFetcher.fetch(uri);
+ }
}
+ String scheme = uri.getScheme();
throw new DeploymentFailedException("Unsupported protocol " + scheme);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
index 8947998..bd0c55a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
@@ -24,18 +24,30 @@ import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Fetches S4R files from a file system, possibly distributed.
*
*/
public class FileSystemS4RFetcher implements S4RFetcher {
+ private static Logger logger = LoggerFactory.getLogger(FileSystemS4RFetcher.class);
+ /** Accepts only URIs whose scheme is "file" (compared case-insensitively). */
+ @Override
+ public boolean handlesProtocol(URI uri) {
+ return "file".equalsIgnoreCase(uri.getScheme());
+ }
+ // Opens the URI as a local file. NOTE(review): the FileNotFoundException is not chained as the cause of the DeploymentFailedException below.
@Override
public InputStream fetch(URI uri) throws DeploymentFailedException {
+ logger.debug("Fetching uri through the file system : {}", uri.toString());
try {
return new FileInputStream(new File(uri));
} catch (FileNotFoundException e) {
throw new DeploymentFailedException("Cannot retrieve file from uri [" + uri.toString() + "]");
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
index 1bb4c01..a8873b6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HttpS4RFetcher.java
@@ -71,6 +71,11 @@ public class HttpS4RFetcher implements S4RFetcher {
private static Logger logger = LoggerFactory.getLogger(HttpS4RFetcher.class);
@Override
+    public boolean handlesProtocol(URI uri) {
+        // getScheme() is null for scheme-less URIs; equalsIgnoreCase(null) is false, so those are rejected
+        final String scheme = uri.getScheme();
+        return "https".equalsIgnoreCase(scheme) || "http".equalsIgnoreCase(scheme);
+    }
+
+ @Override
public InputStream fetch(URI uri) throws DeploymentFailedException {
logger.debug("Fetching file through http: {}", uri.toString());
@@ -183,4 +188,5 @@ public class HttpS4RFetcher implements S4RFetcher {
ByteStreams.copy(cbis, fos);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
index 9c6590f..69765fc 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/S4RFetcher.java
@@ -29,6 +29,13 @@ import java.net.URI;
public interface S4RFetcher {
/**
+ *
+ * @param uri
+ * @return true if the uri's protocol is handled by this fetcher
+ */
+ boolean handlesProtocol(URI uri);
+
+ /**
* Returns a stream to an S4R archive file
*
* @param uri
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 1fbd37a..9f5bc50 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -184,8 +184,8 @@ public class TestAutomaticDeployment extends ZkBasedTest {
zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
@Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
- if (currentChilds.size() == 2) {
+ public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+ if (currentChildren.size() == 1) {
signalProcessesReady.countDown();
}
@@ -195,8 +195,8 @@ public class TestAutomaticDeployment extends ZkBasedTest {
forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=cluster1" });
// TODO synchro with ready state from zk
- Thread.sleep(10000);
- // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+ // Thread.sleep(10000);
+ Assert.assertTrue(signalProcessesReady.await(30, TimeUnit.SECONDS));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 94e359c..30dfb0f 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -66,18 +66,21 @@ public class Deploy extends S4ArgsBase {
System.exit(1);
}
- File s4rToDeploy;
+ java.net.URI s4rURI = null;
if (deployArgs.s4rPath != null) {
- s4rToDeploy = new File(deployArgs.s4rPath);
- if (!s4rToDeploy.exists()) {
- logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
- System.exit(1);
- } else {
- logger.info(
- "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
- s4rToDeploy.getAbsolutePath());
+ s4rURI = new java.net.URI(deployArgs.s4rPath);
+ if (s4rURI.getScheme().equalsIgnoreCase("file")) {
+ File s4rToDeploy = new File(deployArgs.s4rPath);
+ if (!s4rToDeploy.exists()) {
+ logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
+ System.exit(1);
+ }
}
+ logger.info(
+ "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+ s4rURI.toString());
+
} else {
List<String> params = new ArrayList<String>();
// prepare gradle -P parameters, including passed gradle opts
@@ -89,21 +92,22 @@ public class Deploy extends S4ArgsBase {
File tmpS4R = new File(tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r");
if (!Strings.isNullOrEmpty(deployArgs.generatedS4R)) {
logger.info("Copying generated S4R to [{}]", deployArgs.generatedS4R);
- s4rToDeploy = new File(deployArgs.generatedS4R);
+ File s4rToDeploy = new File(deployArgs.generatedS4R);
if (!(ByteStreams.copy(Files.newInputStreamSupplier(tmpS4R),
Files.newOutputStreamSupplier(s4rToDeploy)) > 0)) {
logger.error("Cannot copy generated s4r from {} to {}", tmpS4R.getAbsolutePath(),
s4rToDeploy.getAbsolutePath());
System.exit(1);
+ } else {
+ s4rURI = s4rToDeploy.toURI();
}
} else {
- s4rToDeploy = tmpS4R;
+ s4rURI = tmpS4R.toURI();
}
}
- final String uri = s4rToDeploy.toURI().toString();
ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+ record.putSimpleField(DistributedDeploymentManager.S4R_URI, s4rURI.toString());
record.putSimpleField("name", deployArgs.appName);
String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
if (zkClient.exists(deployedAppPath)) {
@@ -117,11 +121,12 @@ public class Deploy extends S4ArgsBase {
logger.info(
"uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
new String[] { deployArgs.appName, deployArgs.clusterName,
- "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName,
- s4rToDeploy.getAbsolutePath() });
+ "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName, s4rURI.toString() });
// Explicitly shutdown the JVM since Gradle leaves non-daemon threads running that delay the termination
- System.exit(0);
+ if (deployArgs.shutdown) {
+ System.exit(0);
+ }
} catch (Exception e) {
LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
}
@@ -155,6 +160,9 @@ public class Deploy extends S4ArgsBase {
@Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
int timeout = 10000;
+ @Parameter(names = "-shutdown", description = "Shutdown JVM after deployment. Useful to avoid waiting for remaining long running threads from Gradle", arity = 1)
+ boolean shutdown = true;
+
}
static class ExecGradle {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/s4-yarn.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/s4-yarn.gradle b/subprojects/s4-yarn/s4-yarn.gradle
new file mode 100644
index 0000000..215d93a
--- /dev/null
+++ b/subprojects/s4-yarn/s4-yarn.gradle
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+description = 'The S4 yarn tool'
+
+apply plugin: 'java'
+
+// Helper task: creates the java and resources source directories of every source set.
+task "create-dirs" << {
+ sourceSets.all*.java.srcDirs*.each { it.mkdirs() }
+ sourceSets.all*.resources.srcDirs*.each { it.mkdirs() }
+}
+
+configurations.all {
+ transitive = true
+ }
+
+// S4 subprojects, Hadoop 2.0.2-alpha (HDFS / YARN) and supporting libraries.
+dependencies {
+ compile project(":s4-base")
+ compile project(":s4-comm")
+ compile project(":s4-core")
+ compile project(":s4-tools")
+ compile libraries.jcommander
+ compile libraries.gradle_base_services
+ compile libraries.gradle_core
+ compile libraries.gradle_tooling_api
+ compile libraries.gradle_wrapper
+ compile libraries.log4j
+ compile libraries.zkclient
+
+ testCompile project(path: ':s4-comm', configuration: 'tests')
+ testCompile project(path: ':s4-core', configuration: 'tests')
+
+ compile('commons-cli:commons-cli:1.2')
+ runtime('org.apache.hadoop:hadoop-auth:2.0.2-alpha') {transitive = true}
+ compile('org.apache.hadoop:hadoop-common:2.0.2-alpha') {transitive = true}
+ runtime('org.apache.hadoop:hadoop-hdfs:2.0.2-alpha') {transitive = true}
+ testRuntime('org.apache.hadoop:hadoop-yarn-server:2.0.2-alpha') {transitive = true}
+ compile('org.apache.hadoop:hadoop-yarn-client:2.0.2-alpha') {transitive = true}
+ testRuntime('org.apache.hadoop:hadoop-yarn-server-common:2.0.2-alpha') {transitive = true}
+ testRuntime('org.apache.hadoop:hadoop-yarn-server-nodemanager:2.0.2-alpha') {transitive = true}
+ testRuntime('org.apache.hadoop:hadoop-yarn-server-resourcemanager:2.0.2-alpha') {transitive = true}
+ testRuntime('org.apache.hadoop:hadoop-yarn-server-web-proxy:2.0.2-alpha') {transitive = true}
+ runtime('org.apache.hadoop:hadoop-mapreduce-client-shuffle:2.0.2-alpha') {transitive = true}
+ // NOTE: the tests jars are available on maven but we could not find a way to reference them, so we copied them locally (lib dir) with modified names
+ testCompile('org.apache.hadoop:hadoop-common-tests:2.0.2-alpha') {transitive = true}
+ testCompile('org.apache.hadoop:hadoop-hdfs-tests:2.0.2-alpha@jar') {transitive = true}
+ testCompile('org.apache.hadoop:hadoop-yarn-server-tests:2.0.2-alpha-tests') {transitive = true}
+ testRuntime('javax.servlet:servlet-api:2.5')
+ testRuntime('com.google.inject.extensions:guice-servlet:3.0') {transitive=true}
+ compile('org.apache.hadoop:hadoop-yarn-api:2.0.2-alpha') {transitive = true}
+ compile('org.apache.hadoop:hadoop-yarn-common:2.0.2-alpha') {transitive = true}
+ runtime('com.google.protobuf:protobuf-java:2.4.1')
+ // Problem with commons-daemon: there is a dependency on version 1.0.3, but that version is not available in maven repos.
+ // For that reason we explicitly specify version 1.0.10
+ compile('commons-daemon:commons-daemon:1.0.10') {transitive = true}
+ runtime('org.apache.zookeeper:zookeeper:3.3.1'){force=true}
+
+}
+
+apply plugin:'application'
+mainClassName = "org.apache.s4.tools.yarn.S4YarnClient"
+// NOTE(review): copies from 'src/main/java/hello', which is not among the s4-yarn files in this commit; looks like a template leftover - confirm and remove
+task copyTemplates << {
+ copy {
+ from 'src/main/java/hello'
+ into 'src/main/resources'
+ }
+}
+// Forwards -Pargs="..." (split on whitespace) to the application. NOTE(review): 'print args' looks like leftover debug output
+run {
+ // run doesn't yet directly accept command line parameters...
+ if ( project.hasProperty('args') ) {
+ args project.args.split('\\s+')
+ print args
+ }
+ }
+
+// One test class per forked JVM (forkEvery=1); installApp runs before the tests (test.dependsOn installApp)
+test {
+ forkEvery=1
+ // testLogging.showStandardStreams = true
+}
+
+test.dependsOn installApp
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java
new file mode 100644
index 0000000..97e5758
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsFetcherModule.java
@@ -0,0 +1,18 @@
+package org.apache.s4.deploy;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
+
+/**
+ * This module plugs an HDFS S4R fetcher into the node configuration.
+ *
+ */
+public class HdfsFetcherModule extends AbstractModule {
+
+    /** Adds {@link HdfsS4RFetcher} to the {@code Set<S4RFetcher>} multibinding shared with other modules. */
+    @Override
+    protected void configure() {
+        Multibinder.newSetBinder(binder(), S4RFetcher.class).addBinding().to(HdfsS4RFetcher.class);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java
new file mode 100644
index 0000000..1f5e5d2
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/deploy/HdfsS4RFetcher.java
@@ -0,0 +1,64 @@
+package org.apache.s4.deploy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Fetches S4R archives from HDFS, i.e. S4R published with a "hdfs://path/to/the/file" - like URI.
+ *
+ * The HDFS configuration is fetched from the classpath (typically, from core-site.xml).
+ *
+ */
+public class HdfsS4RFetcher implements S4RFetcher {
+    private static Logger logger = LoggerFactory.getLogger(HdfsS4RFetcher.class);
+    /** Hadoop configuration: classpath resources (e.g. core-site.xml) plus the optional default fs override. */
+    final Configuration conf;
+    // the fs name holder is optional and only used by tests. Default value is null
+    static class FsNameHolder {
+        @Inject(optional = true)
+        @Named(FileSystem.FS_DEFAULT_NAME_KEY)
+        String value = null;
+    }
+
+    /** Loads the configuration and, when an fs name is injected, uses it as the default file system. */
+    @Inject
+    public HdfsS4RFetcher(FsNameHolder fsNameHolder) {
+        this.conf = new YarnConfiguration();
+        if (fsNameHolder.value != null) {
+            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsNameHolder.value);
+        }
+    }
+
+    @Override
+    public boolean handlesProtocol(URI uri) {
+        return "hdfs".equalsIgnoreCase(uri.getScheme());
+    }
+
+    // Resolve the file system from the URI itself (an authority-less "hdfs:///path" uses the default fs when it is hdfs).
+    // FileSystem.get returns a shared, cached instance, so it is deliberately not closed here.
+    @Override
+    public InputStream fetch(URI uri) throws DeploymentFailedException {
+        logger.info("Fetching S4R through hdfs from uri {}", uri.toString());
+        try {
+            FileSystem fs = FileSystem.get(uri, conf);
+            Path s4rPath = new Path(uri);
+            if (!fs.exists(s4rPath)) {
+                throw new DeploymentFailedException("Cannot find S4R file at URI : " + uri.toString());
+            }
+            return fs.open(s4rPath);
+        } catch (IOException e) {
+            throw new DeploymentFailedException("Cannot fetch S4R file at URI: " + uri.toString(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
new file mode 100644
index 0000000..a3e8702
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4ApplicationMaster.java
@@ -0,0 +1,706 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.yarn;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.s4.deploy.HdfsFetcherModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+/**
+ * An ApplicationMaster for launching S4 nodes on a set of launched containers using the YARN framework.
+ *
+ * The code is inspired by the distributed shell example from <a
+ * href="http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"
+ * >Writing YARN Applications</a>.
+ *
+ * The application master registers with the Resource Manager and requests {@code num_containers} containers. It
+ * starts one S4 node ({@code org.apache.s4.core.Main}) in each allocated container. It then polls the Resource Manager
+ * until all containers have completed, and finally reports the application status (failed if any container failed).
+ */
+public class S4ApplicationMaster {
+
+    private static final Logger logger = LoggerFactory.getLogger(S4ApplicationMaster.class);
+
+    /**
+     * Exit status reported for containers aborted by the framework (e.g. lost node) rather than failed by the process
+     * they ran. Such containers are requested again instead of being counted as completed.
+     */
+    private static final int ABORTED_CONTAINER_EXIT_STATUS = -100;
+
+    // Pause between two allocation requests, to avoid flooding the RM with spurious requests
+    private static final long RM_POLL_INTERVAL_MS = 1000;
+
+    // Configuration
+    private Configuration conf;
+
+    // YARN RPC to communicate with the Resource Manager or Node Manager
+    private YarnRPC rpc;
+
+    // Handle to communicate with the Resource Manager
+    private AMRMProtocol resourceManager;
+
+    // Application Attempt Id ( combination of attemptId and fail count )
+    private ApplicationAttemptId appAttemptID;
+
+    // For status update for clients - yet to be implemented
+    // Hostname of the container
+    private String appMasterHostname = "";
+    // Port on which the app master listens for status update requests from clients
+    private int appMasterRpcPort = 0;
+    // Tracking url to which app master publishes info for clients to monitor
+    private String appMasterTrackingUrl = "";
+
+    // App Master configuration
+    // No. of containers to host S4 nodes on
+    private int numTotalContainers = 1;
+    // Memory to request for the container on which the S4 nodes will be hosted
+    private int containerMemory = 10;
+    // Priority of the request
+    private int requestPriority;
+
+    // Incremental counter for rpc calls to the RM
+    private AtomicInteger rmRequestID = new AtomicInteger();
+
+    // Simple flag to denote whether all works is done
+    private boolean appDone = false;
+    // Counter for completed containers ( complete denotes successful or failed )
+    private AtomicInteger numCompletedContainers = new AtomicInteger();
+    // Allocated container count so that we know how many containers has the RM
+    // allocated to us
+    private AtomicInteger numAllocatedContainers = new AtomicInteger();
+    // Count of failed containers
+    private AtomicInteger numFailedContainers = new AtomicInteger();
+    // Count of containers already requested from the RM
+    // Needed as once requested, we should not request for containers again and again.
+    // Only request for more if the original requirement changes.
+    private AtomicInteger numRequestedContainers = new AtomicInteger();
+
+    // The cluster (or application) name
+    private String cluster = "";
+
+    // Zookeeper connection string passed to the S4 nodes; null when not specified on the command line
+    private String zkString = "";
+
+    // Containers to be released
+    private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
+
+    // Launch threads
+    private List<Thread> launchThreads = new ArrayList<Thread>();
+
+    private boolean testMode;
+
+    // User under which S4 nodes are launched; null/empty means the current user
+    private String user;
+
+    /**
+     * Entry point: parses the arguments, runs the application master and exits with 0 on success, 1 on unexpected
+     * error and 2 when the application completed with failed containers.
+     *
+     * @param args
+     *            Command line args
+     */
+    public static void main(String[] args) {
+        boolean result = false;
+        try {
+
+            S4ApplicationMaster appMaster = new S4ApplicationMaster();
+            logger.info("Initializing ApplicationMaster with args " + Arrays.toString(args));
+            boolean doRun = appMaster.init(args);
+            if (!doRun) {
+                System.exit(0);
+            }
+            result = appMaster.run();
+        } catch (Throwable t) {
+            logger.error("Error running ApplicationMaster", t);
+            System.exit(1);
+        }
+        if (result) {
+            logger.info("Application Master completed successfully. exiting");
+            System.exit(0);
+        } else {
+            logger.info("Application Master failed. exiting");
+            System.exit(2);
+        }
+    }
+
+    /**
+     * Creates the application master and installs a JVM-wide default handler that logs uncaught exceptions.
+     */
+    public S4ApplicationMaster() throws Exception {
+
+        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("Uncaught exception in thread {}", t.getName(), e);
+
+            }
+        });
+    }
+
+    /**
+     * Parse command line options
+     *
+     * @param args
+     *            Command line args
+     * @return Whether init successful and run should be invoked
+     * @throws ParseException
+     *             if the arguments cannot be parsed
+     * @throws IOException
+     * @throws IllegalArgumentException
+     *             if no arguments are given, the attempt id or the cluster name is missing, or the number of
+     *             containers is not positive
+     */
+    public boolean init(String[] args) throws ParseException, IOException {
+
+        Options opts = new Options();
+        opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
+        opts.addOption("c", "cluster", true, "The cluster (or application) name");
+        opts.addOption("zk", true, "Zookeeper connection string");
+        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to host the S4 node");
+        opts.addOption("num_containers", true, "No. of containers on which the S4 node needs to be hosted");
+        opts.addOption("priority", true, "Application Priority. Default 0");
+        // "user" is read below; it must be declared, otherwise it is always null and "-user" is rejected by the parser
+        opts.addOption("user", true, "User under which the S4 nodes are launched. Default: current user");
+        opts.addOption("debug", false, "Dump out debug information");
+        opts.addOption("test", false, "Test mode");
+
+        opts.addOption("help", false, "Print usage");
+
+        if (args.length == 0) {
+            printUsage(opts);
+            throw new IllegalArgumentException("No args specified for application master to initialize");
+        }
+
+        CommandLine cliParser = new GnuParser().parse(opts, args);
+
+        if (cliParser.hasOption("help")) {
+            printUsage(opts);
+            return false;
+        }
+
+        appAttemptID = resolveAppAttemptId(cliParser);
+
+        logger.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+                + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
+                + appAttemptID.getAttemptId());
+        logger.info("Application master args = " + Arrays.toString(cliParser.getArgs()));
+        for (Option option : cliParser.getOptions()) {
+            logger.info(option.getArgName() + " / " + option.getDescription() + " / " + option.getOpt() + " / "
+                    + option.getValue());
+        }
+        logger.info("Application master args = " + Arrays.toString(cliParser.getOptions()));
+
+        if (!cliParser.hasOption("cluster")) {
+            throw new IllegalArgumentException("No cluster ID specified to be provisioned by application master");
+        }
+        cluster = cliParser.getOptionValue("cluster");
+        zkString = cliParser.getOptionValue("zk");
+
+        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "128"));
+        numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+        if (numTotalContainers < 1) {
+            throw new IllegalArgumentException("Number of containers must be at least 1, got " + numTotalContainers);
+        }
+        requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+        user = cliParser.getOptionValue("user");
+
+        conf = new YarnConfiguration();
+        if (cliParser.hasOption("test")) {
+            testMode = true;
+            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+        }
+        rpc = YarnRPC.create(conf);
+
+        return true;
+    }
+
+    /**
+     * Reads the application attempt id from the AM container environment or, when absent (testing), from the
+     * {@code app_attempt_id} command line option.
+     */
+    private static ApplicationAttemptId resolveAppAttemptId(CommandLine cliParser) {
+        Map<String, String> envs = System.getenv();
+        if (envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
+            ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
+            return containerId.getApplicationAttemptId();
+        }
+        if (cliParser.hasOption("app_attempt_id")) {
+            String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+            return ConverterUtils.toApplicationAttemptId(appIdStr);
+        }
+        throw new IllegalArgumentException("Application Attempt Id not set in the environment");
+    }
+
+    /**
+     * Helper function to print usage
+     *
+     * @param opts
+     *            Parsed command line options
+     */
+    private void printUsage(Options opts) {
+        new HelpFormatter().printHelp("ApplicationMaster", opts);
+    }
+
+    /**
+     * Main run function for the application master: requests containers, launches S4 nodes on them, waits until all
+     * of them have completed and reports the final status to the RM.
+     *
+     * @return true if no container failed
+     * @throws YarnRemoteException
+     *             if a call to the RM fails
+     */
+    public boolean run() throws YarnRemoteException {
+        logger.info("Starting ApplicationMaster");
+
+        // Connect to ResourceManager
+        resourceManager = connectToRM();
+
+        // Setup local RPC Server to accept status requests directly from clients
+
+        // Register self with ResourceManager
+        RegisterApplicationMasterResponse response = registerToRM();
+        adjustContainerMemory(response);
+
+        int loopCounter = -1;
+
+        while (numCompletedContainers.get() < numTotalContainers && !appDone) {
+            loopCounter++;
+
+            logApplicationState(loopCounter);
+            sleepBeforeNextAsk();
+
+            // No. of containers to request
+            // For the first loop, askCount will be equal to total containers needed
+            // From that point on, askCount is 0 unless containers were aborted
+            // (see processCompletedContainers), in which case they are asked for again.
+            int askCount = numTotalContainers - numRequestedContainers.get();
+            numRequestedContainers.addAndGet(askCount);
+
+            // Setup request to be sent to RM to allocate containers
+            List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
+            if (askCount > 0) {
+                resourceReq.add(setupContainerAskForRM(askCount));
+            }
+
+            // Send the request to RM
+            logger.info("Asking RM for containers" + ", askCount=" + askCount);
+            AMResponse amResp = sendContainerAskToRM(resourceReq);
+
+            launchAllocatedContainers(amResp.getAllocatedContainers());
+
+            // Check what the current available resources in the cluster are
+            // TODO should we do anything if the available resources are not enough?
+            Resource availableResources = amResp.getAvailableResources();
+            logger.info("Current available resources in the cluster " + availableResources);
+
+            processCompletedContainers(amResp.getCompletedContainersStatuses());
+            if (numCompletedContainers.get() == numTotalContainers) {
+                appDone = true;
+            }
+
+            logApplicationState(loopCounter);
+        }
+
+        joinLaunchThreads();
+
+        return signalFinishToRM();
+    }
+
+    /**
+     * Logs the cluster memory capabilities and clamps the requested container memory into [min, max]. If it is not an
+     * exact multiple of min, the RM allocates the nearest multiple of min.
+     */
+    private void adjustContainerMemory(RegisterApplicationMasterResponse response) {
+        int minMem = response.getMinimumResourceCapability().getMemory();
+        int maxMem = response.getMaximumResourceCapability().getMemory();
+        logger.info("Min mem capability of resources in this cluster " + minMem);
+        logger.info("Max mem capability of resources in this cluster " + maxMem);
+
+        if (containerMemory < minMem) {
+            logger.info("Container memory specified below min threshold of cluster. Using min value." + ", specified="
+                    + containerMemory + ", min=" + minMem);
+            containerMemory = minMem;
+        } else if (containerMemory > maxMem) {
+            logger.info("Container memory specified above max threshold of cluster. Using max value." + ", specified="
+                    + containerMemory + ", max=" + maxMem);
+            containerMemory = maxMem;
+        }
+    }
+
+    /** Logs the container counters for the given loop iteration. */
+    private void logApplicationState(int loopCounter) {
+        logger.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total="
+                + numTotalContainers + ", requested=" + numRequestedContainers + ", completed="
+                + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated="
+                + numAllocatedContainers);
+    }
+
+    /** Waits between two allocation requests so that the RM is not flooded when it has no containers available. */
+    private void sleepBeforeNextAsk() {
+        try {
+            Thread.sleep(RM_POLL_INTERVAL_MS);
+        } catch (InterruptedException e) {
+            // The main loop keeps polling until all containers are completed. The interrupt flag is deliberately not
+            // restored: it would make every following sleep fail immediately and turn the loop into a busy loop.
+            logger.info("Sleep interrupted " + e.getMessage());
+        }
+    }
+
+    /** Starts one launch thread per newly allocated container, so the main loop is never blocked by a launch. */
+    private void launchAllocatedContainers(List<Container> allocatedContainers) {
+        logger.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+        numAllocatedContainers.addAndGet(allocatedContainers.size());
+        for (Container allocatedContainer : allocatedContainers) {
+            logger.info("Launching S4 node on a new container." + ", containerId=" + allocatedContainer.getId()
+                    + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":"
+                    + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+                    + allocatedContainer.getNodeHttpAddress() + ", containerState=" + allocatedContainer.getState()
+                    + ", containerResourceMemory=" + allocatedContainer.getResource().getMemory());
+
+            // launch and start the container on a separate thread to keep the main thread unblocked
+            // as all containers may not be allocated at one go.
+            Thread launchThread = new Thread(new LaunchContainerRunnable(allocatedContainer));
+            launchThreads.add(launchThread);
+            launchThread.start();
+        }
+    }
+
+    /**
+     * Updates the container counters from the statuses of completed containers: successful and failed containers
+     * count as completed, aborted ones are requested again.
+     */
+    private void processCompletedContainers(List<ContainerStatus> completedContainers) {
+        logger.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+        for (ContainerStatus containerStatus : completedContainers) {
+            logger.info("Got container status for containerID= " + containerStatus.getContainerId() + ", state="
+                    + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus()
+                    + ", diagnostics=" + containerStatus.getDiagnostics());
+
+            // non complete containers should not be here
+            assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+            int exitStatus = containerStatus.getExitStatus();
+            if (exitStatus == 0) {
+                // container completed successfully
+                numCompletedContainers.incrementAndGet();
+                logger.info("Container completed successfully." + ", containerId="
+                        + containerStatus.getContainerId());
+            } else if (exitStatus == ABORTED_CONTAINER_EXIT_STATUS) {
+                // the container was lost for some reason: ask for it again at the next iteration.
+                // we do not need to release the container as it would be done by the RM/CM.
+                numAllocatedContainers.decrementAndGet();
+                numRequestedContainers.decrementAndGet();
+            } else {
+                // the S4 node process exited with an error: counts as completed and failed
+                numCompletedContainers.incrementAndGet();
+                numFailedContainers.incrementAndGet();
+            }
+        }
+    }
+
+    /** Waits for all container launch threads to terminate. */
+    private void joinLaunchThreads() {
+        for (Thread launchThread : launchThreads) {
+            try {
+                launchThread.join(0);
+            } catch (InterruptedException e) {
+                logger.info("Exception thrown in thread join: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Sends the finish application signal to the RM: SUCCEEDED when no container failed, FAILED with diagnostics
+     * otherwise.
+     *
+     * @return true if the application succeeded
+     */
+    private boolean signalFinishToRM() throws YarnRemoteException {
+        logger.info("Application completed. Signalling finish to RM");
+
+        FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
+        finishReq.setAppAttemptId(appAttemptID);
+        boolean isSuccess = numFailedContainers.get() == 0;
+        if (isSuccess) {
+            finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+        } else {
+            finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+            String diagnostics = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
+                    + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
+                    + numFailedContainers.get();
+            finishReq.setDiagnostics(diagnostics);
+        }
+        resourceManager.finishApplicationMaster(finishReq);
+        return isSuccess;
+    }
+
+    /**
+     * Thread to connect to the {@link ContainerManager} and launch the container that will execute the S4 node.
+     */
+    private class LaunchContainerRunnable implements Runnable {
+
+        // Allocated container
+        private final Container container;
+        // Handle to communicate with ContainerManager
+        private ContainerManager cm;
+
+        /**
+         * @param lcontainer
+         *            Allocated container
+         */
+        public LaunchContainerRunnable(Container lcontainer) {
+            this.container = lcontainer;
+        }
+
+        /**
+         * Helper function to connect to CM
+         */
+        private void connectToCM() {
+            logger.debug("Connecting to ContainerManager for containerid=" + container.getId());
+            String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort();
+            InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+            logger.info("Connecting to ContainerManager at " + cmIpPortStr);
+            this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
+        }
+
+        /**
+         * Connects to CM, sets up the container launch context for the S4 node and dispatches the container start
+         * request to the CM. The CM proxy is released afterwards.
+         */
+        @Override
+        public void run() {
+            // Connect to ContainerManager
+            connectToCM();
+            try {
+                logger.info("Setting up container launch container for containerid=" + container.getId());
+                ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+                ctx.setContainerId(container.getId());
+                ctx.setResource(container.getResource());
+                setUser(ctx);
+                setLocalResources(ctx);
+                ctx.setEnvironment(buildEnvironment());
+
+                List<String> commands = new ArrayList<String>();
+                commands.add(buildCommand());
+                ctx.setCommands(commands);
+
+                StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+                startReq.setContainerLaunchContext(ctx);
+                try {
+                    cm.startContainer(startReq);
+                } catch (YarnRemoteException e) {
+                    logger.error("Start container failed for :" + ", containerId=" + container.getId(), e);
+                }
+            } finally {
+                // release the RPC proxy created for this container
+                rpc.stopProxy(cm, conf);
+            }
+        }
+
+        /** Sets the user the container runs as: the "user" option if given, else the current user. */
+        private void setUser(ContainerLaunchContext ctx) {
+            if (!Strings.isNullOrEmpty(user)) {
+                ctx.setUser(user);
+                return;
+            }
+            try {
+                String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+                logger.info("Using default user name {}", currentUser);
+                ctx.setUser(currentUser);
+            } catch (IOException e) {
+                logger.warn("Getting current user info failed when trying to launch the container: " + e.getMessage(),
+                        e);
+            }
+        }
+
+        /**
+         * Registers every file of the home directory on the default file system as an application-visible local
+         * resource. NOTE(review): presumably these are the jars uploaded by the client - verify against the client.
+         * Local resources are left unset if the listing fails.
+         */
+        private void setLocalResources(ContainerLaunchContext ctx) {
+            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+            try {
+                FileSystem fs = FileSystem.get(conf);
+
+                RemoteIterator<LocatedFileStatus> files = fs.listFiles(fs.getHomeDirectory(), false);
+                while (files.hasNext()) {
+                    LocatedFileStatus file = files.next();
+                    LocalResource localResource = Records.newRecord(LocalResource.class);
+
+                    localResource.setType(LocalResourceType.FILE);
+                    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+                    localResource.setResource(ConverterUtils.getYarnUrlFromPath(file.getPath()));
+                    localResource.setTimestamp(file.getModificationTime());
+                    localResource.setSize(file.getLen());
+                    localResources.put(file.getPath().getName(), localResource);
+                }
+                ctx.setLocalResources(localResources);
+            } catch (IOException e) {
+                logger.error("Cannot set up local resources for containerId=" + container.getId(), e);
+            }
+        }
+
+        /** Builds the container environment: the classpath includes the localized files and the YARN classpath. */
+        private Map<String, String> buildEnvironment() {
+            StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+                classPathEnv.append(':').append(c.trim());
+            }
+
+            Map<String, String> env = new HashMap<String, String>();
+            env.put("CLASSPATH", classPathEnv.toString());
+            return env;
+        }
+
+        /** Builds the shell command line that starts the S4 node and redirects its output to the container logs. */
+        private String buildCommand() {
+            List<String> vargs = new ArrayList<String>();
+
+            vargs.add("java");
+            // TODO add memory parameter
+            vargs.add("org.apache.s4.core.Main");
+            // only pass the zookeeper string when one was given, instead of a literal "null"
+            if (!Strings.isNullOrEmpty(zkString)) {
+                vargs.add("-zk=" + zkString);
+            }
+            vargs.add("-c=" + cluster);
+
+            // add module for fetchings from hdfs
+            vargs.add("-emc=" + HdfsFetcherModule.class.getName());
+
+            // add reference to the configuration
+            if (testMode) {
+                vargs.add("-p=" + FileSystem.FS_DEFAULT_NAME_KEY + "=" + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            }
+
+            // TODO redirect the output to hdfs instead of local logs, so that it can be inspected after the
+            // containers have been released (e.g. /AppId/AppAttempId/ContainerId/std[out|err])
+            vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+            vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+            StringBuilder command = new StringBuilder();
+            for (String arg : vargs) {
+                command.append(arg).append(' ');
+            }
+            return command.toString();
+        }
+    }
+
+    /**
+     * Connect to the Resource Manager
+     *
+     * @return Handle to communicate with the RM
+     */
+    private AMRMProtocol connectToRM() {
+        YarnConfiguration yarnConf = new YarnConfiguration(conf);
+        InetSocketAddress rmAddress = yarnConf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+                YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+        logger.info("Connecting to ResourceManager at " + rmAddress);
+        return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
+    }
+
+    /**
+     * Register the Application Master to the Resource Manager, with its attempt id, host, rpc port and tracking url.
+     *
+     * @return the registration response from the RM
+     * @throws YarnRemoteException
+     */
+    private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
+        RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+
+        appMasterRequest.setApplicationAttemptId(appAttemptID);
+        appMasterRequest.setHost(appMasterHostname);
+        appMasterRequest.setRpcPort(appMasterRpcPort);
+        appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+
+        return resourceManager.registerApplicationMaster(appMasterRequest);
+    }
+
+    /**
+     * Setup the request that will be sent to the RM for the container ask: any host ("*"), the configured priority
+     * and the configured container memory.
+     *
+     * @param numContainers
+     *            Containers to ask for from RM
+     * @return the setup ResourceRequest to be sent to RM
+     */
+    private ResourceRequest setupContainerAskForRM(int numContainers) {
+        ResourceRequest request = Records.newRecord(ResourceRequest.class);
+
+        // any host will do for S4 nodes
+        request.setHostName("*");
+
+        // set no. of containers needed
+        request.setNumContainers(numContainers);
+
+        // set the priority for the request
+        Priority pri = Records.newRecord(Priority.class);
+        // TODO - what is the range for priority? how to decide?
+        pri.setPriority(requestPriority);
+        request.setPriority(pri);
+
+        // For now, only memory is supported so we set memory requirements
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(containerMemory);
+        request.setCapability(capability);
+
+        return request;
+    }
+
+    /**
+     * Ask RM to allocate given no. of containers to this Application Master. The request also carries the containers
+     * to release and the current progress (completed / total containers).
+     *
+     * @param requestedContainers
+     *            Containers to ask for from RM
+     * @return Response from RM to AM with allocated containers
+     * @throws YarnRemoteException
+     */
+    private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers) throws YarnRemoteException {
+        AllocateRequest req = Records.newRecord(AllocateRequest.class);
+        req.setResponseId(rmRequestID.incrementAndGet());
+        req.setApplicationAttemptId(appAttemptID);
+        req.addAllAsks(requestedContainers);
+        req.addAllReleases(releasedContainers);
+        req.setProgress((float) numCompletedContainers.get() / numTotalContainers);
+
+        logger.info("Sending request to RM for containers" + ", requestedSet=" + requestedContainers.size()
+                + ", releasedSet=" + releasedContainers.size() + ", progress=" + req.getProgress());
+
+        for (ResourceRequest rsrcReq : requestedContainers) {
+            logger.info("Requested container ask: " + rsrcReq.toString());
+        }
+        for (ContainerId id : releasedContainers) {
+            logger.info("Released container, id=" + id.getId());
+        }
+
+        AllocateResponse resp = resourceManager.allocate(req);
+        return resp.getAMResponse();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4e78c3e4/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
new file mode 100644
index 0000000..3a3aac6
--- /dev/null
+++ b/subprojects/s4-yarn/src/main/java/org/apache/s4/tools/yarn/S4YarnClient.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.tools.yarn;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.s4.tools.DefineCluster;
+import org.apache.s4.tools.Deploy;
+import org.apache.s4.tools.Tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Client for S4 application submission to YARN.
+ *
+ * <p>
+ * It connects to an existing YARN infrastructure (based on configuration files in HADOOP_CONF_DIR environment
+ * variables), then launches an S4 application master, passing all relevant configuration. The application master then
+ * asks resources from the resource manager and creates S4 nodes on those allocated resources, with the appropriate
+ * configuration.
+ * </p>
+ *
+ * <p>
+ * <u>Implementation notes from YARN example: </u>
+ * </p>
+ * <p>
+ * To submit an application, a client first needs to connect to the <code>ResourceManager</code> aka ApplicationsManager
+ * or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol} provides a way for the client to get access to
+ * cluster information and to request for a new {@link ApplicationId}.
+ * <p>
+ *
+ * <p>
+ * For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. The
+ * {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} and application
+ * name, user submitting the application, the priority assigned to the application and the queue to which this
+ * application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} also defines the
+ * {@link ContainerLaunchContext} which describes the <code>Container</code> with which the {@link S4ApplicationMaster}
+ * is launched.
+ * </p>
+ *
+ * <p>
+ * The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the
+ * {@link S4ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available and the
+ * environment to be set for the {@link S4ApplicationMaster} and the commands to be executed to run the
+ * {@link S4ApplicationMaster}.
+ * <p>
+ *
+ * <p>
+ * Using the {@link ApplicationSubmissionContext}, the client submits the application to the
+ * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> for an
+ * {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client kills the
+ * application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>.
+ * </p>
+ *
+ */
+public class S4YarnClient extends YarnClientImpl {
+
+ private static final String HADOOP_CONF_DIR_ENV = System.getenv("HADOOP_CONF_DIR");
+
+ private static final ImmutableSet<String> YARN_CONF_FILES = ImmutableSet.of("core-site.xml", "hdfs-site.xml",
+ "yarn-site.xml", "mapred-site.xml");
+
+ private static Logger logger = LoggerFactory.getLogger(S4YarnClient.class);
+
+ // Configuration
+ private Configuration conf;
+
+ YarnArgs yarnArgs;
+
+ // Handle to talk to the Resource Manager/Applications Manager
+ private ClientRMProtocol applicationsManager;
+
+ private int amMemory;
+
+ private final long clientStartTime = System.currentTimeMillis();
+
+ /**
+ * @param args
+ * Command line arguments
+ */
+ public static void main(String[] args) {
+
+ YarnArgs yarnArgs = new YarnArgs();
+ logger.info("S4YarnClient args = " + Arrays.toString(args));
+
+ Tools.parseArgs(yarnArgs, args);
+ boolean result = false;
+ try {
+
+ YarnConfiguration yarnConfig = new YarnConfiguration();
+
+ if (Strings.isNullOrEmpty(HADOOP_CONF_DIR_ENV)) {
+ logger.error("You must define HADOOP_CONF_DIR environment variable");
+ System.exit(1);
+ }
+ File confDir = new File(HADOOP_CONF_DIR_ENV);
+ if (!(confDir.listFiles(new FileFilter() {
+
+ @Override
+ public boolean accept(File pathname) {
+ return YARN_CONF_FILES.contains(pathname.getName());
+ }
+ }).length == 4)) {
+ logger.error("The {} directory must contain files [core,hdfs,yarn,mapred]-site.xml");
+ System.exit(1);
+ }
+
+ for (String fileName : YARN_CONF_FILES) {
+ yarnConfig.addResource(new Path(new File(HADOOP_CONF_DIR_ENV, fileName).toURI()));
+ }
+
+ S4YarnClient client = new S4YarnClient(yarnArgs, yarnConfig);
+ result = client.run(false);
+ } catch (Throwable t) {
+ logger.error("Error running Client", t);
+ System.exit(1);
+ }
+ if (result) {
+ logger.info("Application completed successfully");
+ System.exit(0);
+ }
+ logger.error("Application failed to complete successfully");
+ System.exit(1);
+ }
+
+ public S4YarnClient(YarnArgs yarnArgs, Configuration conf) throws Exception {
+ this.yarnArgs = yarnArgs;
+ this.conf = conf;
+ init(this.conf);
+ }
+
+ /**
+ * Main run function for the client
+ *
+ * @return true if application completed successfully
+ * @throws IOException
+ */
+ public boolean run(boolean testMode) throws IOException {
+ logger.info("Running Client");
+ start();
+
+ YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
+ logger.info("Got Cluster metric info from ASM" + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
+
+ List<NodeReport> clusterNodeReports = super.getNodeReports();
+ logger.info("Got Cluster node info from ASM");
+ for (NodeReport node : clusterNodeReports) {
+ logger.info("Got node report from ASM for" + ", nodeId=" + node.getNodeId() + ", nodeAddress"
+ + node.getHttpAddress() + ", nodeRackName" + node.getRackName() + ", nodeNumContainers"
+ + node.getNumContainers() + ", nodeHealthStatus" + node.getNodeHealthStatus());
+ }
+
+ QueueInfo queueInfo = super.getQueueInfo(yarnArgs.queue);
+ logger.info("Queue info" + ", queueName=" + queueInfo.getQueueName() + ", queueCurrentCapacity="
+ + queueInfo.getCurrentCapacity() + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+ + ", queueApplicationCount=" + queueInfo.getApplications().size() + ", queueChildQueueCount="
+ + queueInfo.getChildQueues().size());
+
+ List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
+ for (QueueUserACLInfo aclInfo : listAclInfo) {
+ for (QueueACL userAcl : aclInfo.getUserAcls()) {
+ logger.info("User ACL Info for Queue" + ", queueName=" + aclInfo.getQueueName() + ", userAcl="
+ + userAcl.name());
+ }
+ }
+
+ // Get a new application id
+ GetNewApplicationResponse newApp = super.getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+
+ // TODO get min/max resource capabilities from RM and change memory ask if needed
+ // If we do not have min/max, we may not be able to correctly request
+ // the required resources from the RM for the app master
+ // Memory ask has to be a multiple of min and less than max.
+ // Dump out information about cluster capability as seen by the resource manager
+ int minMem = newApp.getMinimumResourceCapability().getMemory();
+ int maxMem = newApp.getMaximumResourceCapability().getMemory();
+ logger.info("Min mem capability of resources in this cluster " + minMem);
+ logger.info("Max mem capability of resources in this cluster " + maxMem);
+
+ // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
+ // a multiple of the min value and cannot exceed the max.
+ // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
+ if (amMemory < minMem) {
+ logger.info("AM memory specified below min threshold of cluster. Using min value." + ", specified="
+ + amMemory + ", min=" + minMem);
+ amMemory = minMem;
+ } else if (amMemory > maxMem) {
+ logger.info("AM memory specified above max threshold of cluster. Using max value." + ", specified="
+ + amMemory + ", max=" + maxMem);
+ amMemory = maxMem;
+ }
+
+ // Create launch context for app master
+ logger.info("Setting up application submission context for ASM");
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName(yarnArgs.appName);
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ // set local resources for the application master
+ // local files or archives as needed
+ // In this scenario, the jar file for the application master is part of the local resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+ if (!(new File(yarnArgs.s4Dir).isDirectory())) {
+ logger.error("Invalid s4 directory : " + yarnArgs.s4Dir);
+ System.exit(1);
+ }
+
+ // TODO avoid depending on source distribution paths
+ File[] classPathFiles = new File(yarnArgs.s4Dir + "/subprojects/s4-yarn/build/install/s4-yarn/lib").listFiles();
+ FileSystem fs = FileSystem.get(conf);
+
+ for (int i = 0; i < classPathFiles.length; i++) {
+ Path dest = copyToLocalResources(fs, localResources, classPathFiles[i]);
+ logger.info("Copied classpath resource " + classPathFiles[i].getAbsolutePath() + " to "
+ + dest.toUri().toString());
+ }
+
+ // Set local resource info into app master container launch context
+ amContainer.setLocalResources(localResources);
+
+ // Set the necessary security tokens as needed
+ // amContainer.setContainerTokens(containerToken);
+
+ // Set the env variables to be setup in the env where the application master will be run
+ logger.info("Set the environment for the application master");
+ Map<String, String> env = new HashMap<String, String>();
+
+ // For now setting all required classpaths including
+ // the classpath to "." for the application jar
+ StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(':');
+ classPathEnv.append(c.trim());
+ }
+ classPathEnv.append(":./log4j.properties");
+
+ env.put("CLASSPATH", classPathEnv.toString());
+
+ amContainer.setEnvironment(env);
+
+ // Set the necessary command to execute the application master
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ logger.info("Setting up app master command");
+
+ // vargs.add("${JAVA_HOME}" + "/bin/java");
+ vargs.add("java");
+ // Set Xmx based on am memory size
+ vargs.add("-Xmx" + amMemory + "m");
+ // vargs.add("-Xdebug");
+ // vargs.add("-Xrunjdwp:transport=dt_socket,address=8888,server=y");
+ // Set Application Master class name
+ vargs.add(S4ApplicationMaster.class.getName());
+ // Set params for Application Master
+ vargs.add("--container_memory " + String.valueOf(yarnArgs.containerMemory));
+ vargs.add("--num_containers " + String.valueOf(yarnArgs.numContainers));
+ vargs.add("--priority " + String.valueOf(yarnArgs.priority));
+ vargs.add("-c " + String.valueOf(yarnArgs.cluster));
+ vargs.add("-zk " + String.valueOf(yarnArgs.zkString));
+ if (testMode) {
+ vargs.add("-test");
+ }
+
+ if (yarnArgs.debug) {
+ vargs.add("--debug");
+ }
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+ // Get final commmand
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ logger.info("Completed setting up app master command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+ amContainer.setCommands(commands);
+
+ // Set up resource type requirements
+ // For now, only memory is supported so we set memory requirements
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(amMemory);
+ amContainer.setResource(capability);
+
+ appContext.setAMContainerSpec(amContainer);
+
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ // TODO - what is the range for priority? how to decide?
+ pri.setPriority(yarnArgs.priority);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue(yarnArgs.queue);
+ // Set the user submitting this application
+ // TODO can it be empty?
+ appContext.setUser(yarnArgs.user);
+
+ // Define a new cluster for the application
+ String[] defineClusterArgs = { "-cluster=" + yarnArgs.cluster, "-nbTasks=" + yarnArgs.nbTasks,
+ "-flp=" + yarnArgs.flp, "-zk=" + yarnArgs.zkString };
+ DefineCluster.main(defineClusterArgs);
+
+ // Deply the application to the new cluster
+ String[] deployApplicationArgs = { "-s4r=" + yarnArgs.s4rPath, "-cluster=" + yarnArgs.cluster,
+ "-appName=" + yarnArgs.cluster, "-shutdown=false", "-zk=" + yarnArgs.zkString };
+ Deploy.main(deployApplicationArgs);
+
+ logger.info("Submitting application to ASM");
+ super.submitApplication(appContext);
+
+ // TODO
+ // Try submitting the same request again
+ // app submission failure?
+
+ // Monitor the application (TODO: optional?)
+
+ return monitorApplication(appId);
+
+ }
+
+ private Path copyToLocalResources(FileSystem fs, Map<String, LocalResource> localResources, File file)
+ throws IOException {
+ Path src = new Path(file.getAbsolutePath());
+
+ // TODO use home directory + appId / appName?
+ Path dst = new Path(fs.getHomeDirectory(), file.getName());
+ fs.copyFromLocalFile(false, true, src, dst);
+ FileStatus destStatus = fs.getFileStatus(dst);
+ LocalResource resource = Records.newRecord(LocalResource.class);
+ resource.setType(LocalResourceType.FILE);
+ resource.setVisibility(LocalResourceVisibility.APPLICATION);
+ resource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ // Set timestamp and length of file so that the framework
+ // can do basic sanity checks for the local resource
+ // after it has been copied over to ensure it is the same
+ // resource the client intended to use with the application
+ resource.setTimestamp(destStatus.getModificationTime());
+ resource.setSize(destStatus.getLen());
+ localResources.put(file.getName(), resource);
+ return dst;
+ }
+
+ /**
+ * Monitor the submitted application for completion. Kill application if time expires.
+ *
+ * @param appId
+ * Application Id of application to be monitored
+ * @return true if application completed successfully
+ * @throws YarnRemoteException
+ */
+ private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
+
+ while (true) {
+
+ // Check app status every 1 second.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.debug("Thread sleep in monitoring loop interrupted");
+ }
+
+ // Get application report for the appId we are interested in
+ ApplicationReport report = super.getApplicationReport(appId);
+
+ logger.info("Got application report from ASM for" + ", appId=" + appId.getId() + ", clientToken="
+ + report.getClientToken() + ", appDiagnostics=" + report.getDiagnostics() + ", appMasterHost="
+ + report.getHost() + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+ + report.getRpcPort() + ", appStartTime=" + report.getStartTime() + ", yarnAppState="
+ + report.getYarnApplicationState().toString() + ", distributedFinalState="
+ + report.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + report.getTrackingUrl()
+ + ", appUser=" + report.getUser());
+
+ YarnApplicationState state = report.getYarnApplicationState();
+ FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+ if (YarnApplicationState.FINISHED == state) {
+ if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+ logger.info("Application has completed successfully. Breaking monitoring loop");
+ return true;
+ } else {
+ logger.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+ + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+ return false;
+ }
+ } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
+ logger.info("Application did not finish." + " YarnState=" + state.toString() + ", DSFinalStatus="
+ + dsStatus.toString() + ". Breaking monitoring loop");
+ return false;
+ }
+
+ if ((yarnArgs.timeout != -1) && (System.currentTimeMillis() > (clientStartTime + yarnArgs.timeout))) {
+ logger.info("Reached client specified timeout for application. Killing application");
+ forceKillApplication(appId);
+ return false;
+ }
+ }
+
+ }
+
+ /**
+ * Kill a submitted application by sending a call to the Applications Manager
+ *
+ * @param appId
+ * Application Id to be killed.
+ * @throws YarnRemoteException
+ */
+ private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
+ super.killApplication(appId);
+ }
+
+}