You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/04/13 17:28:21 UTC

airavata git commit: Add Factory for JobSubmissionTask and WorkerCore

Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 47587b5be -> 4f6ac4dcf


Add Factory for JobSubmissionTask and WorkerCore


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4f6ac4dc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4f6ac4dc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4f6ac4dc

Branch: refs/heads/feature-workload-mgmt
Commit: 4f6ac4dcf2aaee244db9a7c71d936b7f9af18767
Parents: 47587b5
Author: Gourav Shenoy <go...@apache.org>
Authored: Thu Apr 13 13:28:04 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Thu Apr 13 13:28:04 2017 -0400

----------------------------------------------------------------------
 .../impl/DefaultJobSubmissionTask.java          |  3 +-
 .../utils/JobSubmissionFactory.java             | 88 ++++++++++++++++++++
 .../jobsubmission/utils/JobSubmissionUtils.java | 42 ----------
 modules/worker/worker-commons/pom.xml           |  5 ++
 .../org/apache/airavata/worker/commons/sample   |  0
 .../worker/commons/utils/WorkerFactory.java     | 82 ++++++++++++++++++
 6 files changed, 177 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
index a4cf5c7..2608b76 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/DefaultJobSubmissionTask.java
@@ -43,6 +43,7 @@ import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
 import org.apache.airavata.worker.commons.utils.WorkerUtils;
 import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
 import org.apache.airavata.worker.task.jobsubmission.utils.GroovyMap;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionFactory;
 import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
 import org.apache.airavata.worker.task.jobsubmission.utils.Script;
 import org.apache.commons.io.FileUtils;
@@ -82,7 +83,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 			ResourceJobManager resourceJobManager = JobSubmissionUtils.getResourceJobManager(processContext);
 		    JobManagerConfiguration jConfig = null;
 		    if (resourceJobManager != null) {
-			    jConfig = JobSubmissionUtils.getJobManagerConfiguration(resourceJobManager);
+			    jConfig = JobSubmissionFactory.getJobManagerConfiguration(resourceJobManager);
 		    }
 		    JobStatus jobStatus = new JobStatus();
 		    File jobFile = JobSubmissionUtils.createJobFile(groovyMap, taskContext, jConfig);

http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java
new file mode 100644
index 0000000..d37936b
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionFactory.java
@@ -0,0 +1,112 @@
+
+package org.apache.airavata.worker.task.jobsubmission.utils;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.worker.commons.cluster.OutputParser;
+import org.apache.airavata.worker.commons.config.JobSubmitterTaskConfig;
+import org.apache.airavata.worker.commons.config.ResourceConfig;
+import org.apache.airavata.worker.commons.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.apache.airavata.worker.commons.utils.WorkerFactory;
+import org.apache.airavata.worker.commons.utils.JobManagerConfiguration;
+import org.apache.airavata.worker.commons.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.config.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for job-submission related objects.
+ *
+ * <p>Keeps one {@link JobSubmissionTask} per {@link JobSubmissionProtocol}, populated by
+ * {@link #loadConfiguration()}, and builds the {@link JobManagerConfiguration} for a given
+ * {@link ResourceJobManager}.
+ *
+ * Created by goshenoy on 4/13/17.
+ */
+public class JobSubmissionFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(JobSubmissionFactory.class);
+    private static final Map<JobSubmissionProtocol, JobSubmissionTask> jobSubmissionTask = new HashMap<>();
+
+    /**
+     * Loads the worker configuration through {@link WorkerFactory#loadConfiguration()}, then instantiates
+     * and initialises one task per configured job submitter and registers it under its submission protocol.
+     *
+     * @throws WorkerException if the worker configuration cannot be loaded or a task class cannot be
+     *                         loaded, instantiated or initialised; the original exception is the cause
+     */
+    public static void loadConfiguration() throws WorkerException {
+        WorkerYamlConfigruation config = new WorkerYamlConfigruation();
+        try {
+            // load workerfactory configuration
+            WorkerFactory.loadConfiguration();
+
+            for (JobSubmitterTaskConfig jobSubmitterTaskConfig : config.getJobSbumitters()) {
+                String taskClass = jobSubmitterTaskConfig.getTaskClass();
+                Class<?> aClass = Class.forName(taskClass);
+                Constructor<?> constructor = aClass.getConstructor();
+                JobSubmissionTask task = (JobSubmissionTask) constructor.newInstance();
+                task.init(jobSubmitterTaskConfig.getProperties());
+                jobSubmissionTask.put(jobSubmitterTaskConfig.getSubmissionProtocol(), task);
+            }
+        } catch (Exception e) {
+            throw new WorkerException("JobSubmission config issue", e);
+        }
+    }
+
+    /**
+     * Builds the {@link JobManagerConfiguration} that matches the type of the given resource job manager.
+     *
+     * @param resourceJobManager job manager description; may be {@code null}
+     * @return the configuration for PBS, SLURM, LSF, UGE or FORK; {@code null} when
+     *         {@code resourceJobManager} is {@code null} or its type has no configuration class
+     * @throws WorkerException if the output parser configured for the job manager type cannot be
+     *                         instantiated; the original exception is attached as the cause
+     */
+    public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws WorkerException {
+        if (resourceJobManager == null) {
+            return null;
+        }
+
+        ResourceConfig resourceConfig = WorkerFactory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
+        OutputParser outputParser;
+        try {
+            Class<? extends OutputParser> aClass = Class.forName(resourceConfig.getCommandOutputParser()).asSubclass
+                    (OutputParser.class);
+            outputParser = aClass.getConstructor().newInstance();
+        } catch (Exception e) {
+            // Keep the cause so the real failure (unknown class, missing resource config, ...) is not lost.
+            throw new WorkerException("Error while instantiating output parser for " + resourceJobManager
+                    .getResourceJobManagerType().name(), e);
+        }
+
+        String templateFileName = WorkerUtils.getTemplateFileName(resourceJobManager.getResourceJobManagerType());
+        switch (resourceJobManager.getResourceJobManagerType()) {
+            case PBS:
+                return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case SLURM:
+                return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager
+                        .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser);
+            case LSF:
+                return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case UGE:
+                return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            case FORK:
+                return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(),
+                        resourceJobManager.getJobManagerCommands(), outputParser);
+            // We don't have a job configuration manager for CLOUD type
+            default:
+                return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
index 6e3ebbc..dcc95c8 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/JobSubmissionUtils.java
@@ -564,46 +564,4 @@ public class JobSubmissionUtils {
         outputPath = (outputPath.endsWith(File.separator) ? outputPath : outputPath + File.separator);
         return new File(outputPath + taskContext.getParentProcessContext() .getProcessId());
     }
-
-    public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws WorkerException {
-        if(resourceJobManager == null)
-            return null;
-
-        ResourceConfig resourceConfig = JobSubmissionUtils.getResourceConfig(resourceJobManager.getResourceJobManagerType());
-        OutputParser outputParser;
-        try {
-            Class<? extends OutputParser> aClass = Class.forName(resourceConfig.getCommandOutputParser()).asSubclass
-                    (OutputParser.class);
-            outputParser = aClass.getConstructor().newInstance();
-        } catch (Exception e) {
-            throw new WorkerException("Error while instantiating output parser for " + resourceJobManager
-                    .getResourceJobManagerType().name());
-        }
-
-        String templateFileName = WorkerUtils.getTemplateFileName(resourceJobManager.getResourceJobManagerType());
-        switch (resourceJobManager.getResourceJobManagerType()) {
-            case PBS:
-                return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
-                        resourceJobManager.getJobManagerCommands(), outputParser);
-            case SLURM:
-                return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager
-                        .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser);
-            case LSF:
-                return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(),
-                        resourceJobManager.getJobManagerCommands(), outputParser);
-            case UGE:
-                return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
-                        resourceJobManager.getJobManagerCommands(), outputParser);
-            case FORK:
-                return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(),
-                        resourceJobManager.getJobManagerCommands(), outputParser);
-            // We don't have a job configuration manager for CLOUD type
-            default:
-                return null;
-        }
-    }
-
-    private static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
-        return resources.get(resourceJobManagerType);
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/worker-commons/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/pom.xml b/modules/worker/worker-commons/pom.xml
index cc5c2c7..7dceaa6 100644
--- a/modules/worker/worker-commons/pom.xml
+++ b/modules/worker/worker-commons/pom.xml
@@ -53,6 +53,11 @@
             <artifactId>snakeyaml</artifactId>
             <version>${snakeyaml.version}</version>
         </dependency>
+        <!-- Guava -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/sample
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/airavata/blob/4f6ac4dc/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
new file mode 100644
index 0000000..4ab403f
--- /dev/null
+++ b/modules/worker/worker-commons/src/main/java/org/apache/airavata/worker/commons/utils/WorkerFactory.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.commons.utils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.worker.commons.config.ResourceConfig;
+import org.apache.airavata.worker.commons.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.commons.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holds state shared by worker tasks: the {@link ResourceConfig} registered for each
+ * {@link ResourceJobManagerType} and a cache of ssh {@link Session}s.
+ *
+ * Created by goshenoy on 4/13/17.
+ */
+public class WorkerFactory {
+    private static final Logger log = LoggerFactory.getLogger(WorkerFactory.class);
+
+    private static boolean isWorkerConfigurationLoaded = false;
+    private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
+    private static Cache<String,Session> sessionCache;
+
+    /**
+     * Populates the resource configuration map and creates the session cache. Work is only done until the
+     * first call that completes successfully; later calls return immediately.
+     *
+     * NOTE(review): the loaded flag is read and written without synchronization - confirm that this is
+     * only ever called from a single thread.
+     *
+     * @throws WorkerException if reading the resource configuration fails
+     */
+    public static void loadConfiguration() throws WorkerException {
+        if (!isWorkerConfigurationLoaded) {
+            WorkerYamlConfigruation config = new WorkerYamlConfigruation();
+            try {
+                for (ResourceConfig resourceConfig : config.getResourceConfiguration()) {
+                    resources.put(resourceConfig.getJobManagerType(), resourceConfig);
+                }
+            } catch (Exception e) {
+                throw new WorkerException("Worker config issue", e);
+            }
+
+            // Entries expire after the configured idle time; a still-connected session is closed on removal.
+            sessionCache = CacheBuilder.newBuilder()
+                    .expireAfterAccess(ServerSettings.getSessionCacheAccessTimeout(), TimeUnit.MINUTES)
+                    .removalListener((RemovalListener<String, Session>) removalNotification -> {
+                        Session session = removalNotification.getValue();
+                        if (session != null && session.isConnected()) {
+                            log.info("Disconnecting ssh session with key: {}", removalNotification.getKey());
+                            session.disconnect();
+                        }
+                        log.info("Removed ssh session with key: {}", removalNotification.getKey());
+                    })
+                    .build();
+
+            isWorkerConfigurationLoaded =  true;
+        }
+    }
+
+    /**
+     * @return the internal map of resource configurations keyed by job manager type (not a copy)
+     */
+    public static Map<ResourceJobManagerType, ResourceConfig> getResourceConfig() {
+        return resources;
+    }
+
+    /**
+     * @param resourceJobManagerType job manager type to look up
+     * @return the configuration registered for that type, or {@code null} if there is none
+     */
+    public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) {
+        return resources.get(resourceJobManagerType);
+    }
+}