You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/05/01 03:28:18 UTC
git commit: Complete job runner recipe
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 97ca4de4a -> 785bb9fbb
Complete job runner recipe
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/785bb9fb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/785bb9fb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/785bb9fb
Branch: refs/heads/helix-provisioning
Commit: 785bb9fbbab2d82532a26ed253e6a72dffaa9849
Parents: 97ca4de
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Apr 30 18:28:08 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Apr 30 18:28:08 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobContext.java | 23 ++-
.../java/org/apache/helix/task/TaskDriver.java | 60 ++++---
.../org/apache/helix/task/TaskRebalancer.java | 5 +
.../java/org/apache/helix/task/Workflow.java | 2 +-
.../helix/provisioning/yarn/AppLauncher.java | 75 +++++++--
.../provisioning/yarn/AppMasterLauncher.java | 50 +++---
recipes/jobrunner-yarn/pom.xml | 159 +++++++++++++++++++
recipes/jobrunner-yarn/run.sh | 6 +
.../jobrunner-yarn/src/assemble/assembly.xml | 60 +++++++
.../src/main/config/log4j.properties | 31 ++++
.../yarn/example/JobRunnerMain.java | 127 +++++++++++++++
.../helix/provisioning/yarn/example/MyTask.java | 53 +++++++
.../yarn/example/MyTaskAppSpec.java | 148 +++++++++++++++++
.../yarn/example/MyTaskAppSpecFactory.java | 28 ++++
.../yarn/example/MyTaskService.java | 62 ++++++++
.../src/main/resources/dummy_job.yaml | 18 +++
.../src/main/resources/job_runner_app_spec.yaml | 27 ++++
recipes/jobrunner-yarn/src/test/conf/testng.xml | 27 ++++
recipes/pom.xml | 1 +
19 files changed, 909 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 7742c67..c10173d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -43,7 +43,8 @@ public class JobContext extends HelixProperty {
NUM_ATTEMPTS,
FINISH_TIME,
TARGET,
- TASK_ID
+ TASK_ID,
+ ASSIGNED_PARTICIPANT
}
public JobContext(ZNRecord record) {
@@ -224,4 +225,24 @@ public class JobContext extends HelixProperty {
}
return partitionMap;
}
+
+ /**
+ * Records the participant that a task partition is assigned to.
+ * The name is stored under the ASSIGNED_PARTICIPANT key of the record's map field for the
+ * partition; that map field is created if it does not exist yet.
+ * @param p the partition number; its decimal string form is used as the map field key
+ * @param participantName the participant name to store for the partition
+ */
+ public void setAssignedParticipant(int p, String participantName) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+ }
+
+ /**
+ * Looks up the participant recorded for a task partition.
+ * @param p the partition number; its decimal string form is used as the map field key
+ * @return the value stored under the ASSIGNED_PARTICIPANT key for the partition, or null if
+ * the partition has no map field or no such key
+ */
+ public String getAssignedParticipant(int p) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ return null;
+ } else {
+ return map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index ada2f99..193b78e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -46,7 +45,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.log4j.Logger;
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
/**
* CLI for scheduling/canceling workflows
@@ -233,36 +232,59 @@ public class TaskDriver {
WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
- LOG.info("Workflow " + resource + " consists of the following tasks: "
+ System.out.println("Workflow " + resource + " consists of the following tasks: "
+ wCfg.getJobDag().getAllNodes());
- LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
- LOG.info("Job states are: ");
- LOG.info("-------");
+ System.out.println("Current state of workflow is " + wCtx.getWorkflowState().name());
+ System.out.println("Job states are: ");
+ System.out.println("-------");
for (String job : wCfg.getJobDag().getAllNodes()) {
- LOG.info("Task " + job + " is " + wCtx.getJobState(job));
+ System.out.println("Job " + job + " is " + wCtx.getJobState(job));
// fetch task information
+ JobConfig jCfg = TaskUtil.getJobCfg(_manager, job);
JobContext jCtx = TaskUtil.getJobContext(_manager, job);
// calculate taskPartitions
List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
Collections.sort(partitions);
- // group partitions by status
- Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
- for (Integer i : partitions) {
- TaskPartitionState s = jCtx.getPartitionState(i);
- if (!statusCount.containsKey(s)) {
- statusCount.put(s, 0);
+ // report status
+ for (Integer partition : partitions) {
+ String taskId = jCtx.getTaskIdForPartition(partition);
+ taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
+ System.out.println("Task: " + taskId);
+ TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
+ if (taskConfig != null) {
+ System.out.println("Configuration: " + taskConfig.getConfigMap());
}
- statusCount.put(s, statusCount.get(s) + 1);
- }
-
- for (TaskPartitionState s : statusCount.keySet()) {
- LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+ TaskPartitionState state = jCtx.getPartitionState(partition);
+ if (state == null) {
+ state = TaskPartitionState.INIT;
+ }
+ System.out.println("State: " + state);
+ String assignedParticipant = jCtx.getAssignedParticipant(partition);
+ if (assignedParticipant != null) {
+ System.out.println("Assigned participant: " + assignedParticipant);
+ }
+ System.out.println("-------");
}
- LOG.info("-------");
+ // group partitions by status
+ /*
+ * Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
+ * for (Integer i : partitions) {
+ * TaskPartitionState s = jCtx.getPartitionState(i);
+ * if (!statusCount.containsKey(s)) {
+ * statusCount.put(s, 0);
+ * }
+ * statusCount.put(s, statusCount.get(s) + 1);
+ * }
+ * for (TaskPartitionState s : statusCount.keySet()) {
+ * LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+ * }
+ */
+
+ System.out.println("-------");
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 829f0c4..e9f60f9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -227,6 +227,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
// TASK_ERROR, ERROR.
Set<Integer> donePartitions = new TreeSet<Integer>();
for (int pId : pSet) {
+ jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
final String pName = pName(jobResource, pId);
// Check for pending state transitions on this (partition, instance).
@@ -289,6 +290,8 @@ public abstract class TaskRebalancer implements HelixRebalancer {
nextState = TaskPartitionState.STOPPED;
}
+ jobCtx.setPartitionState(pId, currState);
+
paMap.put(pId, new PartitionAssignment(instance.toString(), nextState.name()));
assignedPartitions.add(pId);
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
@@ -378,6 +381,8 @@ public abstract class TaskRebalancer implements HelixRebalancer {
paMap.put(pId,
new PartitionAssignment(instance.toString(), TaskPartitionState.RUNNING.name()));
excludeSet.add(pId);
+ jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
+ jobCtx.setAssignedParticipant(pId, instance.toString());
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
TaskPartitionState.RUNNING, instance));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 5b27fb6..383180e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -152,7 +152,7 @@ public class Workflow {
builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
builder.addConfig(job.name, JobConfig.COMMAND, job.command);
if (job.jobConfigMap != null) {
- builder.addConfig(job.name, JobConfig.JOB_CONFIG_MAP, job.jobConfigMap.toString());
+ builder.addJobConfigMap(job.name, job.jobConfigMap);
}
builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
if (job.targetPartitionStates != null) {
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 4b77105..9a19842 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -2,7 +2,6 @@ package org.apache.helix.provisioning.yarn;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -93,6 +92,10 @@ public class AppLauncher {
yarnClient.init(_conf);
}
+ /**
+ * @return the ApplicationSpec held in this launcher's _applicationSpec field
+ */
+ public ApplicationSpec getApplicationSpec() {
+ return _applicationSpec;
+ }
+
public boolean launch() throws Exception {
LOG.info("Running Client");
yarnClient.start();
@@ -189,7 +192,7 @@ public class AppLauncher {
classPathEnv.append(':');
classPathEnv.append(System.getProperty("java.class.path"));
}
- LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n" );
+ LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n");
// Set the env variables to be setup in the env where the application master will be run
Map<String, String> env = new HashMap<String, String>(_appMasterConfig.getEnv());
env.put("CLASSPATH", classPathEnv.toString());
@@ -268,12 +271,11 @@ public class AppLauncher {
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);
-
LOG.info("Submitting application to YARN Resource Manager");
ApplicationId applicationId = yarnClient.submitApplication(appContext);
- LOG.info("Submitted application with applicationId:" + applicationId );
+ LOG.info("Submitted application with applicationId:" + applicationId);
return true;
}
@@ -352,6 +354,52 @@ public class AppLauncher {
|| path.endsWith("zip");
}
+  /**
+   * Polls YARN for the status of the launched application until it is running and a Helix
+   * connection to it can be established. The ZooKeeper address is built from the host in the
+   * application report on port 2181. A failed connection attempt is retried on the next poll.
+   * @return a connected HelixConnection, or null if the application terminated before a
+   *         connection could be made, or if polling failed with an exception
+   */
+  public HelixConnection pollForConnection() {
+    String prevReport = "";
+    HelixConnection connection = null;
+
+    while (true) {
+      try {
+        // Get application report for the appId we are interested in
+        ApplicationReport report = yarnClient.getApplicationReport(_appId);
+
+        // Only log the report when it differs from the one seen on the previous poll
+        String reportMessage = generateReport(report);
+        if (!reportMessage.equals(prevReport)) {
+          LOG.info(reportMessage);
+        }
+        YarnApplicationState state = report.getYarnApplicationState();
+        if (YarnApplicationState.RUNNING == state) {
+          if (connection == null) {
+            // If the reported host contains a '/', use only the part after it
+            String host = report.getHost();
+            int ind = host.indexOf('/');
+            String hostName = (ind > -1) ? host.substring(ind + 1) : host;
+            connection = new ZkHelixConnection(hostName + ":2181");
+
+            try {
+              connection.connect();
+            } catch (Exception e) {
+              LOG.warn("AppMaster started but not yet initialized");
+              connection = null;
+            }
+          }
+          // connection is null here when connect() just failed; fall through and retry later
+          // instead of dereferencing it
+          if (connection != null && connection.isConnected()) {
+            return connection;
+          }
+        } else if (YarnApplicationState.FINISHED == state || YarnApplicationState.FAILED == state
+            || YarnApplicationState.KILLED == state) {
+          // The application is gone, so it can never become connectable; stop polling
+          LOG.error("Application " + _appId + " reached state " + state
+              + " before a connection could be established");
+          return null;
+        }
+        prevReport = reportMessage;
+        Thread.sleep(10000);
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          // Preserve the interrupt for callers further up the stack
+          Thread.currentThread().interrupt();
+        }
+        LOG.error("Exception while getting info ", e);
+        break;
+      }
+    }
+    return null;
+  }
+
/**
* @return true if successfully completed, it will print status every X seconds
*/
@@ -434,7 +482,7 @@ public class AppLauncher {
+ ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser();
}
- protected void cleanup() {
+ public void cleanup() {
LOG.info("Cleaning up");
try {
ApplicationReport applicationReport = yarnClient.getApplicationReport(_appId);
@@ -446,23 +494,28 @@ public class AppLauncher {
}
/**
- * Launches the application on a YARN cluster. Once launched, it will display (periodically) the status of the containers in the application.
+ * Launches the application on a YARN cluster. Once launched, it will display (periodically) the
+ * status of the containers in the application.
* @param args app_spec_provider and app_config_spec
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Options opts = new Options();
- opts.addOption(new Option("app_spec_provider",true, "Application Spec Factory Class that will parse the app_config_spec file"));
- opts.addOption(new Option("app_config_spec",true, "YAML config file that provides the app specifications"));
+ opts.addOption(new Option("app_spec_provider", true,
+ "Application Spec Factory Class that will parse the app_config_spec file"));
+ opts.addOption(new Option("app_config_spec", true,
+ "YAML config file that provides the app specifications"));
CommandLine cliParser = new GnuParser().parse(opts, args);
String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
- ApplicationSpecFactory applicationSpecFactory = HelixYarnUtil.createInstance(appSpecFactoryClass);
+ ApplicationSpecFactory applicationSpecFactory =
+ HelixYarnUtil.createInstance(appSpecFactoryClass);
File yamlConfigFile = new File(yamlConfigFileName);
- if(!yamlConfigFile.exists()){
- throw new IllegalArgumentException("YAML app_config_spec file: '"+ yamlConfigFileName + "' does not exist");
+ if (!yamlConfigFile.exists()) {
+ throw new IllegalArgumentException("YAML app_config_spec file: '" + yamlConfigFileName
+ + "' does not exist");
}
final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
launcher.launch();
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 72d6ea9..523fee0 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -3,21 +3,16 @@ package org.apache.helix.provisioning.yarn;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkServer;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.HelixController;
import org.apache.helix.api.accessor.ClusterAccessor;
@@ -26,15 +21,18 @@ import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.provisioner.ProvisionerConfig;
import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
import org.apache.helix.manager.zk.ZkHelixConnection;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.provisioning.ApplicationSpec;
import org.apache.helix.provisioning.ApplicationSpecFactory;
import org.apache.helix.provisioning.HelixYarnUtil;
import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.provisioning.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.log4j.Logger;
@@ -50,8 +48,7 @@ import org.apache.log4j.Logger;
public class AppMasterLauncher {
public static Logger LOG = Logger.getLogger(AppMasterLauncher.class);
- @SuppressWarnings("unchecked")
- public static void main(String[] args) throws Exception{
+ public static void main(String[] args) throws Exception {
Map<String, String> env = System.getenv();
LOG.info("Starting app master with the following environment variables");
for (String key : env.keySet()) {
@@ -61,11 +58,6 @@ public class AppMasterLauncher {
Options opts;
opts = new Options();
opts.addOption("num_containers", true, "Number of containers");
- try {
- CommandLine cliParser = new GnuParser().parse(opts, args);
- } catch (Exception e) {
- LOG.error("Error parsing input arguments" + Arrays.toString(args), e);
- }
// START ZOOKEEPER
String dataDir = "dataDir";
@@ -94,7 +86,7 @@ public class AppMasterLauncher {
String configFile = AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString();
String className = appMasterConfig.getApplicationSpecFactory();
-
+
GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID);
try {
genericApplicationMaster.start();
@@ -102,8 +94,8 @@ public class AppMasterLauncher {
LOG.error("Unable to start application master: ", e);
}
ApplicationSpecFactory factory = HelixYarnUtil.createInstance(className);
-
- //TODO: Avoid setting static variable.
+
+ // TODO: Avoid setting static variable.
YarnProvisioner.applicationMaster = genericApplicationMaster;
YarnProvisioner.applicationMasterConfig = appMasterConfig;
ApplicationSpec applicationSpec = factory.fromYaml(new FileInputStream(configFile));
@@ -121,17 +113,19 @@ public class AppMasterLauncher {
ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
StateModelDefinition statelessService =
new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
- clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
- statelessService).build());
+ StateModelDefinition taskStateModel =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel());
+ clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId)
+ .addStateModelDefinition(statelessService).addStateModelDefinition(taskStateModel).build());
for (String service : applicationSpec.getServices()) {
String resourceName = service;
// add the resource with the local provisioner
ResourceId resourceId = ResourceId.from(resourceName);
-
+
ServiceConfig serviceConfig = applicationSpec.getServiceConfig(resourceName);
serviceConfig.setSimpleField("service_name", service);
int numContainers = serviceConfig.getIntField("num_containers", 1);
-
+
YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
provisionerConfig.setNumContainers(numContainers);
@@ -153,6 +147,20 @@ public class AppMasterLauncher {
HelixController controller = connection.createController(clusterId, controllerId);
controller.start();
+ // Start any pre-specified jobs
+ List<TaskConfig> taskConfigs = applicationSpec.getTaskConfigs();
+ if (taskConfigs != null) {
+ for (TaskConfig taskConfig : taskConfigs) {
+ String yamlFile = taskConfig.getValue("yamlFile");
+ if (yamlFile != null) {
+ File file = new File(yamlFile);
+ Workflow workflow = Workflow.parse(file);
+ TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
+ taskDriver.start(workflow);
+ }
+ }
+ }
+
Thread shutdownhook = new Thread(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/pom.xml b/recipes/jobrunner-yarn/pom.xml
new file mode 100644
index 0000000..f067a56
--- /dev/null
+++ b/recipes/jobrunner-yarn/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.helix.recipes</groupId>
+ <artifactId>recipes</artifactId>
+ <version>0.7.1-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>jobrunner-yarn</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Helix :: Recipes :: Provisioning :: YARN :: Job Runner</name>
+
+  <properties>
+    <osgi.import>
+      org.apache.helix*,
+      org.apache.log4j,
+      *
+    </osgi.import>
+    <!-- The version attribute value must be a closed quoted string before the -noimport directive -->
+    <osgi.export>org.apache.helix.provisioning.yarn.example*;version="${project.version}";-noimport:=true</osgi.export>
+  </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-provisioning</artifactId>
+ <version>0.7.1-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <!-- Set the target configuration directory to be used in the bin scripts -->
+ <!-- <configurationDirectory>conf</configurationDirectory> -->
+ <!-- Copy the contents from "/src/main/config" to the target configuration
+ directory in the assembled application -->
+ <!-- <copyConfigurationDirectory>true</copyConfigurationDirectory> -->
+ <!-- Include the target configuration directory in the beginning of
+ the classpath declaration in the bin scripts -->
+ <includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
+ <assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
+ <!-- Extra JVM arguments that will be included in the bin scripts -->
+ <extraJvmArguments>-Xms512m -Xmx512m</extraJvmArguments>
+ <!-- Generate bin scripts for windows and unix by default -->
+ <platforms>
+ <platform>windows</platform>
+ <platform>unix</platform>
+ </platforms>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.helix.provisioning.yarn.AppLauncher</mainClass>
+ <name>app-launcher</name>
+ </program>
+ <program>
+ <mainClass>org.apache.helix.provisioning.yarn.example.JobRunnerMain</mainClass>
+ <name>job-runner</name>
+ </program>
+ </programs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assemble/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/run.sh
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/run.sh b/recipes/jobrunner-yarn/run.sh
new file mode 100755
index 0000000..07448bb
--- /dev/null
+++ b/recipes/jobrunner-yarn/run.sh
@@ -0,0 +1,6 @@
+# Builds the jobrunner-yarn recipe and launches it on YARN.
+# Run from recipes/jobrunner-yarn. Uncomment the next three lines to rebuild all of Helix first.
+#cd ../../
+#mvn clean install -DskipTests
+#cd recipes/jobrunner-yarn
+mvn clean package -DskipTests
+# The package is assembled into target/<artifactId>-pkg (see assembleDirectory in pom.xml)
+chmod +x target/jobrunner-yarn-pkg/bin/app-launcher.sh
+target/jobrunner-yarn-pkg/bin/app-launcher.sh --app_spec_provider org.apache.helix.provisioning.yarn.example.MyTaskAppSpecFactory --app_config_spec "$(pwd)/src/main/resources/job_runner_app_spec.yaml"
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/assemble/assembly.xml b/recipes/jobrunner-yarn/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+ <id>pkg</id>
+ <formats>
+ <format>tar</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+ <outputDirectory>repo</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ <excludes>
+ <exclude>**/*.xml</exclude>
+ </excludes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>LICENSE</include>
+ <include>NOTICE</include>
+ <include>DISCLAIMER</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/config/log4j.properties b/recipes/jobrunner-yarn/src/main/config/log4j.properties
new file mode 100644
index 0000000..91fac03
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
new file mode 100644
index 0000000..623854f
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
@@ -0,0 +1,127 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixRole;
+import org.apache.helix.InstanceType;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ContainerConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.apache.helix.provisioning.HelixYarnUtil;
+import org.apache.helix.provisioning.TaskConfig;
+import org.apache.helix.provisioning.yarn.AppLauncher;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+
+/**
+ * Command-line entry point for the job runner recipe. It builds an {@link ApplicationSpec} from a
+ * YAML file, launches the application through {@link AppLauncher}, and then prints container and
+ * task status every five seconds until the process is terminated.
+ */
+public class JobRunnerMain {
+  /**
+   * Launches the application and polls its status.
+   * @param args command line; the <code>app_spec_provider</code> (factory class name) and
+   *          <code>app_config_spec</code> (path to the YAML spec) options are both required
+   * @throws IllegalArgumentException if a required option is missing or the YAML file does not
+   *           exist
+   * @throws Exception on any other failure while parsing, launching or polling
+   */
+  public static void main(String[] args) throws Exception {
+    Options opts = new Options();
+    opts.addOption(new Option("app_spec_provider", true,
+        "Application Spec Factory Class that will parse the app_config_spec file"));
+    opts.addOption(new Option("app_config_spec", true,
+        "YAML config file that provides the app specifications"));
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+    String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
+    String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
+
+    // Fail with a readable message instead of a NullPointerException from new File(null)
+    if (appSpecFactoryClass == null || yamlConfigFileName == null) {
+      throw new IllegalArgumentException(
+          "Both the app_spec_provider and app_config_spec options must be specified");
+    }
+
+    ApplicationSpecFactory applicationSpecFactory =
+        HelixYarnUtil.createInstance(appSpecFactoryClass);
+    File yamlConfigFile = new File(yamlConfigFileName);
+    if (!yamlConfigFile.exists()) {
+      throw new IllegalArgumentException("YAML app_config_spec file: '" + yamlConfigFileName
+          + "' does not exist");
+    }
+    final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
+    launcher.launch();
+
+    // Clean up the launched application when this JVM exits (e.g. on Ctrl-C)
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        launcher.cleanup();
+      }
+    }));
+
+    final ApplicationSpec appSpec = launcher.getApplicationSpec();
+
+    // Obtain a connection to the launched cluster; the cluster is named after the application
+    final HelixConnection connection = launcher.pollForConnection();
+    final ClusterId clusterId = ClusterId.from(appSpec.getAppName());
+    // TODO: this is a hack -- TaskDriver should accept a connection instead of a manager
+    HelixManager manager = new HelixConnectionAdaptor(new HelixRole() {
+      @Override
+      public HelixConnection getConnection() {
+        return connection;
+      }
+
+      @Override
+      public ClusterId getClusterId() {
+        return clusterId;
+      }
+
+      @Override
+      public Id getId() {
+        return null;
+      }
+
+      @Override
+      public InstanceType getType() {
+        return InstanceType.ADMINISTRATOR;
+      }
+
+      @Override
+      public ClusterMessagingService getMessagingService() {
+        return null;
+      }
+    });
+
+    // Find the submitted workflow. NOTE: if several task configs name a yamlFile, only the
+    // workflow parsed last is tracked by the status loop below.
+    String workflow = null;
+    List<TaskConfig> taskConfigs = appSpec.getTaskConfigs();
+    if (taskConfigs != null) {
+      for (TaskConfig taskConfig : taskConfigs) {
+        String yamlFile = taskConfig.getValue("yamlFile");
+        if (yamlFile != null) {
+          Workflow flow = Workflow.parse(new File(yamlFile));
+          workflow = flow.getName();
+        }
+      }
+    }
+
+    // Repeatedly poll for status; this loop only ends when the process is terminated
+    if (workflow != null) {
+      ClusterAccessor accessor = connection.createClusterAccessor(clusterId);
+      TaskDriver driver = new TaskDriver(manager);
+      while (true) {
+        System.out.println("CONTAINER STATUS");
+        System.out.println("----------------");
+        Collection<Participant> participants = accessor.readParticipants().values();
+        for (Participant participant : participants) {
+          ContainerConfig containerConfig = participant.getContainerConfig();
+          // Participants without a container config are not printed
+          if (containerConfig != null) {
+            System.out.println(participant.getId() + "[" + containerConfig.getId() + "]: "
+                + containerConfig.getState());
+          }
+        }
+        System.out.println("----------------");
+        System.out.println("TASK STATUS");
+        System.out.println("----------------");
+        driver.list(workflow);
+        Thread.sleep(5000);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java
new file mode 100644
index 0000000..584550d
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTask.java
@@ -0,0 +1,53 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+import org.apache.log4j.Logger;
+
+/**
+ * Example task for the job runner recipe. It performs no real work: it logs its job and task
+ * configuration on creation and then waits for a fixed delay, checking periodically whether it
+ * has been canceled.
+ */
+public class MyTask implements Task {
+  private static final Logger LOG = Logger.getLogger(MyTask.class);
+  /** Total time, in milliseconds, that a run waits before reporting completion. */
+  private static final long DEFAULT_DELAY = 60000L;
+  /** Interval, in milliseconds, between checks of the cancellation flag. */
+  private static final long POLL_INTERVAL = 50L;
+  private final long _delay;
+  // volatile: written by cancel() and read by the loop in run()
+  private volatile boolean _canceled;
+
+  /**
+   * Creates the task and logs the configuration it was given.
+   * @param context callback context supplying the job config and, optionally, the task config
+   */
+  public MyTask(TaskCallbackContext context) {
+    LOG.info("Job config: " + context.getJobConfig().getJobConfigMap());
+    if (context.getTaskConfig() != null) {
+      LOG.info("Task config: " + context.getTaskConfig().getConfigMap());
+    }
+    _delay = DEFAULT_DELAY;
+  }
+
+  /**
+   * Waits until the delay has elapsed, the task is canceled, or the running thread is
+   * interrupted.
+   * @return COMPLETED once the delay has elapsed, or CANCELED if {@link #cancel()} was called or
+   *         the thread was interrupted; the info string is the number of milliseconds that were
+   *         left (never negative)
+   */
+  @Override
+  public TaskResult run() {
+    long expiry = System.currentTimeMillis() + _delay;
+    while (System.currentTimeMillis() < expiry) {
+      if (_canceled) {
+        return new TaskResult(TaskResult.Status.CANCELED, remaining(expiry));
+      }
+      try {
+        Thread.sleep(POLL_INTERVAL);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag for the caller and stop waiting instead of swallowing it
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while running; stopping early", e);
+        return new TaskResult(TaskResult.Status.CANCELED, remaining(expiry));
+      }
+    }
+    return new TaskResult(TaskResult.Status.COMPLETED, remaining(expiry));
+  }
+
+  /**
+   * Requests that a run in progress stop at its next cancellation check.
+   */
+  @Override
+  public void cancel() {
+    _canceled = true;
+  }
+
+  /**
+   * Formats the time left until the given deadline.
+   * @param expiry deadline in milliseconds since the epoch
+   * @return the remaining milliseconds as a string, clamped to zero
+   */
+  private static String remaining(long expiry) {
+    return String.valueOf(Math.max(0L, expiry - System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java
new file mode 100644
index 0000000..a20994c
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpec.java
@@ -0,0 +1,148 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.provisioning.AppConfig;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.provisioning.TaskConfig;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Bean-style {@link ApplicationSpec} for the job runner recipe. The setters mirror the keys of
+ * the recipe's YAML application spec (appName, appConfig, services, serviceConfigMap, ...), so an
+ * instance can be populated by a YAML bean loader. Lookups for a service that was never
+ * configured return null rather than throwing.
+ */
+public class MyTaskAppSpec implements ApplicationSpec {
+
+  public String _appName;
+
+  public AppConfig _appConfig;
+
+  public List<String> _services;
+
+  private String _appMasterPackageUri;
+
+  private Map<String, String> _servicePackageURIMap;
+
+  private Map<String, String> _serviceMainClassMap;
+
+  private Map<String, ServiceConfig> _serviceConfigMap;
+
+  private List<TaskConfig> _taskConfigs;
+
+  public AppConfig getAppConfig() {
+    return _appConfig;
+  }
+
+  public void setAppConfig(AppConfig appConfig) {
+    _appConfig = appConfig;
+  }
+
+  public String getAppMasterPackageUri() {
+    return _appMasterPackageUri;
+  }
+
+  public void setAppMasterPackageUri(String appMasterPackageUri) {
+    _appMasterPackageUri = appMasterPackageUri;
+  }
+
+  public Map<String, String> getServicePackageURIMap() {
+    return _servicePackageURIMap;
+  }
+
+  public void setServicePackageURIMap(Map<String, String> servicePackageURIMap) {
+    _servicePackageURIMap = servicePackageURIMap;
+  }
+
+  public Map<String, String> getServiceMainClassMap() {
+    return _serviceMainClassMap;
+  }
+
+  public void setServiceMainClassMap(Map<String, String> serviceMainClassMap) {
+    _serviceMainClassMap = serviceMainClassMap;
+  }
+
+  /**
+   * Returns the simple fields of every service config, keyed by service name.
+   * @return a new map; empty if no service configs have been set
+   */
+  public Map<String, Map<String, String>> getServiceConfigMap() {
+    Map<String, Map<String, String>> map = Maps.newHashMap();
+    if (_serviceConfigMap == null) {
+      return map;
+    }
+    for (Map.Entry<String, ServiceConfig> entry : _serviceConfigMap.entrySet()) {
+      map.put(entry.getKey(), entry.getValue().getSimpleFields());
+    }
+    return map;
+  }
+
+  /**
+   * Replaces all service configs. Each inner map becomes the simple fields of a
+   * {@link ServiceConfig} scoped to the resource named by the outer key; values are stored via
+   * their toString() form.
+   * @param map service name to raw key/value settings; a null map clears the configs, and
+   *          null-valued settings are skipped
+   */
+  public void setServiceConfigMap(Map<String, Map<String, Object>> map) {
+    _serviceConfigMap = Maps.newHashMap();
+    if (map == null) {
+      return;
+    }
+    for (Map.Entry<String, Map<String, Object>> serviceEntry : map.entrySet()) {
+      String service = serviceEntry.getKey();
+      ServiceConfig serviceConfig = new ServiceConfig(Scope.resource(ResourceId.from(service)));
+      Map<String, Object> simpleFields = serviceEntry.getValue();
+      if (simpleFields != null) {
+        for (Map.Entry<String, Object> field : simpleFields.entrySet()) {
+          if (field.getValue() != null) {
+            serviceConfig.setSimpleField(field.getKey(), field.getValue().toString());
+          }
+        }
+      }
+      _serviceConfigMap.put(service, serviceConfig);
+    }
+  }
+
+  public void setAppName(String appName) {
+    _appName = appName;
+  }
+
+  public void setServices(List<String> services) {
+    _services = services;
+  }
+
+  public void setTaskConfigs(List<TaskConfig> taskConfigs) {
+    _taskConfigs = taskConfigs;
+  }
+
+  @Override
+  public String getAppName() {
+    return _appName;
+  }
+
+  @Override
+  public AppConfig getConfig() {
+    return _appConfig;
+  }
+
+  @Override
+  public List<String> getServices() {
+    return _services;
+  }
+
+  /**
+   * @return the app master package location, or null if it is unset or not a valid URI
+   */
+  @Override
+  public URI getAppMasterPackage() {
+    return toUri(_appMasterPackageUri);
+  }
+
+  /**
+   * @param serviceName name of the service to look up
+   * @return the package location of the service, or null if the service has no package
+   *         configured or its value is not a valid URI
+   */
+  @Override
+  public URI getServicePackage(String serviceName) {
+    String uri = (_servicePackageURIMap == null) ? null : _servicePackageURIMap.get(serviceName);
+    return toUri(uri);
+  }
+
+  /**
+   * @param service name of the service to look up
+   * @return the main class configured for the service, or null if there is none
+   */
+  @Override
+  public String getServiceMainClass(String service) {
+    return (_serviceMainClassMap == null) ? null : _serviceMainClassMap.get(service);
+  }
+
+  /**
+   * @param serviceName name of the service to look up
+   * @return the config of the service, or null if there is none
+   */
+  @Override
+  public ServiceConfig getServiceConfig(String serviceName) {
+    return (_serviceConfigMap == null) ? null : _serviceConfigMap.get(serviceName);
+  }
+
+  @Override
+  public List<TaskConfig> getTaskConfigs() {
+    return _taskConfigs;
+  }
+
+  /**
+   * Parses a URI string, mapping both a missing value and a malformed value to null so that
+   * callers see a single "not available" result.
+   * @param uri the string to parse, possibly null
+   * @return the parsed URI, or null
+   */
+  private static URI toUri(String uri) {
+    if (uri == null) {
+      return null;
+    }
+    try {
+      return new URI(uri);
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java
new file mode 100644
index 0000000..17601ba
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskAppSpecFactory.java
@@ -0,0 +1,28 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.io.InputStream;
+
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * {@link ApplicationSpecFactory} for the job runner recipe that reads the application spec from
+ * a YAML document using SnakeYAML.
+ */
+public class MyTaskAppSpecFactory implements ApplicationSpecFactory {
+
+  /**
+   * Parses an application spec from YAML. The result is cast to {@link ApplicationSpec}, so the
+   * document is expected to carry a type tag naming an implementation (the recipe's sample spec
+   * starts with a <code>!!...MyTaskAppSpec</code> tag).
+   * @param inputstream stream containing the YAML document
+   * @return the parsed application spec
+   */
+  @Override
+  public ApplicationSpec fromYaml(InputStream inputstream) {
+    // NOTE(review): Yaml.load() builds whatever class the document's type tag names. That is
+    // fine for a spec file the operator supplies, but it must not be fed untrusted input.
+    return (ApplicationSpec) new Yaml().load(inputstream);
+  }
+
+  /**
+   * Manual smoke test: loads job_runner_app_spec.yaml from the classpath, dumps it back as YAML
+   * and prints the JobRunner service's num_containers setting.
+   * @param args unused
+   * @throws IllegalStateException if the sample spec is not on the classpath
+   */
+  public static void main(String[] args) {
+
+    Yaml yaml = new Yaml();
+    InputStream resourceAsStream =
+        ClassLoader.getSystemClassLoader().getResourceAsStream("job_runner_app_spec.yaml");
+    // getResourceAsStream returns null (it does not throw) when the resource is missing
+    if (resourceAsStream == null) {
+      throw new IllegalStateException("job_runner_app_spec.yaml was not found on the classpath");
+    }
+    MyTaskAppSpec spec = yaml.loadAs(resourceAsStream, MyTaskAppSpec.class);
+    String dump = yaml.dump(spec);
+    System.out.println(dump);
+    System.out.println(spec.getServiceConfig("JobRunner").getStringField("num_containers", "1"));
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
new file mode 100644
index 0000000..22c3ab0
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
@@ -0,0 +1,62 @@
+package org.apache.helix.provisioning.yarn.example;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.participant.AbstractParticipantService;
+import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.provisioning.participant.StatelessParticipantService;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.log4j.Logger;
+
+/**
+ * A simple "service" for task callback registration. On initialization it registers a task
+ * state model factory whose "RunTask" command creates {@link MyTask} instances.
+ */
+public class MyTaskService extends StatelessParticipantService {
+
+  // Log under this class rather than AbstractParticipantService so messages are attributed
+  // to the right source
+  private static final Logger LOG = Logger.getLogger(MyTaskService.class);
+
+  /** Service name passed to the superclass constructor. */
+  static final String SERVICE_NAME = "JobRunner";
+
+  /**
+   * @param connection connection handed to the superclass
+   * @param clusterId cluster this participant belongs to
+   * @param participantId identifier of this participant
+   */
+  public MyTaskService(HelixConnection connection, ClusterId clusterId,
+      ParticipantId participantId) {
+    super(connection, clusterId, participantId, SERVICE_NAME);
+  }
+
+  /**
+   * Registers the task factory for the "RunTask" command under the "Task" state model
+   * definition.
+   * @param serviceConfig configuration of this service; only logged here
+   */
+  @Override
+  protected void init(ServiceConfig serviceConfig) {
+    LOG.info("Initialized service with config " + serviceConfig);
+
+    // Register for callbacks for tasks
+    HelixManager manager = new HelixConnectionAdaptor(getParticipant());
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("RunTask", new TaskFactory() {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
+        return new MyTask(context);
+      }
+    });
+    getParticipant().getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("Task"), new TaskStateModelFactory(manager, taskFactoryReg));
+  }
+
+  @Override
+  protected void goOnline() {
+    LOG.info("JobRunner service is told to go online");
+  }
+
+  // The misspelled name is the superclass's hook; it must stay as-is for @Override to match
+  @Override
+  protected void goOffine() {
+    LOG.info("JobRunner service is told to go offline");
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml b/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml
new file mode 100644
index 0000000..0187fd1
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml
@@ -0,0 +1,18 @@
+# Sample workflow for the job runner recipe. JobRunnerMain parses the file named by a task
+# config's "yamlFile" entry with Workflow.parse and then polls the workflow's status.
+# The single job runs the "RunTask" command, the command name MyTaskService registers its
+# task factory under, and defines three tasks, each with its own taskConfigMap.
+name: myJob1234
+jobs:
+ - name: myJob1234
+ command: RunTask
+ jobConfigMap: {
+ k1: "v1",
+ k2: "v2"
+ }
+ tasks:
+ - taskConfigMap: {
+ k3: "v3"
+ }
+ - taskConfigMap: {
+ k4: "v4"
+ }
+ - taskConfigMap: {
+ k5: "v5"
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
new file mode 100755
index 0000000..ad62ffc
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
@@ -0,0 +1,27 @@
+# Application spec for the job runner recipe. The type tag on the next line names the bean
+# class (MyTaskAppSpec) that the YAML loader instantiates; the keys below map to its setters.
+# NOTE: the file:// package URIs and the yamlFile path are absolute paths from the original
+# author's machine -- change them to match the local checkout and build output before running.
+!!org.apache.helix.provisioning.yarn.example.MyTaskAppSpec
+appConfig:
+ config: {
+ k1: v1
+ }
+appMasterPackageUri: 'file:///Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/target/jobrunner-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
+appName: testApp
+serviceConfigMap:
+ JobRunner: {
+ num_containers: 3,
+ memory: 1024
+ }
+serviceMainClassMap: {
+ JobRunner: org.apache.helix.provisioning.yarn.example.MyTaskService
+}
+servicePackageURIMap: {
+ JobRunner: 'file:///Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/target/jobrunner-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
+}
+services: [
+ JobRunner]
+taskConfigs:
+ - config: {
+ yamlFile: '/Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml'
+ }
+
+
+
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/jobrunner-yarn/src/test/conf/testng.xml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/test/conf/testng.xml b/recipes/jobrunner-yarn/src/test/conf/testng.xml
new file mode 100644
index 0000000..37bccf3
--- /dev/null
+++ b/recipes/jobrunner-yarn/src/test/conf/testng.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<!--
+NOTE(review): this suite scans the package org.apache.helix.agent, while this recipe's sources
+live in org.apache.helix.provisioning.yarn.example. The package name looks copied from another
+module; confirm whether it should point at this recipe's package.
+-->
+<suite name="Suite" parallel="none">
+ <test name="Test" preserve-order="false">
+ <packages>
+ <package name="org.apache.helix.agent"/>
+ </packages>
+ </test>
+</suite>
http://git-wip-us.apache.org/repos/asf/helix/blob/785bb9fb/recipes/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/pom.xml b/recipes/pom.xml
index 5d137c2..3fcaf42 100644
--- a/recipes/pom.xml
+++ b/recipes/pom.xml
@@ -37,6 +37,7 @@ under the License.
<module>task-execution</module>
<module>service-discovery</module>
<module>helloworld-provisioning-yarn</module>
+ <module>jobrunner-yarn</module>
</modules>
<build>