You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/04/13 17:28:21 UTC
airavata git commit: Add Factory for JobSubmissionTask and WorkerCorre
Repository: airavata
Updated Branches:
refs/heads/feature-workload-mgmt 47587b5be -> 4f6ac4dcf
Add Factory for JobSubmissionTask and WorkerCorre
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4f6ac4dc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4f6ac4dc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4f6ac4dc
Branch: refs/heads/feature-workload-mgmt
Commit: 4f6ac4dcf2aaee244db9a7c71d936b7f9af18767
Parents: 47587b5
Author: Gourav Shenoy <go...@apache.org>
Authored: Thu Apr 13 13:28:04 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Thu Apr 13 13:28:04 2017 -0400
----------------------------------------------------------------------
.../impl/DefaultJobSubmissionTask.java | 3 +-
.../utils/JobSubmissionFactory.java | 88 ++++++++++++++++++++
.../jobsubmission/utils/JobSubmissionUtils.java | 42 ----------
modules/worker/worker-commons/pom.xml | 5 ++
.../org/apache/airavata/worker/commons/sample | 0
.../worker/commons/utils/WorkerFactory.java | 82 ++++++++++++++++++
6 files changed, 177 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
index a4cf5c7..2608b76 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
@@ -43,6 +43,7 @@ import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
import org.apache.airavata.worker.commons.utils.WorkerUtils;
import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionFactory;
import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
import org.apache.airavata.worker.task.jobsubmission.utils.Script;
import org.apache.commons.io.FileUtils;
@@ -82,7 +83,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
ResourceJobManager resourceJobManager = JobSubmissionUtils.getResourceJobManager(processContext);
JobManagerConfiguration jConfig = null;
if (resourceJobManager != null) {
- jConfig = JobSubmissionUtils.getJobManagerConfiguration(resourceJobManager);
+ jConfig = JobSubmissionFactory.getJobManagerConfiguration(resourceJobManager);
}
JobStatus jobStatus = new JobStatus();
File jobFile = JobSubmissionUtils.createJobFile(groovyMap, taskContext, jConfig);
http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java
new file mode 100644
index 0000000..d37936b
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java
@@ -0,0 +1,88 @@
+
+package org.apache.airavata.worker.task.jobsubmission.utils;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.worker.commons.cluster.OutputParser;
+import org.apache.airavata.worker.commons.config.JobSubmitterTaskConfig;
+import org.apache.airavata.worker.commons.config.ResourceConfig;
+import org.apache.airavata.worker.commons.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.airavata.worker.commons.utils.WorkerFactory;
+import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
+import org.apache.airavata.worker.commons.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.config.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for the job-submission task module. It registers the configured
+ * {@link JobSubmissionTask} implementations and builds the
+ * {@link JobManagerConfiguration} matching a {@link ResourceJobManager}.
+ *
+ * Created by goshenoy on 4/13/17.
+ */
+public class JobSubmissionFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(JobSubmissionFactory.class);
+
+    /** Job submission tasks keyed by the submission protocol they were configured for. */
+    private static final Map<JobSubmissionProtocol, JobSubmissionTask> jobSubmissionTask = new HashMap<>();
+
+    /**
+     * Loads the worker configuration through {@link WorkerFactory#loadConfiguration()} and then
+     * instantiates every configured job submitter task. Each task class is created reflectively
+     * through its public no-argument constructor, initialised with the properties from its
+     * configuration entry and registered under that entry's submission protocol. A later entry
+     * for the same protocol replaces an earlier one.
+     *
+     * @throws WorkerException if loading the worker configuration, or loading, instantiating or
+     *                         initialising any task fails; the original failure is attached as
+     *                         the cause
+     */
+    public static void loadConfiguration() throws WorkerException {
+        WorkerYamlConfigruation config = new WorkerYamlConfigruation();
+        try {
+            // load workerfactory configuration
+            WorkerFactory.loadConfiguration();
+
+            for (JobSubmitterTaskConfig jobSubmitterTaskConfig : config.getJobSbumitters()) {
+                String taskClass = jobSubmitterTaskConfig.getTaskClass();
+                Class<?> aClass = Class.forName(taskClass);
+                Constructor<?> constructor = aClass.getConstructor();
+                JobSubmissionTask task = (JobSubmissionTask) constructor.newInstance();
+                task.init(jobSubmitterTaskConfig.getProperties());
+                jobSubmissionTask.put(jobSubmitterTaskConfig.getSubmissionProtocol(), task);
+            }
+        } catch (Exception e) {
+            throw new WorkerException("JobSubmission config issue", e);
+        }
+    }
+
+    /**
+     * Builds the job manager configuration for the given resource job manager.
+     *
+     * The output parser class named by the {@link ResourceConfig} registered for the job manager
+     * type is instantiated through its public no-argument constructor and handed to the
+     * type-specific configuration together with the template file name, the job script
+     * extension, the job manager bin path and the job manager commands.
+     *
+     * @param resourceJobManager the resource job manager to build a configuration for; may be
+     *                           {@code null}
+     * @return the configuration for PBS, SLURM, LSF, UGE or FORK job managers; {@code null} when
+     *         {@code resourceJobManager} is {@code null} or its type has no configuration class
+     * @throws WorkerException if no resource configuration is registered for the job manager
+     *                         type, or if the output parser cannot be instantiated
+     */
+    public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws WorkerException {
+        if (resourceJobManager == null) {
+            return null;
+        }
+
+        ResourceConfig resourceConfig = WorkerFactory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
+        if (resourceConfig == null) {
+            // Report the actual problem instead of letting it surface as a NullPointerException
+            // that would be mislabelled as an output parser instantiation failure below.
+            throw new WorkerException("No resource configuration found for job manager type "
+                    + resourceJobManager.getResourceJobManagerType());
+        }
+
+        OutputParser outputParser;
+        try {
+            Class<? extends OutputParser> aClass = Class.forName(resourceConfig.getCommandOutputParser())
+                    .asSubclass(OutputParser.class);
+            outputParser = aClass.getConstructor().newInstance();
+        } catch (Exception e) {
+            // Keep the cause so class-loading and instantiation failures can be diagnosed.
+            throw new WorkerException("Error while instantiating output parser for " + resourceJobManager
+                    .getResourceJobManagerType().name(), e);
+        }
+
+        String templateFileName = WorkerUtils.getTemplateFileName(resourceJobManager.getResourceJobManagerType());
+        switch (resourceJobManager.getResourceJobManagerType()) {
+            case PBS:
+                return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case SLURM:
+                return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager
+                        .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser);
+            case LSF:
+                return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case UGE:
+                // UGE job scripts use the ".pbs" extension as well.
+                return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case FORK:
+                return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            // We don't have a job configuration manager for CLOUD type
+            default:
+                return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
index 6e3ebbc..dcc95c8 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
@@ -564,46 +564,4 @@ public class JobSubmissionUtils {
outputPath = (outputPath.endsWith(File.separator) ? outputPath : outputPath + File.separator);
return new File(outputPath + taskContext.getParentProcessContext() .getProcessId());
}
-
- public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws WorkerException {
- if(resourceJobManager == null)
- return null;
-
- ResourceConfig resourceConfig = JobSubmissionUtils.getResourceConfig(resourceJobManager.getResourceJobManagerType());
- OutputParser outputParser;
- try {
- Class<? extends OutputParser> aClass = Class.forName(resourceConfig.getCommandOutputParser()).asSubclass
- (OutputParser.class);
- outputParser = aClass.getConstructor().newInstance();
- } catch (Exception e) {
- throw new WorkerException("Error while instantiating output parser for " + resourceJobManager
- .getResourceJobManagerType().name());
- }
-
- String templateFileName = WorkerUtils.getTemplateFileName(resourceJobManager.getResourceJobManagerType());
- switch (resourceJobManager.getResourceJobManagerType()) {
- case PBS:
- return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
- resourceJobManager.getJobManagerCommands(), outputParser);
- case SLURM:
- return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager
- .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser);
- case LSF:
- return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(),
- resourceJobManager.getJobManagerCommands(), outputParser);
- case UGE:
- return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
- resourceJobManager.getJobManagerCommands(), outputParser);
- case FORK:
- return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(),
- resourceJobManager.getJobManagerCommands(), outputParser);
- // We don't have a job configuration manager for CLOUD type
- default:
- return null;
- }
- }
-
- private static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
- return resources.get(resourceJobManagerType);
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/worker-commons/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/pom.xml b/modules/worker/worker-commons/pom.xml
index cc5c2c7..7dceaa6 100644
--- a/modules/worker/worker-commons/pom.xml
+++ b/modules/worker/worker-commons/pom.xml
@@ -53,6 +53,11 @@
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
+ <!-- Guava -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
new file mode 100644
index 0000000..4ab403f
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.commons.utils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.worker.commons.config.ResourceConfig;
+import org.apache.airavata.worker.commons.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holds worker-wide state that is loaded once from the worker YAML configuration: the
+ * {@link ResourceConfig} registered for each {@link ResourceJobManagerType} and a cache of
+ * ssh {@link Session}s.
+ *
+ * Created by goshenoy on 4/13/17.
+ */
+public class WorkerFactory {
+    private static final Logger log = LoggerFactory.getLogger(WorkerFactory.class);
+
+    /** Set once {@link #loadConfiguration()} has completed successfully. */
+    private static boolean isWorkerConfigurationLoaded = false;
+    /** Resource configurations keyed by job manager type. */
+    private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
+    /** Ssh sessions by key; created by {@link #loadConfiguration()}, not read inside this class. */
+    private static Cache<String,Session> sessionCache;
+
+    /**
+     * Registers every resource configuration from the worker YAML configuration and creates the
+     * ssh session cache. Once a call has completed successfully, later calls return immediately.
+     * The method performs no synchronization.
+     *
+     * @throws WorkerException if reading or registering the resource configurations fails
+     */
+    public static void loadConfiguration() throws WorkerException {
+        if (isWorkerConfigurationLoaded) {
+            return;
+        }
+
+        registerResourceConfigs(new WorkerYamlConfigruation());
+        sessionCache = newSessionCache();
+        isWorkerConfigurationLoaded = true;
+    }
+
+    /** Stores each resource configuration of {@code yamlConfig} under its job manager type. */
+    private static void registerResourceConfigs(WorkerYamlConfigruation yamlConfig) throws WorkerException {
+        try {
+            for (ResourceConfig entry : yamlConfig.getResourceConfiguration()) {
+                resources.put(entry.getJobManagerType(), entry);
+            }
+        } catch (Exception cause) {
+            throw new WorkerException("Worker config issue", cause);
+        }
+    }
+
+    /**
+     * Creates the session cache. Entries expire after the access timeout from
+     * {@link ServerSettings}, interpreted in minutes; a session that is still connected when it
+     * is removed gets disconnected.
+     */
+    private static Cache<String, Session> newSessionCache() {
+        RemovalListener<String, Session> onRemoval = removed -> {
+            Session session = removed.getValue();
+            if (session.isConnected()) {
+                log.info("Disconnecting ssh session with key: " + removed.getKey());
+                session.disconnect();
+            }
+            log.info("Removed ssh session with key: " + removed.getKey());
+        };
+
+        return CacheBuilder.newBuilder()
+                .expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), TimeUnit.MINUTES)
+                .removalListener(onRemoval)
+                .build();
+    }
+
+    /**
+     * @return the internal map of resource configurations keyed by job manager type; this is
+     *         the map itself, not a copy
+     */
+    public static Map<ResourceJobManagerType, ResourceConfig> getResourceConfig() {
+        return resources;
+    }
+
+    /**
+     * @param resourceJobManagerType the job manager type to look up
+     * @return the resource configuration registered for that type, or {@code null} if none is
+     *         registered
+     */
+    public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
+        return resources.get(resourceJobManagerType);
+    }
+}