You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/04/13 16:33:55 UTC

airavata git commit: Add JobSubmission related config utilities to worker-commons

Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 30e1f0ee8 -> eaca31fe0


Add JobSubmission related config utilities to worker-commons


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/eaca31fe
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/eaca31fe
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/eaca31fe

Branch: refs/heads/feature-workload-mgmt
Commit: eaca31fe018f1de8d4f528c0de734e1d0d736ec0
Parents: 30e1f0e
Author: Gourav Shenoy <go...@apache.org>
Authored: Thu Apr 13 12:33:49 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Thu Apr 13 12:33:49 2017 -0400

----------------------------------------------------------------------
 modules/worker/worker-commons/pom.xml           |   6 +
 .../commons/config/DataTransferTaskConfig.java  |  60 ++++++++
 .../commons/config/JobSubmitterTaskConfig.java  |  59 ++++++++
 .../worker/commons/config/ResourceConfig.java   |  63 ++++++++
 .../commons/config/WorkerYamlConfigruation.java | 150 +++++++++++++++++++
 .../worker/commons/utils/WorkerUtils.java       |  20 +++
 6 files changed, 358 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/eaca31fe/modules/worker/worker-commons/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/pom.xml b/modules/worker/worker-commons/pom.xml
index 8964a2f..cc5c2c7 100644
--- a/modules/worker/worker-commons/pom.xml
+++ b/modules/worker/worker-commons/pom.xml
@@ -47,6 +47,12 @@
             <artifactId>commons-io</artifactId>
             <version>2.5</version>
         </dependency>
+        <!-- Configuration file read-->
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>${snakeyaml.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/eaca31fe/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
new file mode 100644
index 0000000..1241435
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/DataTransferTaskConfig.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataTransferTaskConfig {
+	private DataMovementProtocol transferProtocol;
+	private String taskClass;
+	private Map<String,String> properties = new HashMap<>();
+
+
+	public DataMovementProtocol getTransferProtocol() {
+		return transferProtocol;
+	}
+
+	public void setTransferProtocol(DataMovementProtocol transferProtocol) {
+		this.transferProtocol = transferProtocol;
+	}
+
+	public String getTaskClass() {
+		return taskClass;
+	}
+
+	public void setTaskClass(String taskClass) {
+		this.taskClass = taskClass;
+	}
+
+	public void addProperty(String key, String value) {
+		properties.put(key, value);
+	}
+
+	public void addProperties(Map<String, String> propMap) {
+		propMap.forEach(properties::put);
+	}
+
+	public Map<String,String> getProperties(){
+		return properties;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/eaca31fe/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
new file mode 100644
index 0000000..5744345
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/JobSubmitterTaskConfig.java
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JobSubmitterTaskConfig {
+	private JobSubmissionProtocol submissionProtocol;
+	private String taskClass;
+	private Map<String,String> properties = new HashMap<>();
+
+	public JobSubmissionProtocol getSubmissionProtocol() {
+		return submissionProtocol;
+	}
+
+	public void setSubmissionProtocol(JobSubmissionProtocol submissionProtocol) {
+		this.submissionProtocol = submissionProtocol;
+	}
+
+	public String getTaskClass() {
+		return taskClass;
+	}
+
+	public void setTaskClass(String taskClass) {
+		this.taskClass = taskClass;
+	}
+
+	public void addProperty(String key, String value) {
+		properties.put(key, value);
+	}
+
+	public void addProperties(Map<String, String> propMap) {
+		propMap.forEach(properties::put);
+	}
+
+	public Map<String,String> getProperties(){
+		return properties;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/eaca31fe/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
new file mode 100644
index 0000000..12eed5a
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/ResourceConfig.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+
+import java.util.List;
+
+public class ResourceConfig {
+	private ResourceJobManagerType jobManagerType;
+	private String commandOutputParser;
+	private String emailParser;
+	private List<String> resourceEmailAddresses;
+
+	public ResourceJobManagerType getJobManagerType() {
+		return jobManagerType;
+	}
+
+	public void setJobManagerType(ResourceJobManagerType jobManagerType) {
+		this.jobManagerType = jobManagerType;
+	}
+
+	public String getCommandOutputParser() {
+		return commandOutputParser;
+	}
+
+	public void setCommandOutputParser(String commandOutputParser) {
+		this.commandOutputParser = commandOutputParser;
+	}
+
+	public String getEmailParser() {
+		return emailParser;
+	}
+
+	public void setEmailParser(String emailParser) {
+		this.emailParser = emailParser;
+	}
+
+	public List<String> getResourceEmailAddresses() {
+		return resourceEmailAddresses;
+	}
+
+	public void setResourceEmailAddresses(List<String> resourceEmailAddresses) {
+		this.resourceEmailAddresses = resourceEmailAddresses;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/eaca31fe/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
new file mode 100644
index 0000000..5d2e372
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/config/WorkerYamlConfigruation.java
@@ -0,0 +1,150 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.commons.config;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class WorkerYamlConfigruation {
+
+	private static final String CONFIG = "config";
+	private static final String JOB_SUBMITTERS = "jobSubmitters";
+	private static final String SUBMISSIO_PROTOCOL = "submissionProtocol";
+	private static final String TASK_CLASS = "taskClass";
+	private static final String COMMON_TASKS = "commonTasks";
+	private static final String TYPE = "type";
+	private static final String FILE_TRANSFER_TASKS = "fileTransferTasks";
+	private static final String TRANSFER_PROTOCOL = "transferProtocol";
+	private static final String RESOURCES = "resources";
+	private static final String JOB_MANAGER_TYPE = "jobManagerType";
+	private static final String COMMAND_OUTPUT_PARSER = "commandOutputParser";
+	private static final String EMAIL_PARSER = "emailParser";
+	private static final String RESOURCE_EMAIL_ADDRESS = "resourceEmailAddresses";
+	private static final String PROPERTIES = "properties";
+
+	private List<JobSubmitterTaskConfig> jobSubmitters = new ArrayList<>();
+	private List<DataTransferTaskConfig> fileTransferTasks = new ArrayList<>();
+	private List<ResourceConfig> resources = new ArrayList<>();
+
+
+	public WorkerYamlConfigruation() throws WorkerException {
+		InputStream resourceAsStream = WorkerYamlConfigruation.class.getClassLoader().
+				getResourceAsStream("gfac-config.yaml");
+		parse(resourceAsStream);
+	}
+
+	private void parse(InputStream resourceAsStream) throws WorkerException {
+		if (resourceAsStream == null) {
+			throw new WorkerException("Configuration file{gfac-config.yaml} is not fund");
+		}
+		Yaml yaml = new Yaml();
+		Object load = yaml.load(resourceAsStream);
+		if (load == null) {
+			throw new WorkerException("Yaml configuration object null");
+		}
+
+		if (load instanceof Map) {
+			Map<String, Object> loadMap = (Map<String, Object>) load;
+			String identifier;
+			List<Map<String,Object >> jobSubYamls = (List<Map<String, Object>>) loadMap.get(JOB_SUBMITTERS);
+			JobSubmitterTaskConfig jobSubmitterTaskConfig;
+			if (jobSubYamls != null) {
+				for (Map<String, Object> jobSub : jobSubYamls) {
+					jobSubmitterTaskConfig = new JobSubmitterTaskConfig();
+					identifier = ((String) jobSub.get(SUBMISSIO_PROTOCOL));
+					jobSubmitterTaskConfig.setSubmissionProtocol(JobSubmissionProtocol.valueOf(identifier));
+					jobSubmitterTaskConfig.setTaskClass(((String) jobSub.get(TASK_CLASS)));
+					Object propertiesObj = jobSub.get(PROPERTIES);
+					List propertiesList;
+					if (propertiesObj instanceof List) {
+						propertiesList = ((List) propertiesObj);
+						if (propertiesList.size() > 0) {
+							Map<String, String> props = (Map<String, String>) propertiesList.get(0);
+							jobSubmitterTaskConfig.addProperties(props);
+						}
+					}
+					jobSubmitters.add(jobSubmitterTaskConfig);
+				}
+			}
+
+			List<Map<String, Object>> fileTransYamls = (List<Map<String, Object>>) loadMap.get(FILE_TRANSFER_TASKS);
+			DataTransferTaskConfig dataTransferTaskConfig;
+			if (fileTransYamls != null) {
+				for (Map<String, Object> fileTransConfig : fileTransYamls) {
+					dataTransferTaskConfig = new DataTransferTaskConfig();
+					identifier = ((String) fileTransConfig.get(TRANSFER_PROTOCOL));
+					dataTransferTaskConfig.setTransferProtocol(DataMovementProtocol.valueOf(identifier));
+					dataTransferTaskConfig.setTaskClass(((String) fileTransConfig.get(TASK_CLASS)));
+					Object propertiesObj = fileTransConfig.get(PROPERTIES);
+					List propertiesList;
+					if (propertiesObj instanceof List) {
+						propertiesList = (List) propertiesObj;
+						if (propertiesList.size() > 0) {
+							Map<String, String> props = (Map<String, String>) propertiesList.get(0);
+							dataTransferTaskConfig.addProperties(props);
+						}
+					}
+					fileTransferTasks.add(dataTransferTaskConfig);
+				}
+			}
+
+			List<Map<String, Object>> resourcesYaml = (List<Map<String, Object>>) loadMap.get(RESOURCES);
+			ResourceConfig resourceConfig;
+			if (resourcesYaml != null) {
+				for (Map<String, Object> resource : resourcesYaml) {
+					resourceConfig = new ResourceConfig();
+					identifier = resource.get(JOB_MANAGER_TYPE).toString();
+					resourceConfig.setJobManagerType(ResourceJobManagerType.valueOf(identifier));
+					resourceConfig.setCommandOutputParser(resource.get(COMMAND_OUTPUT_PARSER).toString());
+                    Object emailParser = resource.get(EMAIL_PARSER);
+                    if (emailParser != null){
+                        resourceConfig.setEmailParser(emailParser.toString());
+                    }
+					List<String> emailAddressList = (List<String>) resource.get(RESOURCE_EMAIL_ADDRESS);
+					resourceConfig.setResourceEmailAddresses(emailAddressList);
+					resources.add(resourceConfig);
+				}
+			}
+		}
+	}
+
+	public List<JobSubmitterTaskConfig> getJobSbumitters() {
+		return jobSubmitters;
+	}
+
+	public List<DataTransferTaskConfig> getFileTransferTasks() {
+		return fileTransferTasks;
+	}
+
+	public List<ResourceConfig> getResourceConfiguration() {
+		return resources;
+	}
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/eaca31fe/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
index 2a104dc..dd244fb 100644
--- a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerUtils.java
@@ -2,6 +2,7 @@ package org.apache.airavata.worker.commons.utils;
 
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.messaging.event.*;
@@ -191,4 +192,23 @@ public class WorkerUtils {
                     + e.getLocalizedMessage(), e);
         }
     }
+
+    public static String getTemplateFileName(ResourceJobManagerType resourceJobManagerType) {
+        switch (resourceJobManagerType) {
+            case FORK:
+                return "UGE_Groovy.template";
+            case PBS:
+                return "PBS_Groovy.template";
+            case SLURM:
+                return "SLURM_Groovy.template";
+            case UGE:
+                return "UGE_Groovy.template";
+            case LSF:
+                return "LSF_Groovy.template";
+            case CLOUD:
+                return "CLOUD_Groovy.template";
+            default:
+                return null;
+        }
+    }
 }