You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/22 22:42:21 UTC
[2/4] moving ssh implementation out of gfac-core
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/context/security/SSHSecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/context/security/SSHSecurityContext.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/context/security/SSHSecurityContext.java
new file mode 100644
index 0000000..2d71b7d
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/context/security/SSHSecurityContext.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.context.security;
+
+import java.io.IOException;
+
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
+
+import org.apache.airavata.gfac.SecurityContext;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handle SSH security
+ */
+public class SSHSecurityContext implements SecurityContext {
+ private static final Logger log = LoggerFactory.getLogger(SSHSecurityContext.class);
+
+ public static final String SSH_SECURITY_CONTEXT = "ssh";
+
+ private String username;
+ private String privateKeyLoc;
+ private String keyPass;
+ private SSHClient sshClient;
+ private Session session;
+
+ private Cluster pbsCluster;
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPrivateKeyLoc() {
+ return privateKeyLoc;
+ }
+
+ public void setPrivateKeyLoc(String privateKeyLoc) {
+ this.privateKeyLoc = privateKeyLoc;
+ }
+
+ public String getKeyPass() {
+ return keyPass;
+ }
+
+ public void setKeyPass(String keyPass) {
+ this.keyPass = keyPass;
+ }
+
+ public void closeSession(Session session) {
+ if (session != null) {
+ try {
+ session.close();
+ } catch (Exception e) {
+ log.warn("Cannot Close SSH Session");
+ }
+ }
+ }
+
+ public Session getSession(String hostAddress) throws IOException {
+ try {
+ if (sshClient == null) {
+ sshClient = new SSHClient();
+ }
+ if (getSSHClient().isConnected())
+ return getSSHClient().startSession();
+
+ KeyProvider pkey = getSSHClient().loadKeys(getPrivateKeyLoc(), getKeyPass());
+
+ getSSHClient().loadKnownHosts();
+
+ getSSHClient().connect(hostAddress);
+ getSSHClient().authPublickey(getUsername(), pkey);
+ session = getSSHClient().startSession();
+ return session;
+
+ } catch (NullPointerException ne) {
+ throw new SecurityException("Cannot load security context for SSH", ne);
+ }
+ }
+
+ public SSHClient getSSHClient() {
+ if (sshClient == null) {
+ sshClient = new SSHClient();
+ }
+ return sshClient;
+ }
+
+ public void setPbsCluster(Cluster pbsCluster) {
+ this.pbsCluster = pbsCluster;
+ }
+
+ public Cluster getPbsCluster() {
+ return this.pbsCluster;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
new file mode 100644
index 0000000..361bac7
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPInputHandler.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * This handler will copy input data from gateway machine to airavata
+ * installed machine, later running handlers can copy the input files to computing resource
+ * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
+ <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ <property name="userName" value="airavata"/>
+ <property name="hostName" value="gw98.iu.xsede.org"/>
+ <property name="inputPath" value="/home/airavata/outputData"/>
+ */
+public class AdvancedSCPInputHandler extends AbstractHandler{
+ private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
+
+ private String password = null;
+
+ private String publicKeyPath;
+
+ private String passPhrase;
+
+ private String privateKeyPath;
+
+ private String userName;
+
+ private String hostName;
+
+ private String inputPath;
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ password = properties.get("password");
+ passPhrase = properties.get("passPhrase");
+ privateKeyPath = properties.get("privateKeyPath");
+ publicKeyPath = properties.get("publicKeyPath");
+ userName = properties.get("userName");
+ hostName = properties.get("hostName");
+ inputPath = properties.get("inputPath");
+ }
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+ // Server info
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ Cluster pbsCluster = null;
+ MessageContext inputNew = new MessageContext();
+ try {
+ // here doesn't matter what the job manager is because we are only doing some file handling
+ // not really dealing with monitoring or job submission, so we pa
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+ String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
+ (new File(parentPath)).mkdirs();
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ //TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(pbsCluster, paramValue, parentPath));
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+ newFiles.add(stageInputFiles);
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
+ try {
+ cluster.scpFrom(paramValue, parentPath);
+ return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
+ } catch (SSHApiException e) {
+ log.error("Error tranfering remote file to local file, remote path: " + paramValue);
+ throw new GFacException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
new file mode 100644
index 0000000..080d3c7
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/AdvancedSCPOutputHandler.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This handler will copy outputs from airavata installed local directory
+ * to a remote location, prior to this handler SCPOutputHandler should be invoked
+ * Should add following configuration to gfac-config.xml and configure the keys properly
+ * <Handler class="org.apache.airavata.gfac.handler.AdvancedSCPOutputHandler">
+ <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ <property name="userName" value="airavata"/>
+ <property name="hostName" value="gw98.iu.xsede.org"/>
+ <property name="outputPath" value="/home/airavata/outputData"/>
+ <property name="passPhrase" value="/home/airavata/outputData"/>
+ <property name="password" value="/home/airavata/outputData"/>
+
+ */
+public class AdvancedSCPOutputHandler extends AbstractHandler {
+ private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
+
+ private String password = null;
+
+ private String publicKeyPath;
+
+ private String passPhrase;
+
+ private String privateKeyPath;
+
+ private String userName;
+
+ private String hostName;
+
+ private String outputPath;
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ password = properties.get("password");
+ passPhrase = properties.get("passPhrase");
+ privateKeyPath = properties.get("privateKeyPath");
+ publicKeyPath = properties.get("publicKeyPath");
+ userName = properties.get("userName");
+ hostName = properties.get("hostName");
+ outputPath = properties.get("outputPath");
+ }
+
+ @Override
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ String standardError = app.getStandardError();
+ String standardOutput = app.getStandardOutput();
+ String outputDataDirectory = app.getOutputDataDirectory();
+
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+ // Server info
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ try {
+ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+ outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
+ + File.separator;
+ pbsCluster.makeDirectory(outputPath);
+ pbsCluster.scpTo(outputPath, standardError);
+ pbsCluster.scpTo(outputPath,standardOutput);
+ for(String files:jobExecutionContext.getOutputFiles()){
+ pbsCluster.scpTo(outputPath,files);
+ }
+ } catch (SSHApiException e) {
+ log.error("Error transfering files to remote host : " + hostName + " with the user: " + userName);
+ log.error(e.getMessage());
+ throw new GFacException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
new file mode 100644
index 0000000..a068b77
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPDirectorySetupHandler.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SCPDirectorySetupHandler extends AbstractHandler{
+ private static final Logger log = LoggerFactory.getLogger(SCPDirectorySetupHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacException {
+ log.info("Setup SSH job directorties");
+ super.invoke(jobExecutionContext);
+ makeDirectory(jobExecutionContext);
+
+ }
+ private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacException {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ } else {
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacHandlerException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ try {
+ String workingDirectory = app.getScratchWorkingDirectory();
+ cluster.makeDirectory(workingDirectory);
+ cluster.makeDirectory(app.getScratchWorkingDirectory());
+ cluster.makeDirectory(app.getInputDataDirectory());
+ cluster.makeDirectory(app.getOutputDataDirectory());
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.DIRECTORY_SETUP);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Working directory = " + workingDirectory);
+
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error executing the Handler: " + SCPDirectorySetupHandler.class,e);
+ }catch (Exception e){
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error executing the Handler: " + SCPDirectorySetupHandler.class,e);
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
new file mode 100644
index 0000000..4bf352b
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.ErrorDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SCPInputHandler extends AbstractHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(SCPInputHandler.class);
+
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+
+ log.info("Invoking SCPInputHandler");
+ super.invoke(jobExecutionContext);
+
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
+ MessageContext inputNew = new MessageContext();
+ try {
+ MessageContext input = jobExecutionContext.getInMessageContext();
+ Set<String> parameters = input.getParameters().keySet();
+ for (String paramName : parameters) {
+ ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+ String paramValue = MappingFactory.toString(actualParameter);
+ //TODO: Review this with type
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+ ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+ } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+ List<String> newFiles = new ArrayList<String>();
+ for (String paramValueEach : split) {
+ String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+ status.setTransferState(TransferState.UPLOAD);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ newFiles.add(stageInputFiles);
+ }
+ ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+ }
+ inputNew.getParameters().put(paramName, actualParameter);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ try {
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ jobExecutionContext.setInMessageContext(inputNew);
+ }
+
+ private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ } else {
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ int i = paramValue.lastIndexOf(File.separator);
+ String substring = paramValue.substring(i + 1);
+ try {
+ String targetFile = app.getInputDataDirectory() + File.separator + substring;
+ if(paramValue.startsWith("file")){
+ paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+ }
+ cluster.scpTo(targetFile, paramValue);
+ return targetFile;
+ } catch (SSHApiException e) {
+ throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
new file mode 100644
index 0000000..10287a5
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.schmizz.sshj.connection.ConnectionException;
+import net.schmizz.sshj.transport.TransportException;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.util.SSHUtils;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.persistance.registry.jpa.model.DataTransferDetail;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SCPOutputHandler extends AbstractHandler{
+ private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class);
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+
+ super.invoke(jobExecutionContext);
+ DataTransferDetails detail = new DataTransferDetails();
+ TransferStatus status = new TransferStatus();
+
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ try {
+ Cluster cluster = null;
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+ cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+ } else {
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+ }
+ if (cluster == null) {
+ throw new GFacProviderException("Security context is not set properly");
+ } else {
+ log.info("Successfully retrieved the Security Context");
+ }
+
+ // Get the Stdouts and StdErrs
+ String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
+
+ TaskDetails taskData = jobExecutionContext.getTaskData();
+ String outputDataDir = null;
+ File localStdOutFile;
+ File localStdErrFile;
+
+ if (taskData.getAdvancedOutputDataHandling() != null) {
+ outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+ }
+ if(outputDataDir == null) {
+ outputDataDir = File.separator + "tmp";
+ }
+ outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" +jobExecutionContext.getTaskData().getTaskID();
+ (new File(outputDataDir)).mkdirs();
+
+
+ localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout");
+ localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr");
+// cluster.makeDirectory(outputDataDir);
+ cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
+ Thread.sleep(1000);
+ cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
+ Thread.sleep(1000);
+
+ String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+ String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDOUT:" + stdOutStr);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+ status.setTransferState(TransferState.COMPLETE);
+ detail.setTransferStatus(status);
+ detail.setTransferDescription("STDERR:" + stdErrStr);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+
+ Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ Set<String> keys = output.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+ if ("URI".equals(actualParameter.getType().getType().toString())) {
+
+ List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
+ if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
+ stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ } else {
+ String valueList = outputList.get(0);
+ cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
+ jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
+ ((URIParameterType) actualParameter.getType()).setValue(valueList);
+ stringMap = new HashMap<String, ActualParameter>();
+ stringMap.put(paramName, actualParameter);
+ }
+ }else{
+ stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ }
+ }
+ if (stringMap == null || stringMap.isEmpty()) {
+ throw new GFacHandlerException(
+ "Empty Output returned from the Application, Double check the application"
+ + "and ApplicationDescriptor output Parameter Names");
+ }
+ status.setTransferState(TransferState.DOWNLOAD);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+ app.setStandardError(localStdErrFile.getAbsolutePath());
+ app.setStandardOutput(localStdOutFile.getAbsolutePath());
+ app.setOutputDataDirectory(outputDataDir);
+ } catch (XmlException e) {
+ throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
+ } catch (ConnectionException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (TransportException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (IOException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (Exception e) {
+ try {
+ status.setTransferState(TransferState.FAILED);
+ detail.setTransferStatus(status);
+ registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e1) {
+ throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error in retrieving results", e);
+ }
+
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
new file mode 100644
index 0000000..5914dd3
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.provider.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.CommandExecutor;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
+import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * Execute application using remote SSH
+ */
+public class SSHProvider extends AbstractProvider implements GFacProvider{
+ private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
+ private Cluster cluster;
+ private String jobID = null;
+ private String taskID = null;
+ // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
+ private GSISSHProvider gsiSshProvider = null;
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ super.initialize(jobExecutionContext);
+ taskID = jobExecutionContext.getTaskData().getTaskID();
+ if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+ jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
+ cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ details.setJobID(taskID);
+ details.setJobDescription(remoteFile);
+ jobExecutionContext.setJobDetails(details);
+ JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, null);
+ details.setJobDescription(jobDescriptor.toXML());
+
+ GFacUtils.saveJobStatus(details, JobState.SETUP, taskID);
+ log.info(remoteFile);
+ try {
+ File runscript = createShellScript(jobExecutionContext);
+ cluster.scpTo(remoteFile, runscript.getAbsolutePath());
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+ }else{
+ gsiSshProvider = new GSISSHProvider();
+ }
+ }
+
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ if (gsiSshProvider == null) {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ try {
+ /*
+ * Execute
+ */
+ String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ details.setJobDescription(execuable);
+
+// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
+ RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
+
+ StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
+
+ CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
+ String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
+
+ log.info("stdout=" + stdOutputString);
+
+// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ } finally {
+ if (cluster != null) {
+ try {
+ cluster.disconnect();
+ } catch (SSHApiException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+ } else {
+ try {
+ gsiSshProvider.execute(jobExecutionContext);
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ if (gsiSshProvider != null){
+ try {
+ gsiSshProvider.dispose(jobExecutionContext);
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(),e);
+ }
+ }
+ }
+
+
+ public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+
+ private File createShellScript(JobExecutionContext context) throws IOException {
+ ApplicationDeploymentDescriptionType app = context.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
+ + new Random().nextLong();
+
+ File shellScript = File.createTempFile(uniqueDir, "sh");
+ OutputStream out = new FileOutputStream(shellScript);
+
+ out.write("#!/bin/bash\n".getBytes());
+ out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
+ .getBytes());
+ // get the env of the host and the application
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+ Map<String, String> nv = new HashMap<String, String>();
+ if (env != null) {
+ for (int i = 0; i < env.length; i++) {
+ String key = env[i].getName();
+ String value = env[i].getValue();
+ nv.put(key, value);
+ }
+ }
+ for (Entry<String, String> entry : nv.entrySet()) {
+ log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
+ out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+
+ }
+
+ // prepare the command
+ final String SPACE = " ";
+ StringBuffer cmd = new StringBuffer();
+ cmd.append(app.getExecutableLocation());
+ cmd.append(SPACE);
+
+ MessageContext input = context.getInMessageContext();
+ ;
+ Map<String, Object> inputs = input.getParameters();
+ Set<String> keys = inputs.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+ for (String value : values) {
+ cmd.append(value);
+ cmd.append(SPACE);
+ }
+ } else {
+ String paramValue = MappingFactory.toString(actualParameter);
+ cmd.append(paramValue);
+ cmd.append(SPACE);
+ }
+ }
+ // We redirect the error and stdout to remote files, they will be read
+ // in later
+ cmd.append(SPACE);
+ cmd.append("1>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardOutput());
+ cmd.append(SPACE);
+ cmd.append("2>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardError());
+
+ String cmdStr = cmd.toString();
+ log.info("Command = " + cmdStr);
+ out.write((cmdStr + "\n").getBytes());
+ String message = "\"execuationSuceeded\"";
+ out.write(("echo " + message + "\n").getBytes());
+ out.close();
+
+ return shellScript;
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+ if (gsiSshProvider != null){
+ try {
+ initProperties(properties);
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(),e);
+ }
+ }
+ }
+ /**
+ * This method will read standard output and if there's any it will be parsed
+ * @param jobIDReaderCommandOutput
+ * @param errorMsg
+ * @return
+ * @throws SSHApiException
+ */
+ private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
+ String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
+ String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
+
+ if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
+ log.error("Standard Error output : " + stdErrorString);
+ throw new SSHApiException(errorMsg + stdErrorString);
+ }
+ return stdOutputString;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/resources/errors.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/resources/errors.properties b/modules/gfac/gfac-ssh/src/main/resources/errors.properties
new file mode 100644
index 0000000..88c41b8
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/resources/errors.properties
@@ -0,0 +1,197 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Directly copied from jglobus. Not a good way to manager error properties.
+1 = Parameter not supported
+2 = The RSL length is greater than the maximum allowed
+3 = No resources available
+4 = Bad directory specified
+5 = The executable does not exist
+6 = Insufficient funds
+7 = Authentication with the remote server failed
+8 = Job cancelled by user
+9 = Job cancelled by system
+
+10 = Data transfer to the server failed
+11 = The stdin file does not exist
+12 = The connection to the server failed (check host and port)
+13 = The provided RSL 'maxtime' value is invalid (not an integer or must be greater than 0)
+14 = The provided RSL 'count' value is invalid (not an integer or must be greater than 0)
+15 = The job manager received an invalid RSL
+16 = Could not connect to job manager
+17 = The job failed when the job manager attempted to run it
+18 = Paradyn error
+19 = The provided RSL 'jobtype' value is invalid
+
+20 = The provided RSL 'myjob' value is invalid
+21 = The job manager failed to locate an internal script argument file
+22 = The job manager failed to create an internal script argument file
+23 = The job manager detected an invalid job state
+24 = The job manager detected an invalid script response
+25 = The job manager detected an invalid job state
+26 = The provided RSL 'jobtype' value is not supported by this job manager
+27 = Unimplemented
+28 = The job manager failed to create an internal script submission file
+29 = The job manager cannot find the user proxy
+
+30 = The job manager failed to open the user proxy
+31 = The job manager failed to cancel the job as requested
+32 = System memory allocation failed
+33 = The interprocess job communication initialization failed
+34 = The interprocess job communication setup failed
+35 = The provided RSL 'host count' value is invalid
+36 = One of the provided RSL parameters is unsupported
+37 = The provided RSL 'queue' parameter is invalid
+38 = The provided RSL 'project' parameter is invalid
+39 = The provided RSL string includes variables that could not be identified
+
+40 = The provided RSL 'environment' parameter is invalid
+41 = The provided RSL 'dryrun' parameter is invalid
+42 = The provided RSL is invalid (an empty string)
+43 = The job manager failed to stage the executable
+44 = The job manager failed to stage the stdin file
+45 = The requested job manager type is invalid
+46 = The provided RSL 'arguments' parameter is invalid
+47 = The gatekeeper failed to run the job manager
+48 = The provided RSL could not be properly parsed
+49 = There is a version mismatch between GRAM components
+
+50 = The provided RSL 'arguments' parameter is invalid
+51 = The provided RSL 'count' parameter is invalid
+52 = The provided RSL 'directory' parameter is invalid
+53 = The provided RSL 'dryrun' parameter is invalid
+54 = The provided RSL 'environment' parameter is invalid
+55 = The provided RSL 'executable' parameter is invalid
+56 = The provided RSL 'host_count' parameter is invalid
+57 = The provided RSL 'jobtype' parameter is invalid
+58 = The provided RSL 'maxtime' parameter is invalid
+59 = The provided RSL 'myjob' parameter is invalid
+
+60 = The provided RSL 'paradyn' parameter is invalid
+61 = The provided RSL 'project' parameter is invalid
+62 = The provided RSL 'queue' parameter is invalid
+63 = The provided RSL 'stderr' parameter is invalid
+64 = The provided RSL 'stdin' parameter is invalid
+65 = The provided RSL 'stdout' parameter is invalid
+66 = The job manager failed to locate an internal script
+67 = The job manager failed on the system call pipe()
+68 = The job manager failed on the system call fcntl()
+69 = The job manager failed to create the temporary stdout filename
+
+70 = The job manager failed to create the temporary stderr filename
+71 = The job manager failed on the system call fork()
+72 = The executable file permissions do not allow execution
+73 = The job manager failed to open stdout
+74 = The job manager failed to open stderr
+75 = The cache file could not be opened in order to relocate the user proxy
+76 = Cannot access cache files in ~/.globus/.gass_cache, check permissions, quota, and disk space
+77 = The job manager failed to insert the contact in the client contact list
+78 = The contact was not found in the job manager's client contact list
+79 = Connecting to the job manager failed. Possible reasons: job terminated, invalid job contact, network problems, ...
+
+80 = The syntax of the job contact is invalid
+81 = The executable parameter in the RSL is undefined
+82 = The job manager service is misconfigured. condor arch undefined
+83 = The job manager service is misconfigured. condor os undefined
+84 = The provided RSL 'min_memory' parameter is invalid
+85 = The provided RSL 'max_memory' parameter is invalid
+86 = The RSL 'min_memory' value is not zero or greater
+87 = The RSL 'max_memory' value is not zero or greater
+88 = The creation of a HTTP message failed
+89 = Parsing incoming HTTP message failed
+
+90 = The packing of information into a HTTP message failed
+91 = An incoming HTTP message did not contain the expected information
+92 = The job manager does not support the service that the client requested
+93 = The gatekeeper failed to find the requested service
+94 = The jobmanager does not accept any new requests (shutting down)
+95 = The client failed to close the listener associated with the callback URL
+96 = The gatekeeper contact cannot be parsed
+97 = The job manager could not find the 'poe' command
+98 = The job manager could not find the 'mpirun' command
+99 = The provided RSL 'start_time' parameter is invalid"
+100 = The provided RSL 'reservation_handle' parameter is invalid
+
+101 = The provided RSL 'max_wall_time' parameter is invalid
+102 = The RSL 'max_wall_time' value is not zero or greater
+103 = The provided RSL 'max_cpu_time' parameter is invalid
+104 = The RSL 'max_cpu_time' value is not zero or greater
+105 = The job manager is misconfigured, a scheduler script is missing
+106 = The job manager is misconfigured, a scheduler script has invalid permissions
+107 = The job manager failed to signal the job
+108 = The job manager did not recognize/support the signal type
+109 = The job manager failed to get the job id from the local scheduler
+
+110 = The job manager is waiting for a commit signal
+111 = The job manager timed out while waiting for a commit signal
+112 = The provided RSL 'save_state' parameter is invalid
+113 = The provided RSL 'restart' parameter is invalid
+114 = The provided RSL 'two_phase' parameter is invalid
+115 = The RSL 'two_phase' value is not zero or greater
+116 = The provided RSL 'stdout_position' parameter is invalid
+117 = The RSL 'stdout_position' value is not zero or greater
+118 = The provided RSL 'stderr_position' parameter is invalid
+119 = The RSL 'stderr_position' value is not zero or greater
+
+120 = The job manager restart attempt failed
+121 = The job state file doesn't exist
+122 = Could not read the job state file
+123 = Could not write the job state file
+124 = The old job manager is still alive
+125 = The job manager state file TTL expired
+126 = It is unknown if the job was submitted
+127 = The provided RSL 'remote_io_url' parameter is invalid
+128 = Could not write the remote io url file
+129 = The standard output/error size is different
+
+130 = The job manager was sent a stop signal (job is still running)
+131 = The user proxy expired (job is still running)
+132 = The job was not submitted by original jobmanager
+133 = The job manager is not waiting for that commit signal
+134 = The provided RSL scheduler specific parameter is invalid
+135 = The job manager could not stage in a file
+136 = The scratch directory could not be created
+137 = The provided 'gass_cache' parameter is invalid
+138 = The RSL contains attributes which are not valid for job submission
+139 = The RSL contains attributes which are not valid for stdio update
+
+140 = The RSL contains attributes which are not valid for job restart
+141 = The provided RSL 'file_stage_in' parameter is invalid
+142 = The provided RSL 'file_stage_in_shared' parameter is invalid
+143 = The provided RSL 'file_stage_out' parameter is invalid
+144 = The provided RSL 'gass_cache' parameter is invalid
+145 = The provided RSL 'file_cleanup' parameter is invalid
+146 = The provided RSL 'scratch_dir' parameter is invalid
+147 = The provided scheduler-specific RSL parameter is invalid
+148 = A required RSL attribute was not defined in the RSL spec
+149 = The gass_cache attribute points to an invalid cache directory
+
+150 = The provided RSL 'save_state' parameter has an invalid value
+151 = The job manager could not open the RSL attribute validation file
+152 = The job manager could not read the RSL attribute validation file
+153 = The provided RSL 'proxy_timeout' is invalid
+154 = The RSL 'proxy_timeout' value is not greater than zero
+155 = The job manager could not stage out a file
+156 = The job contact string does not match any which the job manager is handling
+157 = Proxy delegation failed
+158 = The job manager could not lock the state lock file
+
+1000 = Failed to start up callback handler
+1003 = Job contact not set
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/main/resources/service.properties
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/resources/service.properties b/modules/gfac/gfac-ssh/src/main/resources/service.properties
new file mode 100644
index 0000000..391bfea
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/resources/service.properties
@@ -0,0 +1,58 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+
+#
+# Class which implemented Scheduler interface. It will be used to determine a Provider
+#
+scheduler.class= org.apache.airavata.core.gfac.scheduler.impl.SchedulerImpl
+
+#
+# Data Service Plugins classes
+#
+datachain.classes= org.apache.airavata.core.gfac.extension.data.RegistryDataService
+
+#
+# Pre execution Plugins classes. For example, GridFTP Input Staging
+#
+prechain.classes= org.apache.airavata.core.gfac.extension.pre.GridFtpInputStaging
+prechain.classes= org.apache.airavata.core.gfac.extension.pre.HttpInputStaging
+
+#
+# Post execution Plugins classes. For example, GridFTP Output Staging
+#
+postchain.classes= org.apache.airavata.core.gfac.extension.post.GridFtpOutputStaging
+postchain.classes= org.apache.airavata.core.gfac.extension.post.OutputRegister
+
+#
+# SSH private key location. It will be used by SSHProvider
+#
+# ssh.key=/home/user/.ssh/id_rsa
+# ssh.keypass=
+# ssh.username=usernameAtHost
+
+#
+# MyProxy credential. It will be used by GridFTP Plugins and GramProvider.
+#
+# myproxy.server=myproxy.teragrid.org
+# myproxy.user=username
+# myproxy.pass=password
+# myproxy.life=3600
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java b/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
new file mode 100644
index 0000000..8d83174
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
+import org.apache.airavata.gfac.context.ApplicationContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.schemas.gfac.*;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+public class BigRed2TestWithSSHAuth {
+ private JobExecutionContext jobExecutionContext;
+
+ private String userName;
+ private String password;
+ private String passPhrase;
+ private String hostName;
+ private String workingDirectory;
+ private String privateKeyPath;
+ private String publicKeyPath;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+
+ System.out.println("Test case name " + this.getClass().getName());
+// System.setProperty("ssh.host","bigred2.uits.iu.edu"); //default ssh host
+// System.setProperty("ssh.user", "lginnali");
+// System.setProperty("ssh.private.key.path", "/Users/lahirugunathilake/.ssh/id_dsa");
+// System.setProperty("ssh.public.key.path", "/Users/lahirugunathilake/.ssh/id_dsa.pub");
+// System.setProperty("ssh.working.directory", "/tmp");
+
+ this.hostName = "bigred2.uits.iu.edu";
+ this.hostName = System.getProperty("ssh.host");
+ this.userName = System.getProperty("ssh.username");
+ this.password = System.getProperty("ssh.password");
+ this.privateKeyPath = System.getProperty("private.ssh.key");
+ this.publicKeyPath = System.getProperty("public.ssh.key");
+ this.passPhrase = System.getProperty("ssh.keypass");
+ this.workingDirectory = System.getProperty("ssh.working.directory");
+
+
+ if (this.userName == null
+ || (this.password==null && (this.publicKeyPath == null || this.privateKeyPath == null)) || this.workingDirectory == null) {
+ System.out.println("########### In order to test you have to either username password or private,public keys");
+ System.out.println("Use -Dssh.username=xxx -Dssh.password=yyy -Dssh.keypass=zzz " +
+ "-Dprivate.ssh.key -Dpublic.ssh.key -Dssh.working.directory ");
+ }
+ URL resource = BigRed2TestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ assert resource != null;
+ System.out.println(resource.getFile());
+ GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null, null);
+
+// gFacConfiguration.setMyProxyLifeCycle(3600);
+// gFacConfiguration.setMyProxyServer("myproxy.teragrid.org");
+// gFacConfiguration.setMyProxyUser("*****");
+// gFacConfiguration.setMyProxyPassphrase("*****");
+// gFacConfiguration.setTrustedCertLocation("./certificates");
+// //have to set InFlwo Handlers and outFlowHandlers
+// gFacConfiguration.setInHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GramDirectorySetupHandler","org.apache.airavata.gfac.handler.GridFTPInputHandler"}));
+// gFacConfiguration.setOutHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GridFTPOutputHandler"}));
+
+ /*
+ * Host
+ */
+ HostDescription host = new HostDescription(SSHHostType.type);
+ host.getType().setHostAddress(hostName);
+ host.getType().setHostName(hostName);
+ ((SSHHostType)host.getType()).setHpcResource(true);
+ /*
+ * App
+ */
+ ApplicationDescription appDesc = new ApplicationDescription(HpcApplicationDeploymentType.type);
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) appDesc.getType();
+ ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
+ name.setStringValue("EchoLocal");
+ app.setApplicationName(name);
+
+ app.setCpuCount(1);
+ app.setJobType(JobTypeType.SERIAL);
+ app.setNodeCount(1);
+ app.setProcessorsPerNode(1);
+
+ /*
+ * Use bat file if it is compiled on Windows
+ */
+ app.setExecutableLocation("/bin/echo");
+
+ /*
+ * Default tmp location
+ */
+ String tempDir = "/tmp";
+ String date = (new Date()).toString();
+ date = date.replaceAll(" ", "_");
+ date = date.replaceAll(":", "_");
+
+ tempDir = tempDir + File.separator
+ + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID();
+
+ System.out.println(tempDir);
+ app.setScratchWorkingDirectory(tempDir);
+ app.setStaticWorkingDirectory(tempDir);
+ app.setInputDataDirectory(tempDir + File.separator + "inputData");
+ app.setOutputDataDirectory(tempDir + File.separator + "outputData");
+ app.setStandardOutput(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stdout");
+ app.setStandardError(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stderr");
+ app.setMaxWallTime(5);
+ app.setJobSubmitterCommand("aprun -n 1");
+ app.setInstalledParentPath("/opt/torque/torque-4.2.3.1/bin/");
+
+ /*
+ * Service
+ */
+ ServiceDescription serv = new ServiceDescription();
+ serv.getType().setName("SimpleEcho");
+
+ List<InputParameterType> inputList = new ArrayList<InputParameterType>();
+
+ InputParameterType input = InputParameterType.Factory.newInstance();
+ input.setParameterName("echo_input");
+ input.setParameterType(StringParameterType.Factory.newInstance());
+ inputList.add(input);
+
+ InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
+
+ .size()]);
+ List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
+ OutputParameterType output = OutputParameterType.Factory.newInstance();
+ output.setParameterName("echo_output");
+ output.setParameterType(StringParameterType.Factory.newInstance());
+ outputList.add(output);
+
+ OutputParameterType[] outputParamList = outputList
+ .toArray(new OutputParameterType[outputList.size()]);
+
+ serv.getType().setInputParametersArray(inputParamList);
+ serv.getType().setOutputParametersArray(outputParamList);
+
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName());
+ // Adding security context
+ jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, getSecurityContext(app));
+ ApplicationContext applicationContext = new ApplicationContext();
+ jobExecutionContext.setApplicationContext(applicationContext);
+ applicationContext.setServiceDescription(serv);
+ applicationContext.setApplicationDeploymentDescription(appDesc);
+ applicationContext.setHostDescription(host);
+
+ MessageContext inMessage = new MessageContext();
+ ActualParameter echo_input = new ActualParameter();
+ ((StringParameterType) echo_input.getType()).setValue("echo_output=hello");
+ inMessage.addParameter("echo_input", echo_input);
+
+
+ jobExecutionContext.setInMessageContext(inMessage);
+
+ MessageContext outMessage = new MessageContext();
+ ActualParameter echo_out = new ActualParameter();
+// ((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ outMessage.addParameter("echo_output", echo_out);
+ jobExecutionContext.setRegistry(RegistryFactory.getLoggingRegistry());
+ jobExecutionContext.setTaskData(new TaskDetails("11323"));
+ jobExecutionContext.setOutMessageContext(outMessage);
+
+ }
+
+
+ private SecurityContext getSecurityContext(HpcApplicationDeploymentType app) {
+ try {
+
+ AuthenticationInfo authenticationInfo = null;
+ if (password != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ this.passPhrase);
+ }
+ // Server info
+ ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+
+ Cluster pbsCluster = null;
+ SSHSecurityContext sshSecurityContext = null;
+
+ JobManagerConfiguration pbsJobManager = CommonUtils.getPBSJobManager(app.getInstalledParentPath());
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, pbsJobManager);
+
+
+ sshSecurityContext = new SSHSecurityContext();
+ sshSecurityContext.setPbsCluster(pbsCluster);
+ sshSecurityContext.setUsername(userName);
+ sshSecurityContext.setKeyPass(passPhrase);
+ sshSecurityContext.setPrivateKeyLoc(privateKeyPath);
+ return sshSecurityContext;
+ } catch (SSHApiException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return null;
+ }
+
+ @Test
+ public void testSSHProvider() throws GFacException {
+ GFacImpl gFacAPI = new GFacImpl();
+ gFacAPI.submitJob(jobExecutionContext);
+ org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobDescription());
+ org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobID());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/5381d599/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java b/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java
new file mode 100644
index 0000000..60b941a
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/test/java/org/apache/airavata/core/gfac/services/impl/SSHProviderTestWithSSHAuth.java
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.ApplicationContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.InputParameterType;
+import org.apache.airavata.schemas.gfac.OutputParameterType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
+import org.apache.commons.lang.SystemUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SSHProviderTestWithSSHAuth {
+ private JobExecutionContext jobExecutionContext;
+ @Before
+ public void setUp() throws Exception {
+
+ URL resource = SSHProviderTestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()),null,null);
+// gFacConfiguration.s
+ //have to set InFlwo Handlers and outFlowHandlers
+ ApplicationContext applicationContext = new ApplicationContext();
+ HostDescription host = new HostDescription(SSHHostType.type);
+ host.getType().setHostName("bigred");
+ host.getType().setHostAddress("bigred2.uits.iu.edu");
+ applicationContext.setHostDescription(host);
+ /*
+ * App
+ */
+ ApplicationDescription appDesc = new ApplicationDescription();
+ ApplicationDeploymentDescriptionType app = appDesc.getType();
+ ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
+ name.setStringValue("EchoSSH");
+ app.setApplicationName(name);
+
+ /*
+ * Use bat file if it is compiled on Windows
+ */
+ if (SystemUtils.IS_OS_WINDOWS) {
+ URL url = this.getClass().getClassLoader().getResource("echo.bat");
+ app.setExecutableLocation(url.getFile());
+ } else {
+ //for unix and Mac
+ app.setExecutableLocation("/bin/echo");
+ }
+
+ /*
+ * Job location
+ */
+ String tempDir = "/tmp";
+ String date = (new Date()).toString();
+ date = date.replaceAll(" ", "_");
+ date = date.replaceAll(":", "_");
+
+ tempDir = tempDir + File.separator
+ + "EchoSSH" + "_" + date + "_" + UUID.randomUUID();
+
+ app.setScratchWorkingDirectory(tempDir);
+ app.setStaticWorkingDirectory(tempDir);
+ app.setInputDataDirectory(tempDir + File.separator + "input");
+ app.setOutputDataDirectory(tempDir + File.separator + "output");
+ app.setStandardOutput(tempDir + File.separator + "echo.stdout");
+ app.setStandardError(tempDir + File.separator + "echo.stderr");
+
+ applicationContext.setApplicationDeploymentDescription(appDesc);
+
+ /*
+ * Service
+ */
+ ServiceDescription serv = new ServiceDescription();
+ serv.getType().setName("EchoSSH");
+
+ List<InputParameterType> inputList = new ArrayList<InputParameterType>();
+ InputParameterType input = InputParameterType.Factory.newInstance();
+ input.setParameterName("echo_input");
+ input.setParameterType(StringParameterType.Factory.newInstance());
+ inputList.add(input);
+ InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
+ .size()]);
+
+ List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
+ OutputParameterType output = OutputParameterType.Factory.newInstance();
+ output.setParameterName("echo_output");
+ output.setParameterType(StringParameterType.Factory.newInstance());
+ outputList.add(output);
+ OutputParameterType[] outputParamList = outputList
+ .toArray(new OutputParameterType[outputList.size()]);
+
+ serv.getType().setInputParametersArray(inputParamList);
+ serv.getType().setOutputParametersArray(outputParamList);
+
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration,serv.getType().getName());
+ jobExecutionContext.setApplicationContext(applicationContext);
+
+ // Add security context
+ jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, getSecurityContext());
+ /*
+ * Host
+ */
+ applicationContext.setServiceDescription(serv);
+
+ MessageContext inMessage = new MessageContext();
+ ActualParameter echo_input = new ActualParameter();
+ ((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ inMessage.addParameter("echo_input", echo_input);
+
+ jobExecutionContext.setInMessageContext(inMessage);
+
+ MessageContext outMessage = new MessageContext();
+ ActualParameter echo_out = new ActualParameter();
+// ((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ outMessage.addParameter("echo_output", echo_out);
+
+ jobExecutionContext.setOutMessageContext(outMessage);
+
+ }
+
+ private SSHSecurityContext getSecurityContext() {
+ SSHSecurityContext context = new SSHSecurityContext();
+ context.setUsername("lginnali");
+ context.setPrivateKeyLoc("~/.ssh/id_dsa");
+ context.setKeyPass("i want to be free");
+ return context;
+ }
+
+ @Test
+ public void testLocalProvider() throws GFacException {
+ GFacImpl gFacAPI = new GFacImpl();
+ gFacAPI.submitJob(jobExecutionContext);
+ MessageContext outMessageContext = jobExecutionContext.getOutMessageContext();
+ Assert.assertEquals(MappingFactory.toString((ActualParameter)outMessageContext.getParameter("echo_output")), "hello");
+ }
+}