You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/04 01:49:19 UTC
git commit: Task framework recipe runs on distributed YARN
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 99f5ff7bb -> feaea562f
Task framework recipe runs on distributed YARN
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/feaea562
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/feaea562
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/feaea562
Branch: refs/heads/helix-provisioning
Commit: feaea562f2b52ebad5cfd6aba92864cd411a582f
Parents: 99f5ff7
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Jul 3 16:47:50 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 3 16:47:50 2014 -0700
----------------------------------------------------------------------
.../helix/model/ClusterConfiguration.java | 31 +++++++++------
.../java/org/apache/helix/model/IdealState.java | 16 +++++---
.../org/apache/helix/model/InstanceConfig.java | 41 ++++++++++++--------
.../helix/model/ResourceConfiguration.java | 31 +++++++++------
.../java/org/apache/helix/task/Workflow.java | 16 ++++++++
.../apache/helix/provisioning/TaskConfig.java | 17 ++++++++
.../helix/provisioning/yarn/AppLauncher.java | 23 ++++++++++-
.../provisioning/yarn/AppMasterConfig.java | 17 ++++++--
.../provisioning/yarn/AppMasterLauncher.java | 28 +++++++++++--
.../src/main/resources/job_runner_app_spec.yaml | 8 +---
10 files changed, 167 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 1e9c205..63f5776 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -25,11 +25,14 @@ import org.apache.helix.api.config.NamespacedConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.log4j.Logger;
/**
* Persisted configuration properties for a cluster
*/
public class ClusterConfiguration extends HelixProperty {
+ private static final Logger LOG = Logger.getLogger(ClusterConfiguration.class);
+
/**
* Instantiate for an id
* @param id cluster id
@@ -76,21 +79,25 @@ public class ClusterConfiguration extends HelixProperty {
*/
public UserConfig getUserConfig() {
UserConfig userConfig = UserConfig.from(this);
- for (String simpleField : _record.getSimpleFields().keySet()) {
- if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "")
- && !simpleField.equals(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)) {
- userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ try {
+ for (String simpleField : _record.getSimpleFields().keySet()) {
+ if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "")
+ && !simpleField.equals(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)) {
+ userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ }
}
- }
- for (String listField : _record.getListFields().keySet()) {
- if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
- userConfig.setListField(listField, _record.getListField(listField));
+ for (String listField : _record.getListFields().keySet()) {
+ if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+ userConfig.setListField(listField, _record.getListField(listField));
+ }
}
- }
- for (String mapField : _record.getMapFields().keySet()) {
- if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
- userConfig.setMapField(mapField, _record.getMapField(mapField));
+ for (String mapField : _record.getMapFields().keySet()) {
+ if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+ userConfig.setMapField(mapField, _record.getMapField(mapField));
+ }
}
+ } catch (NoSuchMethodError e) {
+ LOG.error("Could not parse ClusterConfiguration", e);
}
return userConfig;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 173e251..cc8fc4b 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -58,6 +58,8 @@ import com.google.common.collect.Sets;
* The ideal states of all partitions in a resource
*/
public class IdealState extends HelixProperty {
+ private static final Logger LOG = Logger.getLogger(IdealState.class);
+
/**
* Properties that are persisted and are queryable for an ideal state
*/
@@ -760,12 +762,16 @@ public class IdealState extends HelixProperty {
* @param userConfig the user config to update
*/
public void updateUserConfig(UserConfig userConfig) {
- for (String simpleField : _record.getSimpleFields().keySet()) {
- Optional<IdealStateProperty> enumField =
- Enums.getIfPresent(IdealStateProperty.class, simpleField);
- if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
- userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ try {
+ for (String simpleField : _record.getSimpleFields().keySet()) {
+ Optional<IdealStateProperty> enumField =
+ Enums.getIfPresent(IdealStateProperty.class, simpleField);
+ if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
+ userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ }
}
+ } catch (NoSuchMethodError e) {
+ LOG.error("Could not update user config", e);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 5f27b05..2dde23e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -34,6 +34,7 @@ import org.apache.helix.api.id.PartitionId;
import org.apache.helix.controller.provisioner.ContainerId;
import org.apache.helix.controller.provisioner.ContainerSpec;
import org.apache.helix.controller.provisioner.ContainerState;
+import org.apache.log4j.Logger;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
@@ -42,6 +43,8 @@ import com.google.common.base.Optional;
* Instance configurations
*/
public class InstanceConfig extends HelixProperty {
+ private static final Logger LOG = Logger.getLogger(InstanceConfig.class);
+
/**
* Configurable characteristics of an instance
*/
@@ -279,26 +282,30 @@ public class InstanceConfig extends HelixProperty {
*/
public UserConfig getUserConfig() {
UserConfig userConfig = UserConfig.from(this);
- for (String simpleField : _record.getSimpleFields().keySet()) {
- Optional<InstanceConfigProperty> enumField =
- Enums.getIfPresent(InstanceConfigProperty.class, simpleField);
- if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
- userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ try {
+ for (String simpleField : _record.getSimpleFields().keySet()) {
+ Optional<InstanceConfigProperty> enumField =
+ Enums.getIfPresent(InstanceConfigProperty.class, simpleField);
+ if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
+ userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ }
}
- }
- for (String listField : _record.getListFields().keySet()) {
- Optional<InstanceConfigProperty> enumField =
- Enums.getIfPresent(InstanceConfigProperty.class, listField);
- if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
- userConfig.setListField(listField, _record.getListField(listField));
+ for (String listField : _record.getListFields().keySet()) {
+ Optional<InstanceConfigProperty> enumField =
+ Enums.getIfPresent(InstanceConfigProperty.class, listField);
+ if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
+ userConfig.setListField(listField, _record.getListField(listField));
+ }
}
- }
- for (String mapField : _record.getMapFields().keySet()) {
- Optional<InstanceConfigProperty> enumField =
- Enums.getIfPresent(InstanceConfigProperty.class, mapField);
- if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
- userConfig.setMapField(mapField, _record.getMapField(mapField));
+ for (String mapField : _record.getMapFields().keySet()) {
+ Optional<InstanceConfigProperty> enumField =
+ Enums.getIfPresent(InstanceConfigProperty.class, mapField);
+ if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
+ userConfig.setMapField(mapField, _record.getMapField(mapField));
+ }
}
+ } catch (NoSuchMethodError e) {
+ LOG.error("Could not parse InstanceConfig", e);
}
return userConfig;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 65762cf..46d7ed7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -9,6 +9,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.provisioner.ProvisionerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.log4j.Logger;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
@@ -36,6 +37,8 @@ import com.google.common.base.Optional;
* Persisted configuration properties for a resource
*/
public class ResourceConfiguration extends HelixProperty {
+ private static final Logger LOG = Logger.getLogger(ResourceConfiguration.class);
+
public enum Fields {
TYPE
}
@@ -86,21 +89,25 @@ public class ResourceConfiguration extends HelixProperty {
*/
public UserConfig getUserConfig() {
UserConfig userConfig = UserConfig.from(this);
- for (String simpleField : _record.getSimpleFields().keySet()) {
- Optional<Fields> enumField = Enums.getIfPresent(Fields.class, simpleField);
- if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
- userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ try {
+ for (String simpleField : _record.getSimpleFields().keySet()) {
+ Optional<Fields> enumField = Enums.getIfPresent(Fields.class, simpleField);
+ if (!simpleField.contains(NamespacedConfig.PREFIX_CHAR + "") && !enumField.isPresent()) {
+ userConfig.setSimpleField(simpleField, _record.getSimpleField(simpleField));
+ }
}
- }
- for (String listField : _record.getListFields().keySet()) {
- if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
- userConfig.setListField(listField, _record.getListField(listField));
+ for (String listField : _record.getListFields().keySet()) {
+ if (!listField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+ userConfig.setListField(listField, _record.getListField(listField));
+ }
}
- }
- for (String mapField : _record.getMapFields().keySet()) {
- if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
- userConfig.setMapField(mapField, _record.getMapField(mapField));
+ for (String mapField : _record.getMapFields().keySet()) {
+ if (!mapField.contains(NamespacedConfig.PREFIX_CHAR + "")) {
+ userConfig.setMapField(mapField, _record.getMapField(mapField));
+ }
}
+ } catch (NoSuchMethodError e) {
+ LOG.error("Could not parse ResourceConfiguration", e);
}
return userConfig;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 383180e..1a41e06 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -22,6 +22,7 @@ package org.apache.helix.task;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
@@ -132,10 +133,25 @@ public class Workflow {
return parse(new StringReader(yaml));
}
+ /**
+ * Read a workflow from an open input stream
+ * @param inputStream the stream
+ * @return Workflow
+ */
+ public static Workflow parse(InputStream inputStream) {
+ Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
+ WorkflowBean wf = (WorkflowBean) yaml.load(inputStream);
+ return parse(wf);
+ }
+
/** Helper function to parse workflow from a generic {@link Reader} */
private static Workflow parse(Reader reader) throws Exception {
Yaml yaml = new Yaml(new Constructor(WorkflowBean.class));
WorkflowBean wf = (WorkflowBean) yaml.load(reader);
+ return parse(wf);
+ }
+
+ private static Workflow parse(WorkflowBean wf) {
Builder builder = new Builder(wf.name);
for (JobBean job : wf.jobs) {
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
index 283538d..442d074 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
@@ -1,10 +1,27 @@
package org.apache.helix.provisioning;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.log4j.Logger;
+
public class TaskConfig {
+ private static final Logger LOG = Logger.getLogger(TaskConfig.class);
+
public Map<String, String> config = new HashMap<String, String>();
+ public String yamlFile;
+ public String name;
+
+ public URI getYamlURI() {
+ try {
+ return yamlFile != null ? new URI(yamlFile) : null;
+ } catch (URISyntaxException e) {
+ LOG.error("Error parsing URI for task config", e);
+ }
+ return null;
+ }
public String getValue(String key) {
return (config != null ? config.get(key) : null);
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 76b7877..2db4afb 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -53,6 +53,7 @@ import org.apache.helix.manager.zk.ZkHelixConnection;
import org.apache.helix.provisioning.ApplicationSpec;
import org.apache.helix.provisioning.ApplicationSpecFactory;
import org.apache.helix.provisioning.HelixYarnUtil;
+import org.apache.helix.provisioning.TaskConfig;
/**
* Main class to launch the job.
@@ -151,6 +152,19 @@ public class AppLauncher {
_appMasterConfig.setMainClass(name, serviceMainClass);
}
}
+
+ // Get YAML files describing all workflows to immediately start
+ Map<String, URI> workflowFiles = new HashMap<String, URI>();
+ List<TaskConfig> taskConfigs = _applicationSpec.getTaskConfigs();
+ if (taskConfigs != null) {
+ for (TaskConfig taskConfig : taskConfigs) {
+ URI configUri = taskConfig.getYamlURI();
+ if (taskConfig.name != null && configUri != null) {
+ workflowFiles.put(taskConfig.name, taskConfig.getYamlURI());
+ }
+ }
+ }
+
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
@@ -163,6 +177,13 @@ public class AppLauncher {
hdfsDest.get(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString()));
localResources.put(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString(), appMasterPkg);
localResources.put(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(), appSpecFile);
+ for (String name : workflowFiles.keySet()) {
+ URI uri = workflowFiles.get(name);
+ Path dst = copyToHDFS(fs, name, uri);
+ LocalResource taskLocalResource = setupLocalResource(fs, dst);
+ localResources.put(AppMasterConfig.AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + name,
+ taskLocalResource);
+ }
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);
@@ -393,7 +414,7 @@ public class AppLauncher {
prevReport = reportMessage;
Thread.sleep(10000);
} catch (Exception e) {
- LOG.error("Exception while getting info ");
+ LOG.error("Exception while getting info ", e);
break;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
index 9dcabc2..38a0dd1 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
@@ -19,7 +19,8 @@ public class AppMasterConfig {
APP_SPEC_FILE("APP_SPEC_FILE"),
APP_NAME("APP_NAME"),
APP_ID("APP_ID"),
- APP_SPEC_FACTORY("APP_SPEC_FACTORY");
+ APP_SPEC_FACTORY("APP_SPEC_FACTORY"),
+ TASK_CONFIG_FILE("TASK_CONFIG_FILE");
String _name;
private AppEnvironment(String name) {
@@ -37,8 +38,8 @@ public class AppMasterConfig {
private String get(String key) {
String value = (_envs.containsKey(key)) ? _envs.get(key) : System.getenv().get(key);
- LOG.info("Returning value:"+ value +" for key:'"+ key + "'");
-
+ LOG.info("Returning value:" + value + " for key:'" + key + "'");
+
return value;
}
@@ -83,6 +84,14 @@ public class AppMasterConfig {
_envs.put(serviceName + "_classpath", classpath);
}
+ public void setTaskConfigFile(String configName, String path) {
+ _envs.put(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName, path);
+ }
+
+ public String getTaskConfigFile(String configName) {
+ return get(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName);
+ }
+
public String getApplicationSpecConfigFile() {
return get(AppEnvironment.APP_SPEC_FILE.toString());
}
@@ -97,6 +106,6 @@ public class AppMasterConfig {
}
public void setMainClass(String serviceName, String serviceMainClass) {
- _envs.put(serviceName + "_mainClass", serviceMainClass);
+ _envs.put(serviceName + "_mainClass", serviceMainClass);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 523fee0..e7a0f61 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -3,6 +3,8 @@ package org.apache.helix.provisioning.yarn;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
import java.util.List;
import java.util.Map;
@@ -11,8 +13,12 @@ import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkServer;
import org.apache.commons.cli.Options;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.helix.HelixController;
import org.apache.helix.api.accessor.ClusterAccessor;
@@ -150,11 +156,16 @@ public class AppMasterLauncher {
// Start any pre-specified jobs
List<TaskConfig> taskConfigs = applicationSpec.getTaskConfigs();
if (taskConfigs != null) {
+ YarnConfiguration conf = new YarnConfiguration();
+ FileSystem fs;
+ fs = FileSystem.get(conf);
for (TaskConfig taskConfig : taskConfigs) {
- String yamlFile = taskConfig.getValue("yamlFile");
- if (yamlFile != null) {
- File file = new File(yamlFile);
- Workflow workflow = Workflow.parse(file);
+ URI yamlUri = taskConfig.getYamlURI();
+ if (yamlUri != null && taskConfig.name != null) {
+ InputStream is =
+ readFromHDFS(fs, taskConfig.name, yamlUri, applicationSpec,
+ appAttemptID.getApplicationId());
+ Workflow workflow = Workflow.parse(is);
TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
taskDriver.start(workflow);
}
@@ -171,4 +182,13 @@ public class AppMasterLauncher {
Thread.sleep(10000);
}
+
+ private static InputStream readFromHDFS(FileSystem fs, String name, URI uri,
+ ApplicationSpec appSpec, ApplicationId appId) throws Exception {
+ // will throw exception if the file name is without extension
+ String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+ String pathSuffix = appSpec.getAppName() + "/" + appId.getId() + "/" + name + "." + extension;
+ Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+ return fs.open(dst).getWrappedStream();
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/feaea562/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
index ad62ffc..0945690 100755
--- a/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
+++ b/recipes/jobrunner-yarn/src/main/resources/job_runner_app_spec.yaml
@@ -19,9 +19,5 @@ servicePackageURIMap: {
services: [
JobRunner]
taskConfigs:
- - config: {
- yamlFile: '/Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml'
- }
-
-
-
+ - name: JobRunnerWorkflow
+ yamlFile: 'file:///Users/kbiscuit/helix/incubator-helix/recipes/jobrunner-yarn/src/main/resources/dummy_job.yaml'