You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/01 20:29:20 UTC
[2/9] Separating gfac-monitoring implementation
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/resources/schema/ToStorageService.json
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/resources/schema/ToStorageService.json b/modules/gfac/gfac-monitor/src/main/resources/schema/ToStorageService.json
new file mode 100644
index 0000000..644f3d1
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/resources/schema/ToStorageService.json
@@ -0,0 +1,25 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "id": "http://schemas.ogf.org/glue/2013/05/spec_2.0_r1/ToStorageService.json",
+ "type": "object",
+ "allOf": [{"$ref": "http://schemas.ogf.org/glue/2013/05/spec_2.0_r1/Entity.json"}],
+ "properties": {
+ "LocalPath": {
+ "type": "string",
+ "description": "The path within the ComputingService that is used to access the StorageService"
+ },
+ "RemotePath": {
+ "type": "string",
+ "description": "The path in the StorageService which is associated with the LocalPath"
+ },
+ "ComputingServiceID": {
+ "type": "string",
+ "description": "The ID of the ComputingService"
+ },
+ "StorageServiceID": {
+ "type": "string",
+ "description": "The ID of the StorageService"
+ }
+ },
+ "required": ["LocalPath","RemotePath","ComputingServiceID","StorageServiceID"]
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/resources/schema/UserDomain.json
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/resources/schema/UserDomain.json b/modules/gfac/gfac-monitor/src/main/resources/schema/UserDomain.json
new file mode 100644
index 0000000..7acda31
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/resources/schema/UserDomain.json
@@ -0,0 +1,58 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "id": "http://schemas.ogf.org/glue/2013/05/spec_2.0_r1/UserDomain.json",
+ "type": "object",
+ "allOf": [{"$ref": "http://schemas.ogf.org/glue/2013/05/spec_2.0_r1/Domain.json"}],
+ "properties": {
+ "Level": {
+ "type": "integer",
+ "description": "the number of hops to reach the root of the hierarchy of UserDomains"
+ },
+ "UserManagerID": {
+ "type": "array",
+ "description": "ID for the Endpoint of a Service managing users in this UserDomain",
+ "items": {
+ "type": "string"
+ }
+ },
+ "Member": {
+ "type": "array",
+ "description": "Identifiers for users in this UserDomain",
+ "items": {
+ "type": "string"
+ }
+ },
+ "PolicyID": {
+ "type": "array",
+ "description": "IDs for Policies associated with this UserDomain",
+ "items": {
+ "type": "string"
+ }
+ },
+ "ChildDomainID": {
+ "type": "array",
+ "description": "IDs of UserDomains aggregated by this UserDomain",
+ "items": {
+ "type": "string"
+ }
+ },
+ "ParentDomainID": {
+ "type": "string",
+ "description": "The ID of the UserDomain that this UserDomain participates in"
+ },
+ "AccessPolicyID": {
+ "type": "array",
+ "description": "IDs of AccessPolicies associated with this UserDomain",
+ "items": {
+ "type": "string"
+ }
+ },
+ "MappingPolicyID": {
+ "type": "array",
+ "description": "IDs of MappingPolicies associated with this UserDomain",
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/resources/service.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/resources/service.properties b/modules/gfac/gfac-monitor/src/main/resources/service.properties
new file mode 100644
index 0000000..391bfea
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/resources/service.properties
@@ -0,0 +1,58 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+
+#
+# Class that implements the Scheduler interface. It is used to determine a Provider.
+#
+scheduler.class= org.apache.airavata.core.gfac.scheduler.impl.SchedulerImpl
+
+#
+# Data Service Plugins classes
+#
+datachain.classes= org.apache.airavata.core.gfac.extension.data.RegistryDataService
+
+#
+# Pre execution Plugins classes. For example, GridFTP Input Staging
+#
+prechain.classes= org.apache.airavata.core.gfac.extension.pre.GridFtpInputStaging
+prechain.classes= org.apache.airavata.core.gfac.extension.pre.HttpInputStaging
+
+#
+# Post execution Plugins classes. For example, GridFTP Output Staging
+#
+postchain.classes= org.apache.airavata.core.gfac.extension.post.GridFtpOutputStaging
+postchain.classes= org.apache.airavata.core.gfac.extension.post.OutputRegister
+
+#
+# SSH private key location. It will be used by SSHProvider
+#
+# ssh.key=/home/user/.ssh/id_rsa
+# ssh.keypass=
+# ssh.username=usernameAtHost
+
+#
+# MyProxy credential. It will be used by GridFTP Plugins and GramProvider.
+#
+# myproxy.server=myproxy.teragrid.org
+# myproxy.user=username
+# myproxy.pass=password
+# myproxy.life=3600
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
new file mode 100644
index 0000000..9e1eadd
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Integration test for {@link AMQPMonitor}: submits a batch job over GSI-SSH, queues it for
+ * monitoring and joins the monitor thread; the job status listener interrupts that thread.
+ * NOTE(review): the credentials and local paths hard-coded in setUp() overwrite any -D values;
+ * they look developer-specific and should not be committed - confirm and externalize.
+ */
+public class AMQPMonitorTest {
+
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+    private MonitorPublisher monitorPublisher;
+    private BlockingQueue<MonitorID> finishQueue;
+    private BlockingQueue<MonitorID> pushQueue;
+    private Thread pushThread;
+    private String proxyFilePath;
+    /** Reads the test settings, starts the AMQP monitor on {@code pushThread} and describes the target host. */
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.username", "ogce");
+        System.setProperty("myproxy.password", "OpenGwy14");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home1/01437/ogce");
+        System.setProperty("trusted.cert.location", "/Users/lahirugunathilake/Downloads/certificates");
+        System.setProperty("proxy.file.path", "/Users/lahirugunathilake/Downloads/x509up_u503876");
+        myProxyUserName = System.getProperty("myproxy.username");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        certificateLocation = System.getProperty("trusted.cert.location");
+        proxyFilePath = System.getProperty("proxy.file.path");
+        System.setProperty("connection.name", "xsede");
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorPublisher = new MonitorPublisher(new EventBus());
+        pushQueue = new LinkedBlockingQueue<MonitorID>();
+        finishQueue = new LinkedBlockingQueue<MonitorID>();
+
+        final AMQPMonitor amqpMonitor = new
+                AMQPMonitor(monitorPublisher,
+                pushQueue, finishQueue,proxyFilePath,"xsede",
+                Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
+        // Keep a handle on the monitor thread: the test joins on it and the listener interrupts it.
+        pushThread = new Thread() {
+            public void run() {
+                amqpMonitor.run();
+            }
+        };
+        pushThread.start();
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("login1.stampede.tacc.utexas.edu");
+        hostDescription.getType().setHostName("stampede-host");
+        ((GsisshHostType) hostDescription.getType()).setJobManager("slurm");
+        ((GsisshHostType) hostDescription.getType()).setInstalledPath("/usr/bin/");
+        ((GsisshHostType) hostDescription.getType()).setPort(2222);
+        ((GsisshHostType) hostDescription.getType()).setMonitorMode("push");
+    }
+
+    /**
+     * Submits an echo job, queues it on {@code pushQueue} and blocks until the monitor thread ends.
+     * @throws SSHApiException propagated from the GSI-SSH cluster calls
+     */
+    @Test
+    public void testAMQPMonitor() throws SSHApiException {
+        // Subscribe before the job is submitted so that no status event is missed while join() blocks.
+        class InnerClassAMQP{
+            @Subscribe
+            private void getStatus(JobStatusChangeRequest status){
+                Assert.assertNotNull(status);
+                pushThread.interrupt();
+            }
+        }
+        monitorPublisher.registerListener(new InnerClassAMQP());
+
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", "login1.stampede.tacc.utexas.edu",2222);
+
+        Cluster pbsCluster = new
+                PBSCluster(serverInfo, authenticationInfo, org.apache.airavata.gsi.ssh.util.CommonUtils.getPBSJobManager("/usr/bin/"));
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("TG-STA110014S");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+        System.out.println(jobID);
+        try {
+            pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce"));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        try {
+            pushThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the test runner
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
new file mode 100644
index 0000000..54562ba
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Integration test for {@link HPCPullMonitor}: submits a batch job over GSI-SSH, queues it for
+ * pull-based monitoring and joins the monitor thread, which {@code testCaseShutDown} stops.
+ * NOTE(review): setUp() overwrites -D values with developer-specific settings - confirm and externalize.
+ */
+public class QstatMonitorTestWithMyProxyAuth {
+    private String myProxyUserName;
+    private String myProxyPassword;
+    private String certificateLocation;
+    private String pbsFilePath;
+    private String workingDirectory;
+    private HostDescription hostDescription;
+    private MonitorPublisher monitorPublisher;
+    private BlockingQueue<UserMonitorData> pullQueue;
+    private Thread monitorThread;
+
+    /** Reads the test settings, registers this test as status listener and starts the pull monitor. */
+    @org.testng.annotations.BeforeClass
+    public void setUp() throws Exception {
+        System.setProperty("myproxy.username", "ogce");
+        System.setProperty("myproxy.password", "");
+        System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh");
+        System.setProperty("gsi.working.directory", "/home/ogce");
+        System.setProperty("trusted.cert.location", "/Users/lahirugunathilake/Downloads/certificates");
+        myProxyUserName = System.getProperty("myproxy.username");
+        myProxyPassword = System.getProperty("myproxy.password");
+        workingDirectory = System.getProperty("gsi.working.directory");
+        certificateLocation = System.getProperty("trusted.cert.location");
+        if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) {
+            System.out.println(">>>>>> Please run tests with my proxy user name and password. " +
+                    "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
+            throw new Exception("Need my proxy user name password to run tests.");
+        }
+
+        monitorPublisher = new MonitorPublisher(new EventBus());
+        // The @Subscribe method testCaseShutDown on this instance is the registered listener.
+        monitorPublisher.registerListener(this);
+        pullQueue = new LinkedBlockingQueue<UserMonitorData>();
+        final HPCPullMonitor qstatMonitor = new
+                HPCPullMonitor(pullQueue, monitorPublisher);
+        // Keep a handle on the monitor thread: the test joins on it and testCaseShutDown stops it.
+        monitorThread = new Thread() {
+            public void run() {
+                qstatMonitor.run();
+            }
+        };
+        monitorThread.start();
+
+        hostDescription = new HostDescription(GsisshHostType.type);
+        hostDescription.getType().setHostAddress("trestles.sdsc.edu");
+        hostDescription.getType().setHostName("gsissh-gordon");
+        ((GsisshHostType) hostDescription.getType()).setPort(22);
+        ((GsisshHostType)hostDescription.getType()).setInstalledPath("/opt/torque/bin/");
+    }
+
+    /**
+     * Submits an echo job, queues it on {@code pullQueue} and blocks until the monitor thread ends.
+     * @throws SSHApiException propagated from the GSI-SSH cluster calls
+     */
+    @Test
+    public void testQstatMonitor() throws SSHApiException {
+        /* now have to submit a job to some machine and add that job to the queue */
+        //Create authentication
+        GSIAuthenticationInfo authenticationInfo
+                = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org",
+                7512, 17280000, certificateLocation);
+
+        // Server info
+        ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress());
+
+        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/"));
+
+        // Execute command
+        System.out.println("Target PBS file path: " + workingDirectory);
+        // constructing the job object
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        jobDescriptor.setWorkingDirectory(workingDirectory);
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB");
+        jobDescriptor.setExecutablePath("/bin/echo");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setMailOptions("n");
+        jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out");
+        jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err");
+        jobDescriptor.setNodes(1);
+        jobDescriptor.setProcessesPerNode(1);
+        jobDescriptor.setQueueName("normal");
+        jobDescriptor.setMaxWallTime("60");
+        jobDescriptor.setAcountString("sds128");
+        List<String> inputs = new ArrayList<String>();
+        jobDescriptor.setOwner("ogce");
+        inputs.add("Hello World");
+        jobDescriptor.setInputValues(inputs);
+        //finished construction of job object
+        System.out.println(jobDescriptor.toXML());
+        for (int i = 0; i < 1; i++) {
+            String jobID = pbsCluster.submitBatchJob(jobDescriptor);
+            System.out.println("Job submitted successfully, Job ID: " + jobID);
+            MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null,null, "ogce");
+            monitorID.setAuthenticationInfo(authenticationInfo);
+            try {
+                org.apache.airavata.gfac.monitor.util.CommonUtils.addMonitortoQueue(pullQueue, monitorID);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            monitorThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the test runner
+        }
+    }
+
+    /** Event-bus callback: checks that the published status carries a state, then ends the monitor thread. */
+    @Subscribe
+    public void testCaseShutDown(JobStatusChangeRequest status) {
+        Assert.assertNotNull(status.getState());
+        // NOTE(review): Thread.stop() is deprecated; kept because it is not visible from here
+        // whether HPCPullMonitor.run() reacts to interrupt() - confirm and switch if it does.
+        monitorThread.stop();
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/test/resources/PBSTemplate.xslt
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/resources/PBSTemplate.xslt b/modules/gfac/gfac-monitor/src/test/resources/PBSTemplate.xslt
new file mode 100644
index 0000000..e749e9c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/resources/PBSTemplate.xslt
@@ -0,0 +1,73 @@
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+ the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
+ obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+ in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+ ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+ the License. -->
+<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:ns="http://airavata.apache.org/gsi/ssh/2012/12">
+<xsl:output method="text" />
+<xsl:template match="/ns:JobDescriptor">
+#! /bin/sh
+# PBS batch job script built by Globus job manager
+# <xsl:choose>
+ <xsl:when test="ns:shellName">
+##PBS -S <xsl:value-of select="ns:shellName"/>
+ </xsl:when></xsl:choose>
+ <xsl:choose>
+ <xsl:when test="ns:queueName">
+#PBS -q <xsl:value-of select="ns:queueName"/>
+ </xsl:when>
+ </xsl:choose>
+ <xsl:choose>
+ <xsl:when test="ns:mailOptions">
+#PBS -m <xsl:value-of select="ns:mailOptions"/>
+ </xsl:when>
+ </xsl:choose>
+ <xsl:choose>
+<xsl:when test="ns:acountString">
+#PBS -A <xsl:value-of select="ns:acountString"/>
+ </xsl:when>
+ </xsl:choose>
+ <xsl:choose>
+ <xsl:when test="ns:maxWallTime">
+#PBS -l walltime=<xsl:value-of select="ns:maxWallTime"/>
+ </xsl:when>
+ </xsl:choose>
+ <xsl:choose>
+ <xsl:when test="ns:standardOutFile">
+#PBS -o <xsl:value-of select="ns:standardOutFile"/>
+ </xsl:when>
+ </xsl:choose>
+ <xsl:choose><!-- the stderr directive is guarded by standardErrorFile, the element it prints -->
+ <xsl:when test="ns:standardErrorFile">
+#PBS -e <xsl:value-of select="ns:standardErrorFile"/>
+ </xsl:when>
+ </xsl:choose>
+ <xsl:choose>
+ <xsl:when test="(ns:nodes) and (ns:processesPerNode)">
+#PBS -l nodes=<xsl:value-of select="ns:nodes"/>:ppn=<xsl:value-of select="ns:processesPerNode"/>
+<xsl:text>
</xsl:text>
+ </xsl:when>
+ </xsl:choose>
+<xsl:for-each select="ns:exports/ns:name">
+<xsl:value-of select="."/>=<xsl:value-of select="./@value"/><xsl:text>
</xsl:text>
+export<xsl:text> </xsl:text><xsl:value-of select="."/>
+<xsl:text>
</xsl:text>
+</xsl:for-each>
+<xsl:for-each select="ns:preJobCommands/ns:command">
+ <xsl:value-of select="."/><xsl:text> </xsl:text>
+ </xsl:for-each>
+cd <xsl:text> </xsl:text><xsl:value-of select="ns:workingDirectory"/><xsl:text>
</xsl:text>
+ <xsl:choose><xsl:when test="ns:jobSubmitterCommand">
+<xsl:value-of select="ns:jobSubmitterCommand"/><xsl:text> </xsl:text></xsl:when></xsl:choose><xsl:value-of select="ns:executablePath"/><xsl:text> </xsl:text>
+<xsl:for-each select="ns:inputs/ns:input">
+ <xsl:value-of select="."/><xsl:text> </xsl:text>
+ </xsl:for-each>
+<xsl:for-each select="ns:postJobCommands/ns:command">
+ <xsl:value-of select="."/><xsl:text> </xsl:text>
+</xsl:for-each>
+
+</xsl:template>
+
+</xsl:stylesheet>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/test/resources/echo.bat
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/resources/echo.bat b/modules/gfac/gfac-monitor/src/test/resources/echo.bat
new file mode 100644
index 0000000..c6b849b
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/resources/echo.bat
@@ -0,0 +1,22 @@
+::
+::
+:: Licensed to the Apache Software Foundation (ASF) under one
+:: or more contributor license agreements. See the NOTICE file
+:: distributed with this work for additional information
+:: regarding copyright ownership. The ASF licenses this file
+:: to you under the Apache License, Version 2.0 (the
+:: "License"); you may not use this file except in compliance
+:: with the License. You may obtain a copy of the License at
+::
+:: http://www.apache.org/licenses/LICENSE-2.0
+::
+:: Unless required by applicable law or agreed to in writing,
+:: software distributed under the License is distributed on an
+:: "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+:: KIND, either express or implied. See the License for the
+:: specific language governing permissions and limitations
+:: under the License.
+::
+::
+@echo off
+echo %1^=%2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/resources/gfac-config.xml b/modules/gfac/gfac-monitor/src/test/resources/gfac-config.xml
new file mode 100644
index 0000000..da92462
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/resources/gfac-config.xml
@@ -0,0 +1,65 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF
+ licenses this file to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License. -->
+
+<GFac>
+ <DaemonHandlers>
+ <Handler class="org.apache.airavata.job.TestThreadedHandler">
+ </Handler>
+ </DaemonHandlers>
+ <GlobalHandlers>
+ <InHandlers>
+ <Handler class="org.apache.airavata.job.TestGlobalHandler">
+ <property name="name" value="value"/>
+ </Handler>
+ </InHandlers>
+ <OutHandlers></OutHandlers>
+ </GlobalHandlers>
+
+ <Application name="UltraScan" class="org.apache.airavata.job.TestProvider">
+ <InHandlers>
+ <Handler class="org.apache.airavata.job.TestInHandler"/>
+ </InHandlers>
+ <OutHandlers>
+ <Handler class="org.apache.airavata.job.TestOutHandler"/>
+ </OutHandlers>
+ </Application>
+
+
+ <Provider class="org.apache.airavata.job.TestProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl" executionMode="async">
+ <InHandlers>
+ <Handler class="org.apache.airavata.job.TestInHandler">
+ <property name="name1" value="value1"/>
+ </Handler>
+ <!--Handler class="org.apache.airavata.gfac.handler.AdvancedSCPInputHandler">
+ <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ <property name="userName" value="root"/>
+ <property name="hostName" value="gw98.iu.xsede.org"/>
+ <property name="inputPath" value="/tmp"/>
+ <property name="passPhrase" value="/home/airavata/outputData"/>
+ <property name="password" value="/home/airavata/outputData"/> <either we have to set password or keys, password has higher preference>
+ </Handler-->
+ </InHandlers>
+ <OutHandlers>
+ <Handler class="org.apache.airavata.job.TestOutHandler"/>
+ <!--Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
+ <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ <property name="userName" value="root"/>
+ <property name="hostName" value="gw111.iu.xsede.org"/>
+ <property name="outputPath" value="/tmp"/>
+ <property name="passPhrase" value="/home/airavata/outputData"/>
+ <property name="password" value="/home/airavata/outputData"/> <either we have to set password or keys, password has higher preference>
+ </Handler-->
+ </OutHandlers>
+ </Provider>
+</GFac>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/test/resources/logging.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/resources/logging.properties b/modules/gfac/gfac-monitor/src/test/resources/logging.properties
new file mode 100644
index 0000000..0584d38
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/test/resources/logging.properties
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+#default/fallback log4j configuration
+#
+
+# Set root logger level to INFO and attach two appenders: A1 (file) and A2 (console).
+log4j.rootLogger=INFO, A1, A2
+
+# A1 is set to be a rolling file appender with default params
+log4j.appender.A1=org.apache.log4j.RollingFileAppender
+log4j.appender.A1.File=target/seclogs.txt
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n
+
+# A2 is a console appender
+log4j.appender.A2=org.apache.log4j.ConsoleAppender
+
+# A2 uses PatternLayout.
+log4j.appender.A2.layout=org.apache.log4j.PatternLayout
+log4j.appender.A2.layout.ConversionPattern=%d [%t] %-5p %c{1} %x - %m%n
+
+log4j.logger.unicore.security=INFO
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
index 1fa6d86..13c325d 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
@@ -73,7 +73,7 @@ public class AdvancedSCPInputHandler extends AbstractHandler{
private String inputPath;
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
password = properties.get("password");
passPhrase = properties.get("passPhrase");
privateKeyPath = properties.get("privateKeyPath");
@@ -83,8 +83,10 @@ public class AdvancedSCPInputHandler extends AbstractHandler{
inputPath = properties.get("inputPath");
}
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ MessageContext inputNew = new MessageContext();
+ try{
+ if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
try {
GFACSSHUtils.addSecurityContext(jobExecutionContext);
} catch (ApplicationSettingsException e) {
@@ -92,21 +94,19 @@ public class AdvancedSCPInputHandler extends AbstractHandler{
throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
}
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
.getApplicationDeploymentDescription().getType();
- AuthenticationInfo authenticationInfo = null;
- if (password != null) {
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
} else {
authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
this.passPhrase);
}
- // Server info
- ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
- Cluster pbsCluster = null;
- MessageContext inputNew = new MessageContext();
- try {
+ // Server info
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ Cluster pbsCluster = null;
// here doesn't matter what the job manager is because we are only doing some file handling
// not really dealing with monitoring or job submission, so we pa
pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
index a0d3272..dba3e69 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.util.List;
import java.util.Map;
/**
@@ -72,7 +71,7 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
private String outputPath;
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
password = properties.get("password");
passPhrase = properties.get("passPhrase");
privateKeyPath = properties.get("privateKeyPath");
@@ -83,31 +82,32 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
}
@Override
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try {
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
}
- }
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- String standardError = app.getStandardError();
- String standardOutput = app.getStandardOutput();
- String outputDataDirectory = app.getOutputDataDirectory();
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ String standardError = app.getStandardError();
+ String standardOutput = app.getStandardOutput();
+ String outputDataDirectory = app.getOutputDataDirectory();
+
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+ // Server info
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
- AuthenticationInfo authenticationInfo = null;
- if (password != null) {
- authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
- } else {
- authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
- this.passPhrase);
- }
- // Server info
- ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
- try {
Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
+ File.separator;
@@ -120,7 +120,9 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
} catch (SSHApiException e) {
log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName);
log.error(e.getMessage());
- throw new GFacException(e);
+ throw new GFacHandlerException(e);
+ } catch (GFacException e) {
+ throw new GFacHandlerException(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
index 70358f9..1f99a98 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHDirectorySetupHandler.java
@@ -43,21 +43,25 @@ import org.slf4j.LoggerFactory;
public class SSHDirectorySetupHandler extends AbstractHandler{
private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacException {
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
+ /**
+ * Makes sure an SSH security context is registered on the job execution context,
+ * then runs the parent handler logic and sets up the job directories.
+ *
+ * @param jobExecutionContext context of the job being handled
+ * @throws GFacHandlerException if the SSH security context cannot be created or
+ * the directory setup fails
+ */
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try {
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ } catch (GFacException e) {
+ // Fail fast instead of swallowing: makeDirectory() below depends on the security context.
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
- log.info("Setup SSH job directorties");
- super.invoke(jobExecutionContext);
- makeDirectory(jobExecutionContext);
+
+ log.info("Setup SSH job directorties");
+ super.invoke(jobExecutionContext);
+ makeDirectory(jobExecutionContext);
}
- private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacException {
+ private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try{
Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
if (cluster == null) {
throw new GFacHandlerException("Security context is not set properly");
@@ -65,7 +69,6 @@ public class SSHDirectorySetupHandler extends AbstractHandler{
log.info("Successfully retrieved the Security Context");
}
ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- try {
String workingDirectory = app.getScratchWorkingDirectory();
cluster.makeDirectory(workingDirectory);
cluster.makeDirectory(app.getScratchWorkingDirectory());
@@ -88,7 +91,7 @@ public class SSHDirectorySetupHandler extends AbstractHandler{
detail.setTransferStatus(status);
try {
registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
} catch (Exception e1) {
throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
}
@@ -96,7 +99,7 @@ public class SSHDirectorySetupHandler extends AbstractHandler{
}
}
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
index 8da8253..5cc0ffb 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHInputHandler.java
@@ -57,23 +57,24 @@ public class SSHInputHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
- if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
- try {
- GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
- }
- }
- log.info("Invoking SCPInputHandler");
- super.invoke(jobExecutionContext);
-
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
DataTransferDetails detail = new DataTransferDetails();
TransferStatus status = new TransferStatus();
-
MessageContext inputNew = new MessageContext();
try {
+
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ log.info("Invoking SCPInputHandler");
+ super.invoke(jobExecutionContext);
+
+
MessageContext input = jobExecutionContext.getInMessageContext();
Set<String> parameters = input.getParameters().keySet();
for (String paramName : parameters) {
@@ -90,8 +91,8 @@ public class SSHInputHandler extends AbstractHandler {
status.setTransferState(TransferState.UPLOAD);
detail.setTransferStatus(status);
detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
- newFiles.add(stageInputFiles);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ newFiles.add(stageInputFiles);
}
((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
}
@@ -102,11 +103,11 @@ public class SSHInputHandler extends AbstractHandler {
status.setTransferState(TransferState.FAILED);
detail.setTransferStatus(status);
try {
- GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
- registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e1) {
- throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
- }
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
}
jobExecutionContext.setInMessageContext(inputNew);
@@ -134,7 +135,7 @@ public class SSHInputHandler extends AbstractHandler {
}
}
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
index 42cebba..9e1cfb2 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
public class SSHOutputHandler extends AbstractHandler{
private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
if (jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // this is because we don't have the right jobexecution context
// so attempting to get it from the registry
if (Constants.PUSH.equals(((GsisshHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode())) { // this is because we don't have the right jobexecution context
@@ -89,14 +89,20 @@ public class SSHOutputHandler extends AbstractHandler{
}
}
}
- if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
- try {
+
+ try {
+ if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+
GFACSSHUtils.addSecurityContext(jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage());
- throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
}
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ } catch (GFacException e1) {
+ log.error(e1.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e1, e1.getLocalizedMessage());
}
+
super.invoke(jobExecutionContext);
DataTransferDetails detail = new DataTransferDetails();
TransferStatus status = new TransferStatus();
@@ -197,7 +203,7 @@ public class SSHOutputHandler extends AbstractHandler{
status.setTransferState(TransferState.FAILED);
detail.setTransferStatus(status);
registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
- GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
} catch (Exception e1) {
throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
}
@@ -206,7 +212,7 @@ public class SSHOutputHandler extends AbstractHandler{
}
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
index da48ae5..e972f0d 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@ -42,6 +42,7 @@ import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
import org.apache.airavata.gfac.handler.GFacHandlerException;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.util.GFACSSHUtils;
import org.apache.airavata.gfac.utils.GFacUtils;
@@ -64,7 +65,7 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* Execute application using remote SSH
*/
-public class SSHProvider extends AbstractProvider{
+public class SSHProvider extends AbstractProvider {
private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
private Cluster cluster;
private String jobID = null;
@@ -92,10 +93,10 @@ public class SSHProvider extends AbstractProvider{
details.setJobID(taskID);
details.setJobDescription(remoteFile);
jobExecutionContext.setJobDetails(details);
- JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, null);
+ JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
details.setJobDescription(jobDescriptor.toXML());
- GFacUtils.saveJobStatus(details, JobState.SETUP, taskID);
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
log.info(remoteFile);
try {
File runscript = createShellScript(jobExecutionContext);
@@ -161,7 +162,7 @@ public class SSHProvider extends AbstractProvider{
log.info("Successfully retrieved the Security Context");
}
// This installed path is a mandetory field, because this could change based on the computing resource
- JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+ JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
log.info(jobDescriptor.toXML());
@@ -171,25 +172,25 @@ public class SSHProvider extends AbstractProvider{
jobExecutionContext.setJobDetails(jobDetails);
if (jobID == null) {
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobDetails, JobState.FAILED, taskID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
} else {
jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobDetails, JobState.SUBMITTED, taskID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
}
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobDetails, JobState.FAILED, taskID);
- GFacUtils.saveErrorDetails(error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR, taskID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} catch (Exception e) {
String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobDetails, JobState.FAILED, taskID);
- GFacUtils.saveErrorDetails(error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR, taskID);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
}
} catch (GFacException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/util/GFACSSHUtils.java
index 29def62..9146bfd 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/util/GFACSSHUtils.java
@@ -22,25 +22,32 @@ package org.apache.airavata.gfac.util;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.RequestData;
import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.schemas.gfac.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Properties;
+import java.util.*;
public class GFACSSHUtils {
private final static Logger logger = LoggerFactory.getLogger(GFACSSHUtils.class);
@@ -84,5 +91,90 @@ public class GFACSSHUtils {
jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, sshSecurityContext);
}
}
+    /**
+     * Builds a {@link JobDescriptor} from the application deployment description and the
+     * input parameters held in the job execution context.
+     * <p>
+     * Directories, executable, stdout/stderr files, job name and input values are always
+     * populated. When {@code app} is an {@link HpcApplicationDeploymentType} the scheduler
+     * related fields (nodes, processes per node, wall time, queue, account, owner) are filled
+     * in as well, and any values present in the task's scheduling data override the
+     * deployment defaults.
+     *
+     * @param jobExecutionContext context supplying the input message context and task data
+     * @param app                 application deployment description to read settings from
+     * @param cluster             cluster used to resolve the job owner; may be {@code null}
+     *                            (the owner is then left unset)
+     * @return the populated job descriptor
+     */
+    public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext,
+                                                    ApplicationDeploymentDescriptionType app, Cluster cluster) {
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        // this is common for any application descriptor
+        jobDescriptor.setInputDirectory(app.getInputDataDirectory());
+        jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
+        jobDescriptor.setExecutablePath(app.getExecutableLocation());
+        jobDescriptor.setStandardOutFile(app.getStandardOutput());
+        jobDescriptor.setStandardErrorFile(app.getStandardError());
+        // a random suffix is appended to the application name to form the job name
+        Random random = new Random();
+        int i = random.nextInt();
+        jobDescriptor.setJobName(app.getApplicationName().getStringValue() + String.valueOf(i));
+        jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory());
+
+
+        List<String> inputValues = new ArrayList<String>();
+        MessageContext input = jobExecutionContext.getInMessageContext();
+        Map<String, Object> inputs = input.getParameters();
+        Set<String> keys = inputs.keySet();
+        for (String paramName : keys) {
+            ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+            if ("URIArray".equals(actualParameter.getType().getType().toString()) || "StringArray".equals(actualParameter.getType().getType().toString())
+                    || "FileArray".equals(actualParameter.getType().getType().toString())) {
+                // array-typed parameters are flattened into one space-delimited argument
+                String[] values = null;
+                if (actualParameter.getType() instanceof URIArrayType) {
+                    values = ((URIArrayType) actualParameter.getType()).getValueArray();
+                } else if (actualParameter.getType() instanceof StringArrayType) {
+                    values = ((StringArrayType) actualParameter.getType()).getValueArray();
+                } else if (actualParameter.getType() instanceof FileArrayType) {
+                    values = ((FileArrayType) actualParameter.getType()).getValueArray();
+                }
+                String value = StringUtil.createDelimiteredString(values, " ");
+                inputValues.add(value);
+            } else {
+                String paramValue = MappingFactory.toString(actualParameter);
+                inputValues.add(paramValue);
+            }
+        }
+        jobDescriptor.setInputValues(inputValues);
+
+        // this part will fill out the hpcApplicationDescriptor
+        if (app instanceof HpcApplicationDeploymentType) {
+            HpcApplicationDeploymentType applicationDeploymentType
+                    = (HpcApplicationDeploymentType) app;
+            jobDescriptor.setShellName("/bin/bash");
+            jobDescriptor.setAllEnvExport(true);
+            jobDescriptor.setMailOptions("n");
+            jobDescriptor.setNodes(applicationDeploymentType.getNodeCount());
+            jobDescriptor.setProcessesPerNode(applicationDeploymentType.getProcessorsPerNode());
+            jobDescriptor.setMaxWallTime(String.valueOf(applicationDeploymentType.getMaxWallTime()));
+            jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
+            if (applicationDeploymentType.getProjectAccount() != null) {
+                if (applicationDeploymentType.getProjectAccount().getProjectAccountNumber() != null) {
+                    jobDescriptor.setAcountString(applicationDeploymentType.getProjectAccount().getProjectAccountNumber());
+                }
+            }
+            if (applicationDeploymentType.getQueue() != null) {
+                if (applicationDeploymentType.getQueue().getQueueName() != null) {
+                    jobDescriptor.setQueueName(applicationDeploymentType.getQueue().getQueueName());
+                }
+            }
+            // Callers may pass a null cluster (SSHProvider does for plain script execution), and the
+            // owner is only reachable through PBSCluster; guard so neither case throws here.
+            if (cluster instanceof PBSCluster) {
+                jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
+            }
+            // scheduling values attached to the task take precedence over the deployment defaults
+            TaskDetails taskData = jobExecutionContext.getTaskData();
+            if (taskData != null && taskData.isSetTaskScheduling()) {
+                ComputationalResourceScheduling computionnalResource = taskData.getTaskScheduling();
+                if (computionnalResource.getNodeCount() > 0) {
+                    jobDescriptor.setNodes(computionnalResource.getNodeCount());
+                }
+                if (computionnalResource.getComputationalProjectAccount() != null) {
+                    jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount());
+                }
+                if (computionnalResource.getQueueName() != null) {
+                    jobDescriptor.setQueueName(computionnalResource.getQueueName());
+                }
+                if (computionnalResource.getTotalCPUCount() > 0) {
+                    jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount());
+                }
+                if (computionnalResource.getWallTimeLimit() > 0) {
+                    jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit()));
+                }
+            }
+
+        }
+        return jobDescriptor;
+    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/pom.xml b/modules/gfac/pom.xml
index 1d6f0d1..9e1dcc0 100644
--- a/modules/gfac/pom.xml
+++ b/modules/gfac/pom.xml
@@ -38,6 +38,7 @@
<module>gfac-gram</module>
<module>gfac-gsissh</module>
<module>gfac-bes</module>
+ <module>gfac-monitor</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index c3491be..86c0c5e 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -24,9 +24,6 @@ package org.apache.airavata.orchestrator.server;
import java.util.Arrays;
import java.util.List;
-import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.MonitorManager;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
@@ -46,14 +43,10 @@ import org.slf4j.LoggerFactory;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
- private MonitorManager monitorManager = null;
-
private SimpleOrchestratorImpl orchestrator = null;
private Registry registry;
- GSIAuthenticationInfo authenticationInfo = null;
-
/**
* Query orchestrator server to fetch the CPI version
*/
@@ -90,7 +83,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
//TODO: Write the Orchestrator implementaion
try {
List<TaskDetails> tasks = orchestrator.createTasks(experimentId);
- MonitorID monitorID = null;
if (tasks.size() > 1) {
log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs");
}
@@ -115,15 +107,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
return true;
}
-
- public MonitorManager getMonitorManager() {
- return monitorManager;
- }
-
- public void setMonitorManager(MonitorManager monitorManager) {
- this.monitorManager = monitorManager;
- }
-
public boolean terminateExperiment(String experimentId) throws TException {
try {
orchestrator.cancelExperiment(experimentId);
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index fad5e58..eb3f463 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -23,7 +23,6 @@ package org.apache.airavata.orchestrator.core.context;
import java.util.ArrayList;
import java.util.List;
-import org.apache.airavata.gfac.monitor.MonitorManager;
import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.registry.api.AiravataRegistry2;
@@ -41,8 +40,6 @@ public class OrchestratorContext {
private Registry newRegistry;
- private MonitorManager monitorManager;
-
public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
this.gfacInstanceList = new ArrayList<GFACInstance>();
}
@@ -81,12 +78,4 @@ public class OrchestratorContext {
public void setNewRegistry(Registry newRegistry) {
this.newRegistry = newRegistry;
}
-
- public MonitorManager getMonitorManager() {
- return monitorManager;
- }
-
- public void setMonitorManager(MonitorManager monitorManager) {
- this.monitorManager = monitorManager;
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index 53b9206..2c41ef7 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -48,9 +48,6 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
this.orchestratorContext = orchestratorContext;
gfac = new GFacImpl(orchestratorContext.getNewRegistry(), null, orchestratorContext.getRegistry());
- if (orchestratorContext.getMonitorManager()!=null) {
- orchestratorContext.getMonitorManager().registerListener(gfac);
- }
}
public GFACInstance selectGFACInstance() throws OrchestratorException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 6e5ff2f..91a6e47 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -21,49 +21,28 @@
package org.apache.airavata.orchestrator.cpi.impl;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.commons.gfac.type.HostDescription;
+
import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.gfac.monitor.AbstractActivityListener;
-import org.apache.airavata.gfac.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.MonitorManager;
-import org.apache.airavata.gfac.monitor.command.ExperimentCancelRequest;
-import org.apache.airavata.gfac.monitor.core.Monitor;
-import org.apache.airavata.gfac.monitor.core.PullMonitor;
-import org.apache.airavata.gfac.monitor.core.PushMonitor;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.impl.LocalJobMonitor;
-import org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor;
-import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator;
-import org.apache.airavata.persistance.registry.jpa.model.WorkflowNodeDetail;
import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
-public class SimpleOrchestratorImpl extends AbstractOrchestrator implements AbstractActivityListener{
+import javax.naming.OperationNotSupportedException;
+
+public class SimpleOrchestratorImpl extends AbstractOrchestrator{
private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
private ExecutorService executor;
@@ -72,9 +51,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
private JobMetadataValidator jobMetadataValidator = null;
- private MonitorManager monitorManager = null;
- private AuthenticationInfo authenticationInfo = null;
public SimpleOrchestratorImpl() throws OrchestratorException {
try {
@@ -83,7 +60,6 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
jobSubmitter = aClass.newInstance();
jobSubmitter.initialize(this.orchestratorContext);
- monitorManager = new MonitorManager();
String validatorClzz = this.orchestratorContext.getOrchestratorConfiguration().getValidatorClass();
if (this.orchestratorConfiguration.isEnableValidation()) {
if (validatorClzz == null) {
@@ -106,52 +82,17 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
public String launchExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task) throws OrchestratorException {
// we give higher priority to userExperimentID
//todo support multiple validators
- String jobID = null;
String experimentId = experiment.getExperimentID();
String taskId = task.getTaskID();
- String workflowNodeId = workflowNode.getNodeInstanceId();
- String userName = experiment.getUserName();
- // creating monitorID to register with monitoring queue
- // this is a special case because amqp has to be in place before submitting the job
- HostDescription hostDescription = OrchestratorUtils.getHostDescription(this, task);
-
// creating monitorID to register with monitoring queue
// this is a special case because amqp has to be in place before submitting the job
try {
- if ((hostDescription instanceof GsisshHostType) &&
- Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
- MonitorID monitorID = new MonitorID(hostDescription, null, taskId, workflowNodeId, experimentId, userName);
- monitorManager.addAJobToMonitor(monitorID);
- jobSubmitter.submit(experimentId, taskId); // even this get returns we cannot use this because subscription has to be done early
- if ("none".equals(jobID)) {
- logger.error("Job submission Failed, so we remove the job from monitoring");
-
- } else {
- logger.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
- }
- } else {
- // Launching job for each task
- // if the monitoring is pull mode then we add the monitorID for each task after submitting
- // the job with the jobID, otherwise we don't need the jobID
- JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
- jobExecutionContext.setTaskData(task);
- jobID = jobExecutionContext.getJobDetails().getJobID();
-
- logger.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
- MonitorID monitorID = new MonitorID(hostDescription, jobID, taskId, workflowNodeId, experimentId, userName, authenticationInfo);
- monitorID.setJobExecutionContext(jobExecutionContext);
- if ("none".equals(jobID)) {
- logger.error("Job submission Failed, so we remove the job from monitoring");
-
- } else {
- monitorManager.addAJobToMonitor(monitorID);
- }
- }
+ JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
+ return jobExecutionContext.getJobDetails().getJobID();
} catch (Exception e) {
throw new OrchestratorException("Error launching the job", e);
}
- return jobID;
}
/**
@@ -182,28 +123,12 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
return tasks;
}
- @Override
public void cancelExperiment(String experimentID)
throws OrchestratorException {
- orchestratorContext.getMonitorManager().getMonitorPublisher().publish(new ExperimentCancelRequest(experimentID));
- }
-
- @Subscribe
- public void handlePostExperimentTask(JobStatusChangeRequest status) throws OrchestratorException {
- if(status.getState() == JobState.COMPLETE){
- MonitorID monitorID = status.getMonitorID();
- if(monitorID.getJobExecutionContext() == null){
- // this code is to handle amqp scenario where monitorID doesn't have
- // job execution context, in this case it will be created by the outputhandler
- String experimentID = monitorID.getExperimentID();
- String taskID = monitorID.getTaskID();
- JobExecutionContext jobExecutionContext = new JobExecutionContext(null, null);
- jobExecutionContext.setExperimentID(experimentID);
- jobExecutionContext.setTaskData(new TaskDetails(taskID));
- }
- jobSubmitter.runAfterJobTask(monitorID.getJobExecutionContext());
- }
+ throw new OrchestratorException(new OperationNotSupportedException());
}
+
+
public ExecutorService getExecutor() {
return executor;
}
@@ -228,83 +153,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
this.jobSubmitter = jobSubmitter;
}
- public AuthenticationInfo getAuthenticationInfo() {
- return authenticationInfo;
- }
-
- public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
- this.authenticationInfo = authenticationInfo;
- }
-
- @Override
- public void setup(Object... configurations) {
- for (Object config : configurations) {
- if (config instanceof MonitorManager){
- orchestratorContext.setMonitorManager((MonitorManager)config);
- try {
- getJobSubmitter().initialize(orchestratorContext);
- } catch (OrchestratorException e) {
- logger.error("Error reinitializing the job submitter!!!",e);
- }
- }
- }
- }
-
public void initialize() throws OrchestratorException {
- // Filling monitorManager properties
- // we can keep a single user to do all the monitoring authentication for required machine..
- try{
- String myProxyUser = ServerSettings.getSetting("myproxy.username");
- String myProxyPass = ServerSettings.getSetting("myproxy.password");
- String certPath = ServerSettings.getSetting("trusted.cert.location");
- String myProxyServer = ServerSettings.getSetting("myproxy.server");
- setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
- 7512, 17280000, certPath));
-
- // loading Monitor configuration
- String monitors = ServerSettings.getSetting("monitors");
- if(monitors == null) {
- logger.error("No Monitor is configured, so job monitoring will not monitor any job");
- return;
- }
- List<String> monitorList = Arrays.asList(monitors.split(","));
- List<String> list = Arrays.asList(ServerSettings.getSetting("amqp.hosts").split(","));
- String proxyPath = ServerSettings.getSetting("proxy.file.path");
- String connectionName = ServerSettings.getSetting("connection.name");
-
- for (String monitorClass : monitorList) {
- Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class);
- Monitor monitor = aClass.newInstance();
- if (monitor instanceof PullMonitor) {
- if (monitor instanceof QstatMonitor) {
- monitorManager.addQstatMonitor((QstatMonitor) monitor);
- }
- } else if (monitor instanceof PushMonitor) {
- if (monitor instanceof AMQPMonitor) {
- ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
- monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
- }
- } else if(monitor instanceof LocalJobMonitor){
- monitorManager.addLocalMonitor((LocalJobMonitor)monitor);
- } else {
- logger.error("Wrong class is given to primary Monitor");
- }
- }
- monitorManager.registerListener(this);
- // Now Monitor Manager is properly configured, now we have to start the monitoring system.
- // This will initialize all the required threads and required queues
- monitorManager.launchMonitor();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } catch (InstantiationException e) {
- e.printStackTrace();
- } catch (IllegalAccessException e) {
- e.printStackTrace();
- } catch (AiravataMonitorException e) {
- e.printStackTrace();
- } catch (ApplicationSettingsException e) {
- e.printStackTrace();
- }
}
}