You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:01 UTC
[18/50] [abbrv] git commit: Moving packages around
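Moving packages around: the provisioning API classes (AppConfig, ApplicationSpec, ApplicationSpecFactory, the Container*Response classes, HelixYarnUtil, ParticipantLauncher, ServiceConfig, TaskConfig) move from org.apache.helix.provisioning.yarn to org.apache.helix.provisioning, and the YARN helloworld recipe moves from recipes/provisioning/yarn/helloworld to recipes/helloworld-provisioning-yarn.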
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8992aa5a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8992aa5a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8992aa5a
Branch: refs/heads/master
Commit: 8992aa5a7e7bd39b63aa79ea96a9fe316f833014
Parents: b47e329
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sat Feb 22 11:50:07 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sat Feb 22 11:50:07 2014 -0800
----------------------------------------------------------------------
.../controller/provisioner/ContainerState.java | 3 +-
helix-provisioning/pom.xml | 2 +-
.../apache/helix/provisioning/AppConfig.java | 17 +
.../helix/provisioning/ApplicationSpec.java | 29 +
.../provisioning/ApplicationSpecFactory.java | 9 +
.../provisioning/ContainerAskResponse.java | 17 +
.../provisioning/ContainerLaunchResponse.java | 5 +
.../provisioning/ContainerReleaseResponse.java | 5 +
.../provisioning/ContainerStopResponse.java | 5 +
.../helix/provisioning/HelixYarnUtil.java | 42 +
.../helix/provisioning/ParticipantLauncher.java | 136 +++
.../helix/provisioning/ServiceConfig.java | 17 +
.../apache/helix/provisioning/TaskConfig.java | 13 +
.../helix/provisioning/yarn/AppConfig.java | 17 -
.../helix/provisioning/yarn/AppLauncher.java | 22 +-
.../provisioning/yarn/AppMasterLauncher.java | 166 ++++
.../yarn/AppStatusReportGenerator.java | 79 ++
.../provisioning/yarn/ApplicationMaster.java | 889 -------------------
.../provisioning/yarn/ApplicationSpec.java | 28 -
.../yarn/ApplicationSpecFactory.java | 9 -
.../apache/helix/provisioning/yarn/Client.java | 627 -------------
.../provisioning/yarn/ContainerAskResponse.java | 17 -
.../yarn/ContainerLaunchResponse.java | 5 -
.../yarn/ContainerReleaseResponse.java | 5 -
.../yarn/ContainerStopResponse.java | 5 -
.../helix/provisioning/yarn/DSConstants.java | 47 -
.../provisioning/yarn/FixedTargetProvider.java | 20 +
.../yarn/GenericApplicationMaster.java | 4 +
.../yarn/HelixYarnApplicationMasterMain.java | 159 ----
.../helix/provisioning/yarn/HelixYarnUtil.java | 42 -
.../provisioning/yarn/NMCallbackHandler.java | 2 +
.../provisioning/yarn/ParticipantLauncher.java | 135 ---
.../provisioning/yarn/RMCallbackHandler.java | 3 +
.../helix/provisioning/yarn/ServiceConfig.java | 17 -
.../helix/provisioning/yarn/TaskConfig.java | 13 -
.../provisioning/yarn/YarnProvisioner.java | 6 +
recipes/helloworld-provisioning-yarn/pom.xml | 159 ++++
recipes/helloworld-provisioning-yarn/run.sh | 6 +
.../src/assemble/assembly.xml | 60 ++
.../src/main/config/log4j.properties | 31 +
.../yarn/example/HelloWordAppSpecFactory.java | 92 ++
.../yarn/example/HelloWorldService.java | 41 +
.../yarn/example/HelloWorldStateModel.java | 33 +
.../example/HelloWorldStateModelFactory.java | 13 +
.../yarn/example/HelloworldAppSpec.java | 138 +++
.../main/resources/hello_world_app_spec.yaml | 24 +
.../src/test/conf/testng.xml | 27 +
recipes/pom.xml | 2 +-
recipes/provisioning/pom.xml | 50 --
recipes/provisioning/yarn/helloworld/pom.xml | 159 ----
.../yarn/helloworld/src/assemble/assembly.xml | 60 --
.../helloworld/src/main/config/log4j.properties | 31 -
.../yarn/example/HelloWordAppSpecFactory.java | 92 --
.../yarn/example/HelloWorldService.java | 41 -
.../yarn/example/HelloWorldStateModel.java | 29 -
.../example/HelloWorldStateModelFactory.java | 13 -
.../yarn/example/HelloworldAppSpec.java | 138 ---
.../main/resources/hello_world_app_spec.yaml | 24 -
.../yarn/helloworld/src/test/conf/testng.xml | 27 -
recipes/provisioning/yarn/pom.xml | 50 --
60 files changed, 1222 insertions(+), 2735 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
index 449f636..2f91275 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
@@ -29,5 +29,6 @@ public enum ContainerState {
HALTED,
FINALIZING,
FINALIZED,
- FAILED
+ FAILED,
+ UNDEFINED
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
index ea5a0fe..3ba7d39 100644
--- a/helix-provisioning/pom.xml
+++ b/helix-provisioning/pom.xml
@@ -36,7 +36,7 @@ under the License.
org.apache.log4j,
*
</osgi.import>
- <osgi.export>org.apache.helix.provisioning.yarn*;version="${project.version};-noimport:=true</osgi.export>
+ <osgi.export>org.apache.helix.provisioning*;version="${project.version};-noimport:=true</osgi.export>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
new file mode 100644
index 0000000..a51db1c
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
@@ -0,0 +1,38 @@
+package org.apache.helix.provisioning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Application-level key/value configuration.
+ * <p>
+ * {@link #config} is a public, non-final field, so callers may replace it (even with
+ * {@code null}); both accessors tolerate a {@code null} map.
+ */
+public class AppConfig {
+  public Map<String, String> config = new HashMap<String, String>();
+
+  /**
+   * Returns the value stored under {@code key}.
+   *
+   * @param key configuration key
+   * @return the stored value, or {@code null} if the key is absent or the map is {@code null}
+   */
+  public String getValue(String key) {
+    return (config != null ? config.get(key) : null);
+  }
+
+  /**
+   * Stores {@code value} under {@code key}.
+   *
+   * @param key configuration key
+   * @param value value to store
+   */
+  public void setValue(String key, String value) {
+    if (config == null) {
+      // getValue() treats a null map as empty; recreate it here instead of throwing an NPE.
+      config = new HashMap<String, String>();
+    }
+    config.put(key, value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
new file mode 100644
index 0000000..f7454d2
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
@@ -0,0 +1,55 @@
+package org.apache.helix.provisioning;
+
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Specification of an application: its name and configuration, the services it consists of,
+ * and the package locations for the application master and for each service.
+ */
+public interface ApplicationSpec {
+  /**
+   * Returns the name of the application.
+   *
+   * @return the application name
+   */
+  String getAppName();
+
+  /**
+   * @return the application-level configuration
+   */
+  AppConfig getConfig();
+
+  /**
+   * @return the names of the services that make up the application
+   */
+  List<String> getServices();
+
+  /**
+   * @return the URI of the application master package
+   */
+  URI getAppMasterPackage();
+
+  /**
+   * @param serviceName a name returned by {@link #getServices()}
+   * @return the URI of that service's package
+   */
+  URI getServicePackage(String serviceName);
+
+  /**
+   * @param serviceName a name returned by {@link #getServices()}
+   * @return the name of that service's main class
+   */
+  String getServiceMainClass(String serviceName);
+
+  /**
+   * @param serviceName a name returned by {@link #getServices()}
+   * @return the configuration of that service
+   */
+  ServiceConfig getServiceConfig(String serviceName);
+
+  /**
+   * @return the task configurations of the application
+   */
+  List<TaskConfig> getTaskConfigs();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
new file mode 100644
index 0000000..0c524f2
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
@@ -0,0 +1,16 @@
+package org.apache.helix.provisioning;
+
+import java.io.InputStream;
+
+/**
+ * Creates {@link ApplicationSpec} instances from a YAML description.
+ */
+public interface ApplicationSpecFactory {
+  /**
+   * Parses a YAML application description.
+   *
+   * @param yamlFile stream holding the YAML document
+   * @return the application specification it describes
+   */
+  ApplicationSpec fromYaml(InputStream yamlFile);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
new file mode 100644
index 0000000..18f66d2
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
@@ -0,0 +1,20 @@
+package org.apache.helix.provisioning;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * Mutable holder for the YARN {@link Container} tied to a container ask.
+ */
+public class ContainerAskResponse {
+  Container container; // package-private; check same-package callers before narrowing
+
+  /** @return the held container, or {@code null} if none has been set */
+  public Container getContainer() {
+    return this.container;
+  }
+
+  /** @param container the container to hold */
+  public void setContainer(Container container) {
+    this.container = container;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
new file mode 100644
index 0000000..ea6ef12
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
@@ -0,0 +1,7 @@
+package org.apache.helix.provisioning;
+
+/**
+ * Response type for a container launch; it currently declares no state.
+ */
+public class ContainerLaunchResponse {
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
new file mode 100644
index 0000000..e4a5dc4
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
@@ -0,0 +1,7 @@
+package org.apache.helix.provisioning;
+
+/**
+ * Response type for a container release; it currently declares no state.
+ */
+public class ContainerReleaseResponse {
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
new file mode 100644
index 0000000..d8c8a46
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
@@ -0,0 +1,7 @@
+package org.apache.helix.provisioning;
+
+/**
+ * Response type for a container stop; it currently declares no state.
+ */
+public class ContainerStopResponse {
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
new file mode 100644
index 0000000..80ac16b
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
@@ -0,0 +1,66 @@
+package org.apache.helix.provisioning;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Helpers for loading provisioning classes by name.
+ */
+public class HelixYarnUtil {
+  private static final Logger LOG = Logger.getLogger(HelixYarnUtil.class);
+
+  /**
+   * Loads {@code className} and creates an instance with its no-arg constructor.
+   * <p>
+   * The class is looked up via the thread context class loader, then the system class loader,
+   * then {@link Class#forName(String)}; the first successful lookup wins.
+   *
+   * @param className fully qualified name of an {@link ApplicationSpecFactory} implementation
+   * @return the new instance, or {@code null} (after logging an error) if the class cannot be
+   *         found, is not an {@link ApplicationSpecFactory}, or cannot be instantiated
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends ApplicationSpecFactory> T createInstance(String className) {
+    Class<?> clazz = (className == null) ? null : loadClass(className);
+    if (clazz == null) {
+      LOG.error("Unable to find class:" + className);
+      // The classpath is only useful for diagnosing a failed lookup.
+      LOG.error("java.class.path=" + System.getProperty("java.class.path"));
+      return null;
+    }
+    try {
+      // asSubclass() rejects a wrong type here rather than letting the caller hit a
+      // ClassCastException; the cast to T itself remains unchecked.
+      return (T) clazz.asSubclass(ApplicationSpecFactory.class).newInstance();
+    } catch (Exception e) {
+      LOG.error("Unable to create instance of class: " + className, e);
+      return null;
+    }
+  }
+
+  /**
+   * Tries each class loader in turn.
+   *
+   * @return the loaded class, or {@code null} if no loader can find it
+   */
+  private static Class<?> loadClass(String className) {
+    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+    // The context class loader may be unset; skip it instead of throwing an NPE.
+    if (contextLoader != null) {
+      try {
+        return contextLoader.loadClass(className);
+      } catch (ClassNotFoundException ignored) {
+        // fall through to the next loader
+      }
+    }
+    try {
+      return ClassLoader.getSystemClassLoader().loadClass(className);
+    } catch (ClassNotFoundException ignored) {
+      // fall through to the next loader
+    }
+    try {
+      return Class.forName(className);
+    } catch (ClassNotFoundException ignored) {
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
new file mode 100644
index 0000000..55bb618
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
@@ -0,0 +1,183 @@
+package org.apache.helix.provisioning;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.AbstractParticipantService;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+/**
+ * Main class that starts a Helix participant service inside a container and keeps the process
+ * alive until it is told to shut down.
+ */
+public class ParticipantLauncher {
+  private static final Logger LOG = Logger.getLogger(ParticipantLauncher.class);
+
+  /**
+   * Entry point. Recognized options: {@code -cluster}, {@code -participantId},
+   * {@code -zkAddress} and {@code -participantClass}; the participant class must have a public
+   * {@code (HelixConnection, ClusterId, ParticipantId)} constructor.
+   * <p>
+   * After startup the main thread blocks; the process ends via the SHUTDOWN message handler
+   * ({@code System.exit(0)}) or a JVM shutdown.
+   * <p>
+   * NOTE(review): a failed start is only logged and the process is still kept alive (the
+   * {@code System.exit(1)} below is disabled). Confirm that this is intended.
+   */
+  public static void main(String[] args) {
+    System.out.println("Starting Helix Participant: " + Arrays.toString(args));
+    try {
+      startParticipant(args);
+    } catch (Exception e) {
+      LOG.error("Failed to start Helix participant", e);
+      System.out.println("Failed to start Helix participant" + e);
+      // System.exit(1);
+    }
+    try {
+      // A thread joining itself never completes, so this blocks until interrupted.
+      Thread.currentThread().join();
+    } catch (InterruptedException e) {
+      LOG.warn("Main thread interrupted", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Connects to ZooKeeper, starts the participant service, and registers the SHUTDOWN message
+   * handler and a JVM shutdown hook. If any step after connecting fails, the service is asked to
+   * stop and the connection is closed before the exception propagates.
+   */
+  private static void startParticipant(String[] args) throws Exception {
+    Options opts = new Options();
+    opts.addOption("cluster", true, "Cluster name, default app name");
+    opts.addOption("participantId", true, "Participant Id");
+    opts.addOption("zkAddress", true, "Zookeeper address");
+    opts.addOption("participantClass", true, "Participant service class");
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+    String zkAddress = cliParser.getOptionValue("zkAddress");
+    final HelixConnection connection = new ZkHelixConnection(zkAddress);
+    connection.connect();
+
+    AbstractParticipantService service = null;
+    boolean started = false;
+    try {
+      ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster"));
+      ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId"));
+      String participantClass = cliParser.getOptionValue("participantClass");
+      @SuppressWarnings("unchecked")
+      Class<? extends AbstractParticipantService> clazz =
+          (Class<? extends AbstractParticipantService>) Class.forName(participantClass);
+      final AbstractParticipantService containerParticipant =
+          clazz.getConstructor(HelixConnection.class, ClusterId.class, ParticipantId.class)
+              .newInstance(connection, clusterId, participantId);
+      service = containerParticipant;
+      containerParticipant.startAsync();
+      containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
+      containerParticipant
+          .getParticipant()
+          .getMessagingService()
+          .registerMessageHandlerFactory(MessageType.SHUTDOWN.toString(),
+              new ShutdownMessageHandlerFactory(containerParticipant, connection));
+      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Received a shutdown signal. Stopping participant");
+          containerParticipant.stopAsync();
+          containerParticipant.awaitTerminated();
+          connection.disconnect();
+        }
+      }));
+      started = true;
+    } finally {
+      if (!started) {
+        releaseAfterFailedStart(service, connection);
+      }
+    }
+  }
+
+  /** Best-effort cleanup for a start that failed after the ZooKeeper connection was opened. */
+  private static void releaseAfterFailedStart(AbstractParticipantService service,
+      HelixConnection connection) {
+    try {
+      if (service != null) {
+        service.stopAsync();
+      }
+      connection.disconnect();
+    } catch (RuntimeException cleanupError) {
+      // Don't let a cleanup failure hide the original startup exception.
+      LOG.warn("Error while cleaning up after a failed participant start", cleanupError);
+    }
+  }
+
+  /** Creates {@link ShutdownMessageHandler}s for SHUTDOWN messages. */
+  public static class ShutdownMessageHandlerFactory implements MessageHandlerFactory {
+    private final AbstractParticipantService _service;
+    private final HelixConnection _connection;
+
+    public ShutdownMessageHandlerFactory(AbstractParticipantService service,
+        HelixConnection connection) {
+      _service = service;
+      _connection = connection;
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      return new ShutdownMessageHandler(_service, _connection, message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      return MessageType.SHUTDOWN.toString();
+    }
+
+    @Override
+    public void reset() {
+      // no state to reset
+    }
+  }
+
+  /**
+   * Stops the participant service, disconnects, and terminates the JVM with exit code 0.
+   */
+  public static class ShutdownMessageHandler extends MessageHandler {
+    private final AbstractParticipantService _service;
+    private final HelixConnection _connection;
+
+    public ShutdownMessageHandler(AbstractParticipantService service, HelixConnection connection,
+        Message message, NotificationContext context) {
+      super(message, context);
+      _service = service;
+      _connection = connection;
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException {
+      LOG.info("Received a shutdown message. Trying to shut down.");
+      _service.stopAsync();
+      _service.awaitTerminated();
+      _connection.disconnect();
+      LOG.info("Shutdown complete. Process exiting gracefully");
+      // System.exit runs the registered JVM shutdown hooks before terminating.
+      System.exit(0);
+      return null; // not reached
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOG.error("Error while handling shutdown message, code=" + code + ", type=" + type, e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
new file mode 100644
index 0000000..262344b
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
@@ -0,0 +1,24 @@
+package org.apache.helix.provisioning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ResourceId;
+
+/**
+ * User configuration of a single service, scoped to that service's resource.
+ */
+public class ServiceConfig extends UserConfig {
+  // NOTE(review): nothing in this class reads this map, and it is separate from the fields
+  // inherited from UserConfig; confirm whether it is still needed.
+  public Map<String, String> config = new HashMap<String, String>();
+
+  /**
+   * @param scope resource scope of the service
+   */
+  public ServiceConfig(Scope<ResourceId> scope) {
+    super(scope);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
new file mode 100644
index 0000000..42203e9
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
@@ -0,0 +1,24 @@
+package org.apache.helix.provisioning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Key/value configuration of a task.
+ */
+public class TaskConfig {
+  public Map<String, String> config = new HashMap<String, String>();
+
+  /**
+   * Looks up a configuration value.
+   *
+   * @param key configuration key
+   * @return the value, or {@code null} if the key is absent or {@link #config} is {@code null}
+   */
+  public String getValue(String key) {
+    if (config == null) {
+      return null;
+    }
+    return config.get(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java
deleted file mode 100644
index 7ea3e42..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class AppConfig {
- public Map<String, String> config = new HashMap<String, String>();
-
- public String getValue(String key) {
- return (config != null ? config.get(key) : null);
- }
-
- public void setValue(String key, String value){
- config.put(key, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 1fe0a28..d2e901f 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -44,6 +44,12 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.apache.helix.provisioning.HelixYarnUtil;
/**
* Main class to launch the job.
@@ -52,7 +58,7 @@ import org.apache.hadoop.yarn.util.Records;
*/
public class AppLauncher {
- private static final Log LOG = LogFactory.getLog(Client.class);
+ private static final Log LOG = LogFactory.getLog(AppLauncher.class);
private ApplicationSpec _applicationSpec;
private YarnClient yarnClient;
@@ -197,7 +203,7 @@ public class AppLauncher {
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
- vargs.add(HelixYarnApplicationMasterMain.class.getCanonicalName());
+ vargs.add(AppMasterLauncher.class.getCanonicalName());
// Set params for Application Master
// vargs.add("--num_containers " + String.valueOf(numContainers));
@@ -375,7 +381,17 @@ public class AppLauncher {
return false;
}
if (YarnApplicationState.RUNNING == state) {
-
+ HelixConnection connection = new ZkHelixConnection(report.getHost() + ":2181");
+ try{
+ connection.connect();
+ }catch(Exception e){
+ LOG.warn("AppMaster started but not yet initialized");
+ }
+ if(connection.isConnected()){
+ AppStatusReportGenerator generator = new AppStatusReportGenerator();
+ String generateReport = generator.generateReport(connection, ClusterId.from(_applicationSpec.getAppName()));
+ LOG.info(generateReport);
+ }
}
prevReport = reportMessage;
Thread.sleep(10000);
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
new file mode 100644
index 0000000..72d6ea9
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -0,0 +1,197 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.helix.HelixController;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.apache.helix.provisioning.HelixYarnUtil;
+import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+
+/**
+ * Entry point of the YARN application master. It will <br/>
+ * <ul>
+ * <li>start zookeeper automatically</li>
+ * <li>create the cluster</li>
+ * <li>set up resource(s)</li>
+ * <li>start helix controller</li>
+ * </ul>
+ */
+public class AppMasterLauncher {
+  public static Logger LOG = Logger.getLogger(AppMasterLauncher.class);
+
+  /**
+   * Starts the embedded ZooKeeper server and the generic application master, reads the
+   * application spec, creates the Helix cluster with one resource per service, and starts the
+   * controller.
+   *
+   * @param args command line arguments; {@code -num_containers} is declared but its value is
+   *          not used yet
+   * @throws IllegalStateException if the application spec factory cannot be created
+   * @throws Exception if reading the spec, connecting or setting up the cluster fails
+   */
+  public static void main(String[] args) throws Exception {
+    Map<String, String> env = System.getenv();
+    LOG.info("Starting app master with the following environment variables");
+    for (Map.Entry<String, String> entry : env.entrySet()) {
+      LOG.info(entry.getKey() + "\t\t=" + entry.getValue());
+    }
+
+    Options opts = new Options();
+    opts.addOption("num_containers", true, "Number of containers");
+    try {
+      // The parsed values are unused for now; parsing still reports malformed arguments.
+      new GnuParser().parse(opts, args);
+    } catch (Exception e) {
+      LOG.error("Error parsing input arguments" + Arrays.toString(args), e);
+    }
+
+    startZooKeeper("dataDir", "logDir");
+
+    // start Generic AppMaster that interacts with Yarn RM
+    AppMasterConfig appMasterConfig = new AppMasterConfig();
+    String containerIdStr = appMasterConfig.getContainerId();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+
+    String configFile = AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString();
+    String className = appMasterConfig.getApplicationSpecFactory();
+
+    GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID);
+    try {
+      genericApplicationMaster.start();
+    } catch (Exception e) {
+      // NOTE(review): startup continues even though the application master failed to start.
+      LOG.error("Unable to start application master: ", e);
+    }
+    ApplicationSpecFactory factory = HelixYarnUtil.createInstance(className);
+    if (factory == null) {
+      // createInstance() has already logged the cause; fail with context instead of a later NPE.
+      throw new IllegalStateException("Unable to create ApplicationSpecFactory: " + className);
+    }
+
+    //TODO: Avoid setting static variable.
+    YarnProvisioner.applicationMaster = genericApplicationMaster;
+    YarnProvisioner.applicationMasterConfig = appMasterConfig;
+    ApplicationSpec applicationSpec = readApplicationSpec(factory, configFile);
+    YarnProvisioner.applicationSpec = applicationSpec;
+    String zkAddress = appMasterConfig.getZKAddress();
+    String clusterName = appMasterConfig.getAppName();
+
+    // CREATE CLUSTER and setup the resources
+    ZkHelixConnection connection = new ZkHelixConnection(zkAddress);
+    connection.connect();
+    ClusterId clusterId = ClusterId.from(clusterName);
+    setUpCluster(connection, clusterId, applicationSpec);
+
+    // start controller
+    ControllerId controllerId = ControllerId.from("controller1");
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.start();
+    // NOTE(review): main returns after this pause; presumably the process is kept alive by the
+    // controller's and the ZooKeeper server's threads. Confirm.
+    Thread.sleep(10000);
+  }
+
+  /**
+   * Deletes {@code dataDir} and {@code logDir}, starts an embedded ZooKeeper server on them, and
+   * registers a JVM shutdown hook that stops the server.
+   */
+  private static ZkServer startZooKeeper(String dataDir, String logDir) {
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient) {
+        // no default namespace to create
+      }
+    };
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+    } catch (IOException e) {
+      LOG.error("Unable to delete ZooKeeper directories " + dataDir + ", " + logDir, e);
+    }
+
+    final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
+    server.start();
+    // Registered immediately so the server is shut down on JVM exit even if a later step throws.
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        server.shutdown();
+      }
+    }));
+    return server;
+  }
+
+  /** Parses the YAML spec file with {@code factory}, always closing the file. */
+  private static ApplicationSpec readApplicationSpec(ApplicationSpecFactory factory,
+      String configFile) throws IOException {
+    FileInputStream in = new FileInputStream(configFile);
+    try {
+      return factory.fromYaml(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Creates the cluster with the stateless-service state model and adds one resource per
+   * service, each with a YARN provisioner config and a full-auto rebalancer config.
+   */
+  private static void setUpCluster(ZkHelixConnection connection, ClusterId clusterId,
+      ApplicationSpec applicationSpec) {
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    StateModelDefinition statelessService =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
+    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+        statelessService).build());
+    for (String service : applicationSpec.getServices()) {
+      ResourceId resourceId = ResourceId.from(service);
+
+      ServiceConfig serviceConfig = applicationSpec.getServiceConfig(service);
+      serviceConfig.setSimpleField("service_name", service);
+      int numContainers = serviceConfig.getIntField("num_containers", 1);
+
+      YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+      provisionerConfig.setNumContainers(numContainers);
+
+      RebalancerConfig rebalancerConfig =
+          new FullAutoRebalancerConfig.Builder(resourceId).stateModelDefId(
+              statelessService.getStateModelDefId()).build();
+      ResourceConfig resourceConfig =
+          new ResourceConfig.Builder(resourceId).provisionerConfig(provisionerConfig)
+              .rebalancerConfig(rebalancerConfig).userConfig(serviceConfig).build();
+      clusterAccessor.addResourceToCluster(resourceConfig);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
new file mode 100644
index 0000000..0443f8a
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
@@ -0,0 +1,101 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.Map;
+
+import jline.ConsoleReader;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ContainerConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ContainerId;
+import org.apache.helix.controller.provisioner.ContainerState;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.ExternalView;
+
+/**
+ * Builds a plain-text, tab-separated status report for a Helix-on-YARN application: one
+ * SERVICE section per resource, listing each matching participant with its container state,
+ * service state and container id.
+ */
+public class AppStatusReportGenerator {
+  static final String TAB = "\t";
+  static final String NEWLINE = "\n";
+
+  /**
+   * Generates the status report for one cluster.
+   *
+   * @param connection Helix connection; must already be connected
+   * @param clusterId cluster (application) to report on
+   * @return the report text, or an error message if {@code connection} is not connected
+   */
+  String generateReport(HelixConnection connection, ClusterId clusterId) {
+    if (!connection.isConnected()) {
+      return "Unable to connect to cluster";
+    }
+    StringBuilder builder = new StringBuilder();
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    Map<ParticipantId, Participant> participants = clusterAccessor.readParticipants();
+    builder.append("AppName").append(TAB).append(clusterId).append(NEWLINE);
+    Map<ResourceId, Resource> resources = clusterAccessor.readResources();
+    for (Map.Entry<ResourceId, Resource> entry : resources.entrySet()) {
+      ResourceId resourceId = entry.getKey();
+      Resource resource = entry.getValue();
+      builder.append("SERVICE").append(TAB).append(resourceId).append(NEWLINE);
+      // The service state is read from partition "<resource>_0". The external view or that
+      // partition may be absent (presumably until the controller has processed the service),
+      // so a null map is tolerated and every participant is then reported as UNKNOWN.
+      ExternalView externalView = resource.getExternalView();
+      Map<ParticipantId, State> serviceStateMap = (externalView == null) ? null
+          : externalView.getStateMap(PartitionId.from(resourceId.stringify() + "_0"));
+
+      builder.append(TAB).append("CONTAINER_NAME").append(TAB).append(TAB)
+          .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB)
+          .append("CONTAINER_ID").append(NEWLINE);
+      for (Participant participant : participants.values()) {
+        // Participants are matched to a service by id prefix. TODO: need a better check; this
+        // also matches participants of any resource whose name starts with this one.
+        if (!participant.getId().stringify().startsWith(resource.getId().stringify())) {
+          continue;
+        }
+        ContainerConfig containerConfig = participant.getContainerConfig();
+        ContainerState containerState = ContainerState.UNDEFINED;
+        ContainerId containerId = ContainerId.from("N/A");
+        if (containerConfig != null) {
+          containerId = containerConfig.getId();
+          containerState = containerConfig.getState();
+        }
+        State participantState =
+            (serviceStateMap == null) ? null : serviceStateMap.get(participant.getId());
+        if (participantState == null) {
+          participantState = State.from("UNKNOWN");
+        }
+        builder.append(TAB).append(participant.getId()).append(TAB).append(containerState)
+            .append(TAB).append(participantState).append(TAB).append(TAB).append(containerId)
+            .append(NEWLINE);
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Prints the report for cluster "testApp" using ZooKeeper at localhost:2181. The connection
+   * is always disconnected, even if report generation throws.
+   */
+  public static void main(String[] args) {
+    AppStatusReportGenerator generator = new AppStatusReportGenerator();
+    ZkHelixConnection connection = new ZkHelixConnection("localhost:2181");
+    connection.connect();
+    try {
+      System.out.println(generator.generateReport(connection, ClusterId.from("testApp")));
+    } finally {
+      connection.disconnect();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
deleted file mode 100644
index d63b300..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
+++ /dev/null
@@ -1,889 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.helix.provisioning.yarn;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- * An ApplicationMaster for executing shell commands on a set of launched
- * containers using the YARN framework.
- *
- * <p>
- * This class is meant to act as an example on how to write yarn-based
- * application masters.
- * </p>
- *
- * <p>
- * The ApplicationMaster is started on a container by the
- * <code>ResourceManager</code>'s launcher. The first thing that the
- * <code>ApplicationMaster</code> needs to do is to connect and register itself
- * with the <code>ResourceManager</code>. The registration sets up information
- * within the <code>ResourceManager</code> regarding what host:port the
- * ApplicationMaster is listening on to provide any form of functionality to a
- * client as well as a tracking url that a client can use to keep track of
- * status/job history if needed. However, in the distributedshell, trackingurl
- * and appMasterHost:appMasterRpcPort are not supported.
- * </p>
- *
- * <p>
- * The <code>ApplicationMaster</code> needs to send a heartbeat to the
- * <code>ResourceManager</code> at regular intervals to inform the
- * <code>ResourceManager</code> that it is up and alive. The
- * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
- * <code>ApplicationMaster</code> acts as a heartbeat.
- *
- * <p>
- * For the actual handling of the job, the <code>ApplicationMaster</code> has to
- * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
- * required no. of containers using {@link ResourceRequest} with the necessary
- * resource specifications such as node location, computational
- * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
- * responds with an {@link AllocateResponse} that informs the
- * <code>ApplicationMaster</code> of the set of newly allocated containers,
- * completed containers as well as current state of available resources.
- * </p>
- *
- * <p>
- * For each allocated container, the <code>ApplicationMaster</code> can then set
- * up the necessary launch context via {@link ContainerLaunchContext} to specify
- * the allocated container id, local resources required by the executable, the
- * environment to be setup for the executable, commands to execute, etc. and
- * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
- * launch and execute the defined commands on the given allocated container.
- * </p>
- *
- * <p>
- * The <code>ApplicationMaster</code> can monitor the launched container by
- * either querying the <code>ResourceManager</code> using
- * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
- * the {@link ContainerManagementProtocol} by querying for the status of the allocated
- * container's {@link ContainerId}.
- *
- * <p>
- * After the job has been completed, the <code>ApplicationMaster</code> has to
- * send a {@link FinishApplicationMasterRequest} to the
- * <code>ResourceManager</code> to inform it that the
- * <code>ApplicationMaster</code> has been completed.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class ApplicationMaster {
-
- private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
-
- // Configuration
- private Configuration conf;
-
- // Handle to communicate with the Resource Manager
- @SuppressWarnings("rawtypes")
- private AMRMClientAsync amRMClient;
-
- // Handle to communicate with the Node Manager
- private NMClientAsync nmClientAsync;
- // Listen to process the response from the Node Manager
- private NMCallbackHandler containerListener;
-
- // Application Attempt Id ( combination of attemptId and fail count )
- private ApplicationAttemptId appAttemptID;
-
- // TODO
- // For status update for clients - yet to be implemented
- // Hostname of the container
- private String appMasterHostname = "";
- // Port on which the app master listens for status updates from clients
- private int appMasterRpcPort = -1;
- // Tracking url to which app master publishes info for clients to monitor
- private String appMasterTrackingUrl = "";
-
- // App Master configuration
- // No. of containers to run shell command on
- private int numTotalContainers = 1;
- // Memory to request for the container on which the shell command will run
- private int containerMemory = 10;
- // Priority of the request
- private int requestPriority;
-
- // Counter for completed containers ( complete denotes successful or failed )
- private AtomicInteger numCompletedContainers = new AtomicInteger();
- // Allocated container count so that we know how many containers has the RM
- // allocated to us
- private AtomicInteger numAllocatedContainers = new AtomicInteger();
- // Count of failed containers
- private AtomicInteger numFailedContainers = new AtomicInteger();
- // Count of containers already requested from the RM
- // Needed as once requested, we should not request for containers again.
- // Only request for more if the original requirement changes.
- private AtomicInteger numRequestedContainers = new AtomicInteger();
-
- // Shell command to be executed
- private String shellCommand = "";
- // Args to be passed to the shell command
- private String shellArgs = "";
- // Env variables to be setup for the shell command
- private Map<String, String> shellEnv = new HashMap<String, String>();
-
- // Location of shell script ( obtained from info set in env )
- // Shell script path in fs
- private String shellScriptPath = "";
- // Timestamp needed for creating a local resource
- private long shellScriptPathTimestamp = 0;
- // File length needed for local resource
- private long shellScriptPathLen = 0;
-
- // Hardcoded path to shell script in launch container's local env
- private final String ExecShellStringPath = "ExecShellScript.sh";
-
- private volatile boolean done;
- private volatile boolean success;
-
- private ByteBuffer allTokens;
-
- // Launch threads
- private List<Thread> launchThreads = new ArrayList<Thread>();
-
- /**
- * @param args Command line args
- */
- public static void main(String[] args) {
- boolean result = false;
- try {
- ApplicationMaster appMaster = new ApplicationMaster();
- LOG.info("Initializing ApplicationMaster");
- LOG.info("classpath:" + System.getProperty("java.class.path"));
- boolean doRun = appMaster.init(args);
- if (!doRun) {
- System.exit(0);
- }
- result = appMaster.run();
- } catch (Throwable t) {
- LOG.fatal("Error running ApplicationMaster", t);
- System.exit(1);
- }
- if (result) {
- LOG.info("Application Master completed successfully. exiting");
- System.exit(0);
- } else {
- LOG.info("Application Master failed. exiting");
- System.exit(2);
- }
- }
-
- /**
- * Dump out contents of $CWD and the environment to stdout for debugging
- */
- private void dumpOutDebugInfo() {
-
- LOG.info("Dump debug output");
- Map<String, String> envs = System.getenv();
- for (Map.Entry<String, String> env : envs.entrySet()) {
- LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
- System.out.println("System env: key=" + env.getKey() + ", val="
- + env.getValue());
- }
-
- String cmd = "ls -al";
- Runtime run = Runtime.getRuntime();
- Process pr = null;
- try {
- pr = run.exec(cmd);
- pr.waitFor();
-
- BufferedReader buf = new BufferedReader(new InputStreamReader(
- pr.getInputStream()));
- String line = "";
- while ((line = buf.readLine()) != null) {
- LOG.info("System CWD content: " + line);
- System.out.println("System CWD content: " + line);
- }
- buf.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public ApplicationMaster() {
- // Set up the configuration
- conf = new YarnConfiguration();
- }
-
- /**
- * Parse command line options
- *
- * @param args Command line args
- * @return Whether init successful and run should be invoked
- * @throws ParseException
- * @throws IOException
- */
- public boolean init(String[] args) throws ParseException, IOException {
-
- Options opts = new Options();
- opts.addOption("app_attempt_id", true,
- "App Attempt ID. Not to be used unless for testing purposes");
- opts.addOption("shell_command", true,
- "Shell command to be executed by the Application Master");
- opts.addOption("shell_script", true,
- "Location of the shell script to be executed");
- opts.addOption("shell_args", true, "Command line args for the shell script");
- opts.addOption("shell_env", true,
- "Environment for shell script. Specified as env_key=env_val pairs");
- opts.addOption("container_memory", true,
- "Amount of memory in MB to be requested to run the shell command");
- opts.addOption("num_containers", true,
- "No. of containers on which the shell command needs to be executed");
- opts.addOption("priority", true, "Application Priority. Default 0");
- opts.addOption("debug", false, "Dump out debug information");
-
- opts.addOption("help", false, "Print usage");
- CommandLine cliParser = new GnuParser().parse(opts, args);
-
- if (args.length == 0) {
- printUsage(opts);
- throw new IllegalArgumentException(
- "No args specified for application master to initialize");
- }
-
- if (cliParser.hasOption("help")) {
- printUsage(opts);
- return false;
- }
-
- if (cliParser.hasOption("debug")) {
- dumpOutDebugInfo();
- }
-
- Map<String, String> envs = System.getenv();
-
- if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
- if (cliParser.hasOption("app_attempt_id")) {
- String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
- appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
- } else {
- throw new IllegalArgumentException(
- "Application Attempt Id not set in the environment");
- }
- } else {
- ContainerId containerId = ConverterUtils.toContainerId(envs
- .get(Environment.CONTAINER_ID.name()));
- appAttemptID = containerId.getApplicationAttemptId();
- }
-
- if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
- throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
- + " not set in the environment");
- }
- if (!envs.containsKey(Environment.NM_HOST.name())) {
- throw new RuntimeException(Environment.NM_HOST.name()
- + " not set in the environment");
- }
- if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
- throw new RuntimeException(Environment.NM_HTTP_PORT
- + " not set in the environment");
- }
- if (!envs.containsKey(Environment.NM_PORT.name())) {
- throw new RuntimeException(Environment.NM_PORT.name()
- + " not set in the environment");
- }
-
- LOG.info("Application master for app" + ", appId="
- + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
- + appAttemptID.getApplicationId().getClusterTimestamp()
- + ", attemptId=" + appAttemptID.getAttemptId());
-
- if (!cliParser.hasOption("shell_command")) {
- throw new IllegalArgumentException(
- "No shell command specified to be executed by application master");
- }
- shellCommand = cliParser.getOptionValue("shell_command");
-
- if (cliParser.hasOption("shell_args")) {
- shellArgs = cliParser.getOptionValue("shell_args");
- }
- if (cliParser.hasOption("shell_env")) {
- String shellEnvs[] = cliParser.getOptionValues("shell_env");
- for (String env : shellEnvs) {
- env = env.trim();
- int index = env.indexOf('=');
- if (index == -1) {
- shellEnv.put(env, "");
- continue;
- }
- String key = env.substring(0, index);
- String val = "";
- if (index < (env.length() - 1)) {
- val = env.substring(index + 1);
- }
- shellEnv.put(key, val);
- }
- }
-
- if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
- shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
-
- if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
- shellScriptPathTimestamp = Long.valueOf(envs
- .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
- }
- if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
- shellScriptPathLen = Long.valueOf(envs
- .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
- }
-
- if (!shellScriptPath.isEmpty()
- && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
- LOG.error("Illegal values in env for shell script path" + ", path="
- + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
- + shellScriptPathTimestamp);
- throw new IllegalArgumentException(
- "Illegal values in env for shell script path");
- }
- }
-
- containerMemory = Integer.parseInt(cliParser.getOptionValue(
- "container_memory", "10"));
- numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
- "num_containers", "1"));
- if (numTotalContainers == 0) {
- throw new IllegalArgumentException(
- "Cannot run distributed shell with no containers");
- }
- requestPriority = Integer.parseInt(cliParser
- .getOptionValue("priority", "0"));
-
- return true;
- }
-
- /**
- * Helper function to print usage
- *
- * @param opts Parsed command line options
- */
- private void printUsage(Options opts) {
- new HelpFormatter().printHelp("ApplicationMaster", opts);
- }
-
- /**
- * Main run function for the application master
- *
- * @throws YarnException
- * @throws IOException
- */
- @SuppressWarnings({ "unchecked" })
- public boolean run() throws YarnException, IOException {
- LOG.info("Starting ApplicationMaster");
-
- Credentials credentials =
- UserGroupInformation.getCurrentUser().getCredentials();
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- // Now remove the AM->RM token so that containers cannot access it.
- Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
- while (iter.hasNext()) {
- Token<?> token = iter.next();
- if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
- iter.remove();
- }
- }
- allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-
- AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
- amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
- amRMClient.init(conf);
- amRMClient.start();
-
- containerListener = createNMCallbackHandler();
- nmClientAsync = new NMClientAsyncImpl(containerListener);
- nmClientAsync.init(conf);
- nmClientAsync.start();
-
- // Setup local RPC Server to accept status requests directly from clients
- // TODO need to setup a protocol for client to be able to communicate to
- // the RPC server
- // TODO use the rpc port info to register with the RM for the client to
- // send requests to this app master
-
- // Register self with ResourceManager
- // This will start heartbeating to the RM
- appMasterHostname = NetUtils.getHostname();
- RegisterApplicationMasterResponse response = amRMClient
- .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- appMasterTrackingUrl);
- // Dump out information about cluster capability as seen by the
- // resource manager
- int maxMem = response.getMaximumResourceCapability().getMemory();
- LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
- // A resource ask cannot exceed the max.
- if (containerMemory > maxMem) {
- LOG.info("Container memory specified above max threshold of cluster."
- + " Using max value." + ", specified=" + containerMemory + ", max="
- + maxMem);
- containerMemory = maxMem;
- }
-
- // Setup ask for containers from RM
- // Send request for containers to RM
- // Until we get our fully allocated quota, we keep on polling RM for
- // containers
- // Keep looping until all the containers are launched and shell script
- // executed on them ( regardless of success/failure).
- for (int i = 0; i < numTotalContainers; ++i) {
- ContainerRequest containerAsk = setupContainerAskForRM();
- amRMClient.addContainerRequest(containerAsk);
- }
- numRequestedContainers.set(numTotalContainers);
-
- while (!done
- && (numCompletedContainers.get() != numTotalContainers)) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException ex) {}
- }
- finish();
-
- return success;
- }
-
- @VisibleForTesting
- NMCallbackHandler createNMCallbackHandler() {
- return new NMCallbackHandler(this);
- }
-
- private void finish() {
- // Join all launched threads
- // needed for when we time out
- // and we need to release containers
- for (Thread launchThread : launchThreads) {
- try {
- launchThread.join(10000);
- } catch (InterruptedException e) {
- LOG.info("Exception thrown in thread join: " + e.getMessage());
- e.printStackTrace();
- }
- }
-
- // When the application completes, it should stop all running containers
- LOG.info("Application completed. Stopping running containers");
- nmClientAsync.stop();
-
- // When the application completes, it should send a finish application
- // signal to the RM
- LOG.info("Application completed. Signalling finish to RM");
-
- FinalApplicationStatus appStatus;
- String appMessage = null;
- success = true;
- if (numFailedContainers.get() == 0 &&
- numCompletedContainers.get() == numTotalContainers) {
- appStatus = FinalApplicationStatus.SUCCEEDED;
- } else {
- appStatus = FinalApplicationStatus.FAILED;
- appMessage = "Diagnostics." + ", total=" + numTotalContainers
- + ", completed=" + numCompletedContainers.get() + ", allocated="
- + numAllocatedContainers.get() + ", failed="
- + numFailedContainers.get();
- success = false;
- }
- try {
- amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
- } catch (YarnException ex) {
- LOG.error("Failed to unregister application", ex);
- } catch (IOException e) {
- LOG.error("Failed to unregister application", e);
- }
-
- amRMClient.stop();
- }
-
- private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
- @SuppressWarnings("unchecked")
- @Override
- public void onContainersCompleted(List<ContainerStatus> completedContainers) {
- LOG.info("Got response from RM for container ask, completedCnt="
- + completedContainers.size());
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID="
- + containerStatus.getContainerId() + ", state="
- + containerStatus.getState() + ", exitStatus="
- + containerStatus.getExitStatus() + ", diagnostics="
- + containerStatus.getDiagnostics());
-
- // non complete containers should not be here
- assert (containerStatus.getState() == ContainerState.COMPLETE);
-
- // increment counters for completed/failed containers
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- // container failed
- if (ContainerExitStatus.ABORTED != exitStatus) {
- // shell script failed
- // counts as completed
- numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- } else {
- // container was killed by framework, possibly preempted
- // we should re-try as the container was lost for some reason
- numAllocatedContainers.decrementAndGet();
- numRequestedContainers.decrementAndGet();
- // we do not need to release the container as it would be done
- // by the RM
- }
- } else {
- // nothing to do
- // container completed successfully
- numCompletedContainers.incrementAndGet();
- LOG.info("Container completed successfully." + ", containerId="
- + containerStatus.getContainerId());
- }
- }
-
- // ask for more containers if any failed
- int askCount = numTotalContainers - numRequestedContainers.get();
- numRequestedContainers.addAndGet(askCount);
-
- if (askCount > 0) {
- for (int i = 0; i < askCount; ++i) {
- ContainerRequest containerAsk = setupContainerAskForRM();
- amRMClient.addContainerRequest(containerAsk);
- }
- }
-
- if (numCompletedContainers.get() == numTotalContainers) {
- done = true;
- }
- }
-
- @Override
- public void onContainersAllocated(List<Container> allocatedContainers) {
- LOG.info("Got response from RM for container ask, allocatedCnt="
- + allocatedContainers.size());
- numAllocatedContainers.addAndGet(allocatedContainers.size());
- for (Container allocatedContainer : allocatedContainers) {
- LOG.info("Launching shell command on a new container."
- + ", containerId=" + allocatedContainer.getId()
- + ", containerNode=" + allocatedContainer.getNodeId().getHost()
- + ":" + allocatedContainer.getNodeId().getPort()
- + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
- + ", containerResourceMemory"
- + allocatedContainer.getResource().getMemory());
- // + ", containerToken"
- // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
- LaunchContainerRunnable runnableLaunchContainer =
- new LaunchContainerRunnable(allocatedContainer, containerListener);
- Thread launchThread = new Thread(runnableLaunchContainer);
-
- // launch and start the container on a separate thread to keep
- // the main thread unblocked
- // as all containers may not be allocated at one go.
- launchThreads.add(launchThread);
- launchThread.start();
- }
- }
-
- @Override
- public void onShutdownRequest() {
- done = true;
- }
-
- @Override
- public void onNodesUpdated(List<NodeReport> updatedNodes) {}
-
- @Override
- public float getProgress() {
- // set progress to deliver to RM on next heartbeat
- float progress = (float) numCompletedContainers.get()
- / numTotalContainers;
- return progress;
- }
-
- @Override
- public void onError(Throwable e) {
- done = true;
- amRMClient.stop();
- }
- }
-
- @VisibleForTesting
- static class NMCallbackHandler
- implements NMClientAsync.CallbackHandler {
-
- private ConcurrentMap<ContainerId, Container> containers =
- new ConcurrentHashMap<ContainerId, Container>();
- private final ApplicationMaster applicationMaster;
-
- public NMCallbackHandler(ApplicationMaster applicationMaster) {
- this.applicationMaster = applicationMaster;
- }
-
- public void addContainer(ContainerId containerId, Container container) {
- containers.putIfAbsent(containerId, container);
- }
-
- @Override
- public void onContainerStopped(ContainerId containerId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Succeeded to stop Container " + containerId);
- }
- containers.remove(containerId);
- }
-
- @Override
- public void onContainerStatusReceived(ContainerId containerId,
- ContainerStatus containerStatus) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Container Status: id=" + containerId + ", status=" +
- containerStatus);
- }
- }
-
- @Override
- public void onContainerStarted(ContainerId containerId,
- Map<String, ByteBuffer> allServiceResponse) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Succeeded to start Container " + containerId);
- }
- Container container = containers.get(containerId);
- if (container != null) {
- applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
- }
- }
-
- @Override
- public void onStartContainerError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to start Container " + containerId);
- containers.remove(containerId);
- applicationMaster.numCompletedContainers.incrementAndGet();
- applicationMaster.numFailedContainers.incrementAndGet();
- }
-
- @Override
- public void onGetContainerStatusError(
- ContainerId containerId, Throwable t) {
- LOG.error("Failed to query the status of Container " + containerId);
- }
-
- @Override
- public void onStopContainerError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to stop Container " + containerId);
- containers.remove(containerId);
- }
- }
-
- /**
- * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
- * that will execute the shell command.
- */
- private class LaunchContainerRunnable implements Runnable {
-
- // Allocated container
- Container container;
-
- NMCallbackHandler containerListener;
-
- /**
- * @param lcontainer Allocated container
- * @param containerListener Callback handler of the container
- */
- public LaunchContainerRunnable(
- Container lcontainer, NMCallbackHandler containerListener) {
- this.container = lcontainer;
- this.containerListener = containerListener;
- }
-
- @Override
- /**
- * Connects to CM, sets up container launch context
- * for shell command and eventually dispatches the container
- * start request to the CM.
- */
- public void run() {
- LOG.info("Setting up container launch container for containerid="
- + container.getId());
- ContainerLaunchContext ctx = Records
- .newRecord(ContainerLaunchContext.class);
-
- // Set the environment
- ctx.setEnvironment(shellEnv);
-
- // Set the local resources
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-
- // The container for the eventual shell commands needs its own local
- // resources too.
- // In this scenario, if a shell script is specified, we need to have it
- // copied and made available to the container.
- if (!shellScriptPath.isEmpty()) {
- LocalResource shellRsrc = Records.newRecord(LocalResource.class);
- shellRsrc.setType(LocalResourceType.FILE);
- shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- try {
- shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
- shellScriptPath)));
- } catch (URISyntaxException e) {
- LOG.error("Error when trying to use shell script path specified"
- + " in env, path=" + shellScriptPath);
- e.printStackTrace();
-
- // A failure scenario on bad input such as invalid shell script path
- // We know we cannot continue launching the container
- // so we should release it.
- // TODO
- numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- return;
- }
- shellRsrc.setTimestamp(shellScriptPathTimestamp);
- shellRsrc.setSize(shellScriptPathLen);
- localResources.put(ExecShellStringPath, shellRsrc);
- }
- ctx.setLocalResources(localResources);
-
- // Set the necessary command to execute on the allocated container
- Vector<CharSequence> vargs = new Vector<CharSequence>(5);
-
- // Set executable command
- vargs.add(shellCommand);
- // Set shell script path
- if (!shellScriptPath.isEmpty()) {
- vargs.add(ExecShellStringPath);
- }
-
- // Set args for the shell command if any
- vargs.add(shellArgs);
- // Add log redirect params
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
- ctx.setCommands(commands);
-
- // Set up tokens for the container too. Today, for normal shell commands,
- // the container in distribute-shell doesn't need any tokens. We are
- // populating them mainly for NodeManagers to be able to download any
- // files in the distributed file-system. The tokens are otherwise also
- // useful in cases, for e.g., when one is running a "hadoop dfs" command
- // inside the distributed shell.
- ctx.setTokens(allTokens.duplicate());
-
- containerListener.addContainer(container.getId(), container);
- nmClientAsync.startContainerAsync(container, ctx);
- }
- }
-
- /**
- * Setup the request that will be sent to the RM for the container ask.
- *
- * @return the setup ResourceRequest to be sent to RM
- */
- private ContainerRequest setupContainerAskForRM() {
- // setup requirements for hosts
- // using * as any host will do for the distributed shell app
- // set the priority for the request
- Priority pri = Records.newRecord(Priority.class);
- // TODO - what is the range for priority? how to decide?
- pri.setPriority(requestPriority);
-
- // Set up resource type requirements
- // For now, only memory is supported so we set memory requirements
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(containerMemory);
-
- ContainerRequest request = new ContainerRequest(capability, null, null,
- pri);
- LOG.info("Requested container ask: " + request.toString());
- return request;
- }
-
- public static ApplicationMaster getInstance() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
deleted file mode 100644
index 285d036..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.net.URI;
-import java.util.List;
-
-
-public interface ApplicationSpec {
- /**
- * Returns the name of the application
- * @return
- */
- String getAppName();
-
- AppConfig getConfig();
-
- List<String> getServices();
-
- URI getAppMasterPackage();
-
- URI getServicePackage(String serviceName);
-
- String getServiceMainClass(String service);
-
- ServiceConfig getServiceConfig(String serviceName);
-
- List<TaskConfig> getTaskConfigs();
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
deleted file mode 100644
index 352dc0c..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.io.InputStream;
-
-public interface ApplicationSpecFactory {
-
- ApplicationSpec fromYaml(InputStream yamlFile);
-
-}