Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/04 22:16:33 UTC

[80/81] [abbrv] airavata git commit: Merge moduleRefactor branch

http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 0000000,b716099..3756140
mode 000000,100644..100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@@ -1,0 -1,747 +1,747 @@@
+ /*
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  */
+ package org.apache.airavata.gfac.core;
+ 
+ import org.airavata.appcatalog.cpi.AppCatalog;
+ import org.airavata.appcatalog.cpi.AppCatalogException;
+ import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+ import org.apache.airavata.common.exception.ApplicationSettingsException;
+ import org.apache.airavata.common.utils.AiravataZKUtils;
+ import org.apache.airavata.common.utils.DBUtil;
+ import org.apache.airavata.common.utils.MonitorPublisher;
+ import org.apache.airavata.common.utils.ServerSettings;
+ import org.apache.airavata.credential.store.store.CredentialReader;
+ import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
+ import org.apache.airavata.gfac.Constants;
+ import org.apache.airavata.gfac.ExecutionMode;
+ import org.apache.airavata.gfac.GFacConfiguration;
+ import org.apache.airavata.gfac.GFacException;
+ import org.apache.airavata.gfac.core.context.JobExecutionContext;
+ import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+ import org.apache.airavata.gfac.core.states.GfacExperimentState;
+ import org.apache.airavata.gfac.core.states.GfacHandlerState;
+ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+ import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+ import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+ import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+ import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+ import org.apache.airavata.model.messaging.event.JobIdentifier;
+ import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+ import org.apache.airavata.model.messaging.event.TaskIdentifier;
+ import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+ import org.apache.airavata.model.workspace.experiment.ActionableGroup;
+ import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+ import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+ import org.apache.airavata.model.workspace.experiment.ErrorDetails;
+ import org.apache.airavata.model.workspace.experiment.Experiment;
+ import org.apache.airavata.model.workspace.experiment.ExperimentState;
+ import org.apache.airavata.model.workspace.experiment.JobDetails;
+ import org.apache.airavata.model.workspace.experiment.JobState;
+ import org.apache.airavata.model.workspace.experiment.JobStatus;
+ import org.apache.airavata.model.workspace.experiment.TaskState;
 -import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
++import org.apache.airavata.experiment.catalog.impl.RegistryFactory;
+ import org.apache.airavata.registry.cpi.ChildDataType;
+ import org.apache.airavata.registry.cpi.CompositeIdentifier;
+ import org.apache.airavata.registry.cpi.Registry;
+ import org.apache.airavata.registry.cpi.RegistryException;
+ import org.apache.airavata.registry.cpi.RegistryModelType;
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.utils.ZKPaths;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.ZooDefs;
+ import org.apache.zookeeper.data.ACL;
+ import org.apache.zookeeper.data.Stat;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.w3c.dom.Document;
+ import org.w3c.dom.Element;
+ import org.w3c.dom.Node;
+ import org.w3c.dom.NodeList;
+ 
+ import javax.xml.xpath.XPath;
+ import javax.xml.xpath.XPathConstants;
+ import javax.xml.xpath.XPathExpression;
+ import javax.xml.xpath.XPathExpressionException;
+ import javax.xml.xpath.XPathFactory;
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.FileNotFoundException;
+ import java.io.FileReader;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.io.Reader;
+ import java.net.InetAddress;
+ import java.net.URISyntaxException;
+ import java.net.UnknownHostException;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Calendar;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ 
+ //import org.apache.airavata.commons.gfac.type.ActualParameter;
+ 
+ public class GFacUtils {
+ 	private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+ 	public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ 
+ 	private GFacUtils() {
+ 	}
+ 
+ 	/**
+ 	 * Read data from an InputStream and convert it to a String.
+ 	 * 
+ 	 * @param in the stream to read from; closed before this method returns
+ 	 * @return the String read from the stream
+ 	 * @throws java.io.IOException if reading fails
+ 	 */
+ 	public static String readFromStream(InputStream in) throws IOException {
+ 		try {
+ 			StringBuilder result = new StringBuilder();
+ 			// read characters rather than raw bytes so that multi-byte
+ 			// characters cannot be split across buffer boundaries
+ 			Reader reader = new InputStreamReader(in);
+ 			char[] buf = new char[1024];
+ 			int read;
+ 			while ((read = reader.read(buf)) > 0) {
+ 				result.append(buf, 0, read);
+ 			}
+ 			return result.toString();
+ 		} finally {
+ 			if (in != null) {
+ 				try {
+ 					in.close();
+ 				} catch (IOException e) {
+ 					log.warn("Cannot close InputStream: "
+ 							+ in.getClass().getName(), e);
+ 				}
+ 			}
+ 		}
+ 	}
+ 
+ 	/**
+ 	 * Returns true if the given job has finished, otherwise false.
+ 	 *
+ 	 * @param job the job descriptor to inspect
+ 	 * @return true when the job status is C (completed)
+ 	 */
+ 	public static boolean isJobFinished(JobDescriptor job) {
+ 		return org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus());
+ 	}
+ 
+ 	/**
+ 	 * Converts a wall time given in minutes into the HH:MM:SS format
+ 	 * expected by most batch schedulers.
+ 	 *
+ 	 * @param maxWalltime wall time in minutes
+ 	 * @return the formatted wall-time string
+ 	 */
+ 	public static String maxWallTimeCalculator(int maxWalltime) {
+ 		if (maxWalltime < 60) {
+ 			return String.format("00:%02d:00", maxWalltime);
+ 		} else {
+ 			int minutes = maxWalltime % 60;
+ 			int hours = maxWalltime / 60;
+ 			// zero-pad minutes so 65 renders as 1:05:00 rather than 1:5:00
+ 			return String.format("%d:%02d:00", hours, minutes);
+ 		}
+ 	}
+ 
+ 	public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
+ 		if (maxWalltime < 60) {
+ 			return String.format("00:%02d", maxWalltime);
+ 		} else {
+ 			int minutes = maxWalltime % 60;
+ 			int hours = maxWalltime / 60;
+ 			return String.format("%d:%02d", hours, minutes);
+ 		}
+ 	}
+ 	/**
+ 	 * Performs framework operations specific to the different execution modes;
+ 	 * returns false only when the configuration requests ASYNCHRONOUS execution.
+ 	 * 
+ 	 * @param jobExecutionContext the current job execution context
+ 	 * @return true if the job should run synchronously
+ 	 */
+ 	public static boolean isSynchronousMode(
+ 			JobExecutionContext jobExecutionContext) {
+ 		GFacConfiguration gFacConfiguration = jobExecutionContext
+ 				.getGFacConfiguration();
+ 		if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
+ 				.getExecutionMode())) {
+ 			return false;
+ 		}
+ 		return true;
+ 	}
+ 
+ 	public static String readFileToString(String file)
+ 			throws FileNotFoundException, IOException {
+ 		BufferedReader instream = null;
+ 		try {
+ 
+ 			instream = new BufferedReader(new FileReader(file));
+ 			StringBuffer buff = new StringBuffer();
+ 			String temp = null;
+ 			while ((temp = instream.readLine()) != null) {
+ 				buff.append(temp);
+ 				buff.append(Constants.NEWLINE);
+ 			}
+ 			return buff.toString();
+ 		} finally {
+ 			if (instream != null) {
+ 				try {
+ 					instream.close();
+ 				} catch (IOException e) {
+ 					log.warn("Cannot close file reader", e);
+ 				}
+ 			}
+ 		}
+ 	}
+ 
+ 	public static boolean isLocalHost(String appHost)
+ 			throws UnknownHostException {
+ 		String localHost = InetAddress.getLocalHost().getCanonicalHostName();
+ 		return (localHost.equals(appHost)
+ 				|| Constants.LOCALHOST.equals(appHost) || Constants._127_0_0_1
+ 					.equals(appHost));
+ 	}
+ 
+ 	public static String createUniqueNameWithDate(String name) {
+ 		String date = new Date().toString();
+ 		date = date.replaceAll(" ", "_");
+ 		date = date.replaceAll(":", "_");
+ 		return name + "_" + date;
+ 	}
+ 
+     public static List<Element> getElementList(Document doc, String expression) throws XPathExpressionException {
+         XPathFactory xPathFactory = XPathFactory.newInstance();
+         XPath xPath = xPathFactory.newXPath();
+         XPathExpression expr = xPath.compile(expression);
+         NodeList nodeList = (NodeList) expr.evaluate(doc, XPathConstants.NODESET);
+         List<Element> elementList = new ArrayList<Element>();
+         for (int i = 0; i < nodeList.getLength(); i++) {
+             Node item = nodeList.item(i);
+             if (item instanceof Element) {
+                 elementList.add((Element) item);
+             }
+         }
+         return elementList;
+     }
+ 
+ 	public static String createGsiftpURIAsString(String host, String localPath)
+ 			throws URISyntaxException {
+ 		StringBuffer buf = new StringBuffer();
+ 		if (!host.startsWith("gsiftp://"))
+ 			buf.append("gsiftp://");
+ 		buf.append(host);
+ 		if (!host.endsWith("/"))
+ 			buf.append("/");
+ 		buf.append(localPath);
+ 		return buf.toString();
+ 	}
+ 
+ 	public static void saveJobStatus(JobExecutionContext jobExecutionContext,
+                                      JobDetails details, JobState state) throws GFacException {
+ 		try {
+             // first save the job details to the registry, then publish the job status change event.
+             Registry registry = jobExecutionContext.getRegistry();
+             JobStatus status = new JobStatus();
+             status.setJobState(state);
+             details.setJobStatus(status);
+             registry.add(ChildDataType.JOB_DETAIL, details,
+                     new CompositeIdentifier(jobExecutionContext.getTaskData()
+                             .getTaskID(), details.getJobID()));
+             JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
+                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+                     jobExecutionContext.getGatewayID());
+             JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
+             jobExecutionContext.getMonitorPublisher().publish(jobStatusChangeRequestEvent);
+         } catch (Exception e) {
+ 			throw new GFacException("Error persisting job status: "
+ 					+ e.getLocalizedMessage(), e);
+ 		}
+ 	}
+ 
+ 	public static void updateJobStatus(JobExecutionContext jobExecutionContext,
+ 			JobDetails details, JobState state) throws GFacException {
+ 		try {
+ 			Registry registry = jobExecutionContext.getRegistry();
+ 			JobStatus status = new JobStatus();
+ 			status.setJobState(state);
+ 			status.setTimeOfStateChange(Calendar.getInstance()
+ 					.getTimeInMillis());
+ 			details.setJobStatus(status);
+ 			registry.update(
+ 					org.apache.airavata.registry.cpi.RegistryModelType.JOB_DETAIL,
+ 					details, details.getJobID());
+ 		} catch (Exception e) {
+ 			throw new GFacException("Error persisting job status: "
+ 					+ e.getLocalizedMessage(), e);
+ 		}
+ 	}
+ 
+ 	public static void saveErrorDetails(
+ 			JobExecutionContext jobExecutionContext, String errorMessage,
+ 			CorrectiveAction action, ErrorCategory errorCategory)
+ 			throws GFacException {
+ 		try {
+ 			Registry registry = jobExecutionContext.getRegistry();
+ 			ErrorDetails details = new ErrorDetails();
+ 			details.setActualErrorMessage(errorMessage);
+ 			details.setCorrectiveAction(action);
+ 			details.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS);
+ 			details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+ 			details.setErrorCategory(errorCategory);
+ 			registry.add(ChildDataType.ERROR_DETAIL, details,
+ 					jobExecutionContext.getTaskData().getTaskID());
+ 		} catch (Exception e) {
+ 			throw new GFacException("Error persisting error details: "
+ 					+ e.getLocalizedMessage(), e);
+ 		}
+ 	}
+ 
+     public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
+         Map<String, Object> map = new HashMap<String, Object>();
+         for (InputDataObjectType objectType : experimentData) {
+             map.put(objectType.getName(), objectType);
+         }
+         return map;
+     }
+ 
+     public static Map<String, Object> getOuputParamMap(List<OutputDataObjectType> experimentData) throws GFacException {
+         Map<String, Object> map = new HashMap<String, Object>();
+         for (OutputDataObjectType objectType : experimentData) {
+             map.put(objectType.getName(), objectType);
+         }
+         return map;
+     }
+ 
+ 	public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
+ 			JobExecutionContext jobExecutionContext)
+ 			throws Exception {
+ 		String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
+ 				.getExperimentID());
+         if (expState == null || expState.isEmpty()) {
+             return GfacExperimentState.UNKNOWN;
+         }
+         return GfacExperimentState.findByValue(Integer.valueOf(expState));
+     }
+ 
+ 	public static boolean createHandlerZnode(CuratorFramework curatorClient,
+                                              JobExecutionContext jobExecutionContext, String className)
+ 			throws Exception {
+ 		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+ 				jobExecutionContext.getExperimentID(), className);
+ 		Stat exists = curatorClient.checkExists().forPath(expState);
+ 		if (exists == null) {
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ 		} else {
+ 			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ 			if (exists == null) {
+ 				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ 			}
+ 		}
+ 
+ 		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ 		if (exists != null) {
+ 			curatorClient.setData().withVersion(exists.getVersion())
+ 					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ 							String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
+ 		}
+ 		return true;
+ 	}
+ 
+ 	public static boolean createHandlerZnode(CuratorFramework curatorClient,
+                                              JobExecutionContext jobExecutionContext, String className,
+                                              GfacHandlerState state) throws Exception {
+ 		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+ 				jobExecutionContext.getExperimentID(), className);
+ 		Stat exists = curatorClient.checkExists().forPath(expState);
+ 		if (exists == null) {
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 					.forPath(expState, new byte[0]);
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ 		} else {
+ 			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ 			if (exists == null) {
+ 				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ 			}
+ 		}
+ 
+ 		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ 		if (exists != null) {
+ 			curatorClient.setData().withVersion(exists.getVersion())
+ 					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ 							String.valueOf(state.getValue()).getBytes());
+ 		}
+ 		return true;
+ 	}
+ 
+ 	public static boolean updateHandlerState(CuratorFramework curatorClient,
+                                              JobExecutionContext jobExecutionContext, String className,
+                                              GfacHandlerState state) throws Exception {
+ 		String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
+ 				jobExecutionContext.getExperimentID(), className);
+ 		Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ 		if (exists != null) {
+ 			curatorClient.setData().withVersion(exists.getVersion())
+ 					.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
+ 		} else {
+ 			createHandlerZnode(curatorClient, jobExecutionContext, className, state);
+ 		}
+ 		return false;
+ 	}
+ 
+ 	public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
+                                                   JobExecutionContext jobExecutionContext, String className) {
+ 		try {
+ 			String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
+ 			Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ 			if (exists != null) {
+ 				String stateVal = new String(curatorClient.getData().storingStatIn(exists)
+ 						.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
+ 				return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
+ 			}
+ 			return GfacHandlerState.UNKNOWN; // the znode does not exist yet,
+ 							// so report the state as unknown
+ 		} catch (Exception e) {
+ 			log.error("Error occurred while getting the zk node status", e);
+ 			return null;
+ 		}
+ 	}
+ 
+ 	// This method is risky because it moves experiment data between GFac nodes
+ 	public static boolean createExperimentEntryForPassive(String experimentID,
+ 														  String taskID, CuratorFramework curatorClient, String experimentNode,
+ 														  String pickedChild, String tokenId, long deliveryTag) throws Exception {
+ 		String experimentPath = experimentNode + File.separator + pickedChild;
+ 		String newExperimentPath = experimentPath + File.separator + experimentID;
+ 		Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
+ 		String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+ 		if (oldExperimentPath == null) { // no previous entry exists, so this is
+ 			// a brand-new experiment and we create its node from scratch
+ 			log.info("This is a new job, so creating all the experiment docs from scratch");
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
+             String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 					.forPath(newExperimentPath + File.separator + "state",
+ 							String.valueOf(GfacExperimentState.LAUNCHED.getValue()).getBytes());
+ 
+ 			if (curatorClient.checkExists().forPath(stateNodePath) != null) {
+ 				log.info("Created the node: " + stateNodePath + " successfully!");
+ 			} else {
+ 				log.error("Error creating node: " + stateNodePath);
+ 			}
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 					.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
+ 		} else {
+ 			log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was previously run by another GFac instance, but that run failed");
+             removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
+             if(newExperimentPath.equals(oldExperimentPath)){
+                 log.info("Re-launch experiment came to the same GFac instance");
+             }else {
+ 				log.info("Re-launched experiment arrived at a new GFac instance, so we move its data to the new GFac node");
+ 				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
+ 						curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // copy the node data itself; children are copied below
+                 copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+ 				String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
+ 				Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
+ 				if(exists!=null) {
+ 					curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 							.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+ 									curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
+ 					ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
+ 				}
+ 				// Only after everything has been transferred successfully do we
+ 				// delete the old experiment; on any failure nothing is deleted
+ 				log.info("Old experiment data copied successfully, so the old entry can now be deleted");
+ 				log.info("Deleting experiment data: " + oldExperimentPath);
+ 				ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
+ 			}
+ 		}
+ 		return true;
+ 	}
+ 
+     private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
+         Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+         if (exists != null) {
+ 			ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
+ 		}
+ 	}
+ 
+     private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
+         for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
+             String oldChildPath = oldPath + File.separator + childNode;
+             Stat stat = curatorClient.checkExists().forPath(oldChildPath); // fetch the Stat so getData below can reuse it
+             String newChildPath = newPath + File.separator + childNode;
+             log.info("Creating new znode: " + newChildPath);
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 					.forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
+ 			if (--depth > 0) {
+                 copyChildren(curatorClient , oldChildPath, newChildPath, depth );
+             }
+         }
+     }
+ 
+ 	/**
+ 	 * This returns the experiment entry even when the owning GFac server is down,
+ 	 * because we iterate through the existing experiment nodes rather than the
+ 	 * live gfac-server nodes.
+ 	 *
+ 	 * @param experimentID the experiment to look up
+ 	 * @param curatorClient the ZooKeeper client
+ 	 * @return the znode path of the experiment entry, or null if none exists
+ 	 * @throws KeeperException
+ 	 * @throws InterruptedException
+ 	 */
+ 	public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
+ 		String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ 		List<String> children = curatorClient.getChildren().forPath(experimentNode);
+ 		for (String pickedChild : children) {
+ 			String experimentPath = experimentNode + File.separator + pickedChild;
+ 			String newExpNode = experimentPath + File.separator + experimentID;
+ 			Stat exists = curatorClient.checkExists().forPath(newExpNode);
+ 			if (exists != null) {
+ 				return newExpNode;
+ 			}
+ 		}
+ 		return null;
+ 	}
+ 
+     public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
+         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
+         if (experimentEntry == null) {
+             // This should be handled during request validation; GFac should never receive an invalid experiment.
+             log.error("Cannot find the experiment entry, so the cancel operation cannot be performed. " +
+                     "This happens when the experiment has completed and has already been removed from ZooKeeper");
+             return false;
+         } else {
+             // check whether a cancel operation is already being processed for this experiment.
+             Stat cancelState = curatorClient.checkExists().forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+             if (cancelState != null) {
+                 // another cancel operation is in flight; only one can exist per experiment.
+                 return false;
+             }
+ 
+ 			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ 					.forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save the cancel delivery tag to be acknowledged at the end
+ 			return true;
+         }
+ 
+     }
+ 
+     public static boolean isCancelled(String experimentID, CuratorFramework curatorClient) throws Exception {
+ 		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+         if(experimentEntry == null){
+             return false;
+         }else {
+             Stat exists = curatorClient.checkExists().forPath(experimentEntry);
+             if (exists != null) {
+ 				String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
+ 				if ("cancel".equals(operation)) {
+ 					return true;
+ 				}
+ 			}
+ 		}
+         return false;
+     }
+ 
+     public static void saveHandlerData(JobExecutionContext jobExecutionContext,
+                                        StringBuffer data, String className) throws GFacHandlerException {
+ 		try {
+ 			CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ 			if (curatorClient != null) {
+ 				String expZnodeHandlerPath = AiravataZKUtils
+ 						.getExpZnodeHandlerPath(
+ 								jobExecutionContext.getExperimentID(),
+ 								className);
+ 				Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+                 if (exists != null) {
+ 					curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
+ 				} else {
+                     log.error("Saving Handler data failed, Stat is null");
+                 }
+             }
+ 		} catch (Exception e) {
+ 			throw new GFacHandlerException(e);
+ 		}
+ 	}
+ 
+ 	public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
+ 		CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ 		if (curatorClient != null) {
+ 			String expZnodeHandlerPath = AiravataZKUtils
+ 					.getExpZnodeHandlerPath(
+ 							jobExecutionContext.getExperimentID(),
+ 							className);
+ 			Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+ 			return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
+ 		}
+ 		return null;
+ 	}
+ 
+ 	public static CredentialReader getCredentialReader()
+ 			throws ApplicationSettingsException, IllegalAccessException,
+ 			InstantiationException {
+ 		try {
+ 			String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+ 			String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+ 			String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+ 			String driver = ServerSettings.getCredentialStoreDBDriver();
+ 			return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass, driver));
+ 		} catch (ClassNotFoundException e) {
+ 			log.error("Unable to load the credential store JDBC driver: " + e.getLocalizedMessage());
+ 			return null;
+ 		}
+ 	}
+ 
+     public static LOCALSubmission getLocalJobSubmission (String submissionId) throws AppCatalogException{
+         try {
+             AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+             return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
+         }catch (Exception e){
+             String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+             log.error(errorMsg, e);
+             throw new AppCatalogException(errorMsg, e);
+         }
+     }
+ 
+     public static UnicoreJobSubmission getUnicoreJobSubmission (String submissionId) throws AppCatalogException{
+         try {
+             AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+             return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
+         }catch (Exception e){
+             String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
+             log.error(errorMsg, e);
+             throw new AppCatalogException(errorMsg, e);
+         }
+     }
+ 
+     public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
+         try {
+             AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+             return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
+         }catch (Exception e){
+             String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+             log.error(errorMsg, e);
+             throw new AppCatalogException(errorMsg, e);
+         }
+     }
+ 
+     /**
+      * Converts a list of strings into a single separator-delimited string.
+      * @param listOfStrings the values to join
+      * @param separator the delimiter to place between values
+      * @return the joined string
+      */
+     public static  String listToCsv(List<String> listOfStrings, char separator) {
+         StringBuilder sb = new StringBuilder();
+ 
+         // all but last
+         for(int i = 0; i < listOfStrings.size() - 1 ; i++) {
+             sb.append(listOfStrings.get(i));
+             sb.append(separator);
+         }
+ 
+         // last string, no separator
+         if(listOfStrings.size() > 0){
+             sb.append(listOfStrings.get(listOfStrings.size()-1));
+         }
+ 
+         return sb.toString();
+     }
+ 
+ 	public static byte[] longToBytes(long x) {
+ 		ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ 		buffer.putLong(x);
+ 		return buffer.array();
+ 	}
+ 
+ 	public static long bytesToLong(byte[] bytes) {
+ 		ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ 		buffer.put(bytes);
+ 		buffer.flip(); // flip so the buffer can be read from the beginning
+ 		return buffer.getLong();
+ 	}
+ 
+     public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
+         Registry airavataRegistry = RegistryFactory.getDefaultRegistry();
+         Experiment details = (Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
+         if (details == null) {
+             details = new Experiment();
+             details.setExperimentID(experimentId);
+         }
+         org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+         status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+         ExperimentState currentState = details.getExperimentStatus() == null
+                 ? null : details.getExperimentStatus().getExperimentState();
+         // never overwrite a CANCELING or CANCELED state with another state
+         if (ExperimentState.CANCELED.equals(currentState) || ExperimentState.CANCELING.equals(currentState)) {
+             status.setExperimentState(currentState);
+         } else {
+             status.setExperimentState(state);
+         }
+         details.setExperimentStatus(status);
+         log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
+         airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+         return details.getExperimentStatus().getExperimentState();
+     }
+ 
+     public static boolean isFailedJob (JobExecutionContext jec) {
+         JobStatus jobStatus = jec.getJobDetails().getJobStatus();
+         return jobStatus.getJobState() == JobState.FAILED;
+     }
+ 
+     public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient) throws Exception {
+         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
+         if (experimentEntry == null) {
+             // This should be handled during request validation; GFac should never receive an invalid experiment.
+             log.error("Cannot find the experiment entry, so the cancel operation cannot be performed. " +
+                     "This happens when the experiment has completed and has already been removed from ZooKeeper");
+         } else {
+             // build the cancel node path only after we know the entry exists
+             String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
+             // check whether a cancel operation is being processed for this experiment
+             Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
+             if (cancelState != null) {
+                 ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
+                 return true;
+             }
+         }
+         return false;
+     }
+ 
+     public static void publishTaskStatus (JobExecutionContext jobExecutionContext, MonitorPublisher publisher, TaskState state){
+         TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                 jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                 jobExecutionContext.getExperimentID(),
+                 jobExecutionContext.getGatewayID());
+         publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
+     }
+ }

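For context on the utilities above, here is a minimal, self-contained sketch of how a caller might exercise a few of them: wall-time formatting, CSV building, and the long/byte round trip used for znode delivery tags. The demo class and its values are hypothetical; only GFacUtils itself comes from the commit above.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.airavata.gfac.core.GFacUtils;

    public class GFacUtilsDemo {
        public static void main(String[] args) {
            // with the zero-padded formatting above, 125 minutes
            // becomes "2:05:00" for PBS/SLURM-style wall-time limits
            System.out.println(GFacUtils.maxWallTimeCalculator(125));

            // join notification addresses the way createJobDescriptor does
            List<String> emails = Arrays.asList("a@example.org", "b@example.org");
            System.out.println(GFacUtils.listToCsv(emails, ','));

            // delivery tags survive the round trip through a znode byte payload
            long deliveryTag = 42L;
            byte[] payload = GFacUtils.longToBytes(deliveryTag);
            assert GFacUtils.bytesToLong(payload) == deliveryTag;
        }
    }
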
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------

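The handler- and experiment-state helpers in GFacUtils above all follow the same Curator pattern: checkExists() to fetch a Stat, create() when the znode is missing, and a versioned setData() otherwise. Below is a standalone sketch of that pattern, assuming a local ZooKeeper at 127.0.0.1:2181; the path and payload are illustrative only, and the OPEN_ACL_UNSAFE handling from GFacUtils is omitted for brevity.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;

    public class ZnodeStateSketch {
        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory
                    .newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            String path = "/gfac-experiments/demo/state"; // illustrative path
            byte[] payload = "3".getBytes();              // e.g. a GfacHandlerState value

            Stat exists = client.checkExists().forPath(path);
            if (exists == null) {
                // first write: create the node and any missing parents
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT).forPath(path, payload);
            } else {
                // versioned update, so concurrent writers fail fast instead of clobbering
                client.setData().withVersion(exists.getVersion()).forPath(path, payload);
            }
            client.close();
        }
    }
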
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 0000000,307d8c3..f28b6e4
mode 000000,100644..100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@@ -1,0 -1,562 +1,562 @@@
+ /*
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+ */
+ package org.apache.airavata.gfac.ssh.util;
+ 
+ import org.airavata.appcatalog.cpi.AppCatalog;
+ import org.airavata.appcatalog.cpi.AppCatalogException;
+ import org.apache.airavata.common.exception.ApplicationSettingsException;
+ import org.apache.airavata.common.utils.ServerSettings;
+ import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+ import org.apache.airavata.gfac.Constants;
+ import org.apache.airavata.gfac.GFacException;
+ import org.apache.airavata.gfac.RequestData;
+ import org.apache.airavata.gfac.core.JobDescriptor;
+ import org.apache.airavata.gfac.core.JobManagerConfiguration;
+ import org.apache.airavata.gfac.core.cluster.Cluster;
+ import org.apache.airavata.gfac.core.cluster.ServerInfo;
+ import org.apache.airavata.gfac.core.context.JobExecutionContext;
+ import org.apache.airavata.gfac.core.context.MessageContext;
+ import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+ import org.apache.airavata.gfac.core.GFacUtils;
+ import org.apache.airavata.gfac.gsi.ssh.impl.GSISSHAbstractCluster;
+ import org.apache.airavata.gfac.gsi.ssh.impl.PBSCluster;
+ import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+ import org.apache.airavata.gfac.gsi.ssh.util.CommonUtils;
+ import org.apache.airavata.gfac.ssh.context.SSHAuthWrapper;
+ import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+ import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo;
+ import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
+ import org.apache.airavata.model.appcatalog.appinterface.DataType;
+ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+ import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+ import org.apache.airavata.model.appcatalog.computeresource.*;
+ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+ import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+ import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+ import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+ import org.apache.airavata.model.workspace.experiment.TaskDetails;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.File;
+ import java.io.PrintWriter;
+ import java.io.StringWriter;
+ import java.util.*;
+ 
+ public class GFACSSHUtils {
+     private final static Logger logger = LoggerFactory.getLogger(GFACSSHUtils.class);
+ 
+     public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>();
+ 
+     public static final String PBS_JOB_MANAGER = "pbs";
+     public static final String SLURM_JOB_MANAGER = "slurm";
+     public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
+     public static final String LSF_JOB_MANAGER = "LSF";
+ 
+     public static int maxClusterCount = 5;
+ 
+     /**
+      * Adds computing-resource-specific authentication; if the target is a
+      * third-party machine, use the other addSecurityContext overload instead.
+      * @param jobExecutionContext the current job execution context
+      * @throws GFacException
+      * @throws ApplicationSettingsException
+      */
+     public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
+         JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+         JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+         if (preferredJobSubmissionProtocol == JobSubmissionProtocol.GLOBUS || preferredJobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
+             logger.error("This is the wrong method to invoke for non-SSH host types; please check your gfac-config.xml");
+         } else if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) {
+             try {
+                 AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
+                 SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
+                 SecurityProtocol securityProtocol = sshJobSubmission.getSecurityProtocol();
 -                if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS) {
++                if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS || securityProtocol == SecurityProtocol.USERNAME_PASSWORD) {
+                     SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
+                     String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
+                     RequestData requestData = new RequestData(jobExecutionContext.getGatewayID());
+                     requestData.setTokenId(credentialStoreToken);
+ 
+                     ServerInfo serverInfo = new ServerInfo(null, jobExecutionContext.getHostName());
+ 
+                     Cluster pbsCluster = null;
+                     try {
+                         AuthenticationInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData);
+                         String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
+                         if (installedParentPath == null) {
+                             installedParentPath = "/";
+                         }
+ 
+                         SSHCredential credentials = ((TokenizedSSHAuthInfo) tokenizedSSHAuthInfo).getCredentials(); // fetches the credentials and caches them on this object for later use
 -                        if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null){
++                        if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null || securityProtocol == SecurityProtocol.USERNAME_PASSWORD){
+                             // now we fall back to username password authentication
+                             Properties configurationProperties = ServerSettings.getProperties();
+                             tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(Constants.SSH_PASSWORD));
+                         }
+                         // This should be the login user name from compute resource preference
+                         String loginUser = jobExecutionContext.getLoginUserName();
+                         if (loginUser == null) {
+                             loginUser = credentials.getPortalUserName();
+                         }
+                         serverInfo.setUserName(loginUser);
+                         jobExecutionContext.getExperiment().setUserName(loginUser);
+ 
+                         // key identifying the cached connection held inside the pbsCluster object
+                         String key = loginUser + jobExecutionContext.getHostName() + serverInfo.getPort();
+                         boolean recreate = false;
+                         synchronized (clusters) {
+                             if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
+                                 recreate = true;
+                             } else if (clusters.containsKey(key)) {
+                                 int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+                                 if (clusters.get(key).get(i).getSession().isConnected()) {
+                                     pbsCluster = clusters.get(key).get(i);
+                                 } else {
+                                     clusters.get(key).remove(i);
+                                     recreate = true;
+                                 }
+                                 if (!recreate) {
+                                     try {
+                                         pbsCluster.listDirectory("~/"); // isConnected() is hard to trust, so we issue a real call; if it succeeds we are good, else we recreate
+                                     } catch (Exception e) {
+                                         clusters.get(key).remove(i);
+                                         logger.info("Connection found in the connection map has expired, so we recreate it from scratch");
+                                         maxClusterCount++;
+                                         recreate = true; // recreate the pbsCluster if any exception occurs during connection
+                                     }
+                                 }
+                                 logger.info("Re-using an existing connection for the connection key: " + key);
+                             } else {
+                                 recreate = true;
+                             }
+                             if (recreate) {
+                                 JobManagerConfiguration jConfig = null;
+                                 // guard against a missing job manager type before calling toString()
+                                 ResourceJobManagerType jobManagerType = sshJobSubmission.getResourceJobManager().getResourceJobManagerType();
+                                 if (jobManagerType == null) {
+                                     logger.error("No job manager is configured, so we pick pbs as the default job manager");
+                                     jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                                 } else {
+                                     String jobManager = jobManagerType.toString();
+                                     if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                         jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                                     } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                         jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+                                     } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                         jConfig = CommonUtils.getUGEJobManager(installedParentPath);
+                                     } else if (LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                         jConfig = CommonUtils.getLSFJobManager(installedParentPath);
+                                     }
+                                 }
+ 
+                                 pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo,jConfig);
+                                 List<Cluster> pbsClusters = null;
+                                 if (!(clusters.containsKey(key))) {
+                                     pbsClusters = new ArrayList<Cluster>();
+                                 } else {
+                                     pbsClusters = clusters.get(key);
+                                 }
+                                 pbsClusters.add(pbsCluster);
+                                 clusters.put(key, pbsClusters);
+                             }
+                         }
+                     } catch (Exception e) {
+                         throw new GFacException("Error occurred while setting up the SSH security context", e);
+                     }
+                     sshSecurityContext.setPbsCluster(pbsCluster);
+                     jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), sshSecurityContext);
+                 }
+             } catch (AppCatalogException e) {
+                 throw new GFacException("Error while getting SSH Submission object from app catalog", e);
+             }
+         }
+     }
+ 
+     /**
+      * Adds a security context for third-party resources.
+      * @param jobExecutionContext the current job execution context
+      * @param sshAuth the SSH authentication wrapper to use
+      * @throws GFacException
+      * @throws ApplicationSettingsException
+      */
+     public static void addSecurityContext(JobExecutionContext jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, ApplicationSettingsException {
+         try {
+             if (sshAuth == null) {
+                 throw new GFacException("Error adding security Context, because sshAuthWrapper is null");
+             }
+             SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
+             AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
+             JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+             SSHJobSubmission sshJobSubmission = null;
+ 			try {
+ 				sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
+ 			} catch (Exception e1) {
+ 				logger.error("Unable to get the SSHJobSubmission from the app catalog", e1);
+ 			}
+ 
+             Cluster pbsCluster = null;
+             String key=sshAuth.getKey();
+             boolean recreate = false;
+             synchronized (clusters) {
+                 if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
+                     recreate = true;
+                 } else if (clusters.containsKey(key)) {
+                     int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+                     if (clusters.get(key).get(i).getSession().isConnected()) {
+                         pbsCluster = clusters.get(key).get(i);
+                     } else {
+                         clusters.get(key).remove(i);
+                         recreate = true;
+                     }
+                     if (!recreate) {
+                         try {
+                             pbsCluster.listDirectory("~/"); // isConnected() is hard to trust, so we issue a real call; if it succeeds we are good, else we recreate
+                         } catch (Exception e) {
+                             clusters.get(key).remove(i);
+                             logger.info("Connection found in the connection map has expired, so we recreate it from scratch");
+                             maxClusterCount++;
+                             recreate = true; // recreate the pbsCluster if any exception occurs during connection
+                         }
+                     }
+                     logger.info("Re-using an existing connection for the connection key: " + key);
+                 } else {
+                     recreate = true;
+                 }
+                 if (recreate) {
+                     JobManagerConfiguration jConfig = null;
+                     String installedParentPath = null;
+                     if (jobExecutionContext.getResourceJobManager() != null) {
+                         installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
+                     }
+                     if (installedParentPath == null) {
+                         installedParentPath = "/";
+                     }
+                     if (sshJobSubmission != null) {
+                         // guard against a missing job manager type before calling toString()
+                         ResourceJobManagerType jobManagerType = sshJobSubmission.getResourceJobManager().getResourceJobManagerType();
+                         if (jobManagerType == null) {
+                             logger.error("No job manager is configured, so we pick pbs as the default job manager");
+                             jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                         } else {
+                             String jobManager = jobManagerType.toString();
+                             if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                             } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+                             } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getUGEJobManager(installedParentPath);
+                             } else if (LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                 jConfig = CommonUtils.getLSFJobManager(installedParentPath);
+                             }
+                         }
+                     }
+                     pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),jConfig);
+                     key = sshAuth.getKey();
+                     List<Cluster> pbsClusters = null;
+                     if (!(clusters.containsKey(key))) {
+                         pbsClusters = new ArrayList<Cluster>();
+                     } else {
+                         pbsClusters = clusters.get(key);
+                     }
+                     pbsClusters.add(pbsCluster);
+                     clusters.put(key, pbsClusters);
+                 }
+             }
+             sshSecurityContext.setPbsCluster(pbsCluster);
+             jobExecutionContext.addSecurityContext(key, sshSecurityContext);
+         } catch (Exception e) {
+             logger.error(e.getMessage(), e);
+             throw new GFacException("Error adding security Context", e);
+         }
+     }
+ 
+ 
+     public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) throws AppCatalogException, ApplicationSettingsException {
+         JobDescriptor jobDescriptor = new JobDescriptor();
+         TaskDetails taskData = jobExecutionContext.getTaskData();
+ 
+ 
+         // set the email-based job monitoring address if the monitor mode is JOB_EMAIL_NOTIFICATION_MONITOR
+         boolean addJobNotifMail = isEmailBasedJobMonitor(jobExecutionContext);
+         String emailIds = null;
+         if (addJobNotifMail) {
+             emailIds = ServerSettings.getEmailBasedMonitorAddress();
+         }
+         // add all configured job notification email addresses.
+         if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+             String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
+             if (flags != null && jobExecutionContext.getApplicationContext().getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
+                 flags = "ALL";
+             }
+             jobDescriptor.setMailOptions(flags);
+ 
+             String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+             if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+                 if (emailIds != null && !emailIds.isEmpty()) {
+                     emailIds += ("," + userJobNotifEmailIds);
+                 } else {
+                     emailIds = userJobNotifEmailIds;
+                 }
+             }
+ 
+             if (taskData.isEnableEmailNotification()) {
+                 List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
+                 String elist = GFacUtils.listToCsv(emailList, ',');
+                 if (elist != null && !elist.isEmpty()) {
+                     if (emailIds != null && !emailIds.isEmpty()) {
+                         emailIds = emailIds + "," + elist;
+                     } else {
+                         emailIds = elist;
+                     }
+                 }
+             }
+         }
+         if (emailIds != null && !emailIds.isEmpty()) {
+             logger.info("Email list: " + emailIds);
+             jobDescriptor.setMailAddress(emailIds);
+         }
+         // this is common for any application descriptor
+ 
+         jobDescriptor.setCallBackIp(ServerSettings.getIp());
+         jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
+         jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
+         jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
+         jobDescriptor.setExecutablePath(jobExecutionContext.getApplicationContext()
+                 .getApplicationDeploymentDescription().getExecutablePath());
+         jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
+         jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
+         String computationalProjectAccount = taskData.getTaskScheduling().getComputationalProjectAccount();
+         if (computationalProjectAccount == null){
+             ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference();
+             if (computeResourcePreference != null) {
+                 computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber();
+             }
+         }
+         if (computationalProjectAccount != null) {
+             jobDescriptor.setAcountString(computationalProjectAccount);
+         }
+         // Prefix with "A" so the job name is alphanumeric and never starts with a digit (some schedulers require this).
+         jobDescriptor.setJobName("A" + String.valueOf(generateJobName()));
+         jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
+ 
+         List<String> inputValues = new ArrayList<String>();
+         MessageContext input = jobExecutionContext.getInMessageContext();
+ 
+         // sort the inputs first and then build the command list
+         Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+             @Override
+             public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+                 // Integer.compare avoids the overflow that plain subtraction can cause.
+                 return Integer.compare(inputDataObjectType.getInputOrder(), t1.getInputOrder());
+             }
+         };
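+         // Note: inputs that share the same inputOrder compare as equal, so the TreeSet keeps only one of them.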
+         Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+         for (Object object : input.getParameters().values()) {
+             if (object instanceof InputDataObjectType) {
+                 InputDataObjectType inputDOT = (InputDataObjectType) object;
+                 sortedInputSet.add(inputDOT);
+             }
+         }
+         for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+             if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+                 continue;
+             }
+             if (inputDataObjectType.getApplicationArgument() != null
+                     && !inputDataObjectType.getApplicationArgument().equals("")) {
+                 inputValues.add(inputDataObjectType.getApplicationArgument());
+             }
+ 
+             if (inputDataObjectType.getValue() != null
+                     && !inputDataObjectType.getValue().equals("")) {
+                 if (inputDataObjectType.getType() == DataType.URI) {
+                     // set only the relative path
+                     String filePath = inputDataObjectType.getValue();
+                     filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                     inputValues.add(filePath);
+                 }else {
+                     inputValues.add(inputDataObjectType.getValue());
+                 }
+ 
+             }
+         }
+         Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters();
+         for (Object outputParam : outputParams.values()) {
+             if (outputParam instanceof OutputDataObjectType) {
+                 OutputDataObjectType output = (OutputDataObjectType) outputParam;
+                 if (output.getApplicationArgument() != null
+                         && !output.getApplicationArgument().equals("")) {
+                     inputValues.add(output.getApplicationArgument());
+                 }
+                 if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+                     if (output.getType() == DataType.URI){
+                         String filePath = output.getValue();
+                         filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                         inputValues.add(filePath);
+                     }
+                 }
+             }
+         }
+ 
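+         // The accumulated values (ordered inputs followed by output arguments) become the command-line arguments.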
+         jobDescriptor.setInputValues(inputValues);
+         jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName());
+         jobDescriptor.setShellName("/bin/bash");
+         jobDescriptor.setAllEnvExport(true);
+         jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
+ 
+         ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+ 
+ 
+         ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling();
+         if (taskScheduling != null) {
+             int totalNodeCount = taskScheduling.getNodeCount();
+             int totalCPUCount = taskScheduling.getTotalCPUCount();
+ 
+ 
+             if (taskScheduling.getComputationalProjectAccount() != null) {
+                 jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
+             }
+             if (taskScheduling.getQueueName() != null) {
+                 jobDescriptor.setQueueName(taskScheduling.getQueueName());
+             }
+ 
+             if (totalNodeCount > 0) {
+                 jobDescriptor.setNodes(totalNodeCount);
+             }
+             if (totalCPUCount > 0) {
+                 // Guard against division by zero when no node count was specified.
+                 int ppn = totalNodeCount > 0 ? totalCPUCount / totalNodeCount : totalCPUCount;
+                 jobDescriptor.setProcessesPerNode(ppn);
+                 jobDescriptor.setCPUCount(totalCPUCount);
+             }
+             if (taskScheduling.getWallTimeLimit() > 0) {
+                 jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit()));
+                 if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
+                     jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit()));
+                 }
+             }
+             if (taskScheduling.getTotalPhysicalMemory() > 0) {
+                 jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + "");
+             }
+         } else {
+             logger.error("Task scheduling cannot be null at this point..");
+         }
+         ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+         List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
+         if (moduleCmds != null) {
+             for (String moduleCmd : moduleCmds) {
+                 jobDescriptor.addModuleLoadCommands(moduleCmd);
+             }
+         }
+         List<String> preJobCommands = appDepDescription.getPreJobCommands();
+         if (preJobCommands != null) {
+             for (String preJobCommand : preJobCommands) {
+                 jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext));
+             }
+         }
+ 
+         List<String> postJobCommands = appDepDescription.getPostJobCommands();
+         if (postJobCommands != null) {
+             for (String postJobCommand : postJobCommands) {
+                 jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext));
+             }
+         }
+ 
+         ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+         if (parallelism != null){
+             if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){
+                 Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+                 if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
+                     for (JobManagerCommand command : jobManagerCommands.keySet()) {
+                         if (command == JobManagerCommand.SUBMISSION) {
+                             String commandVal = jobManagerCommands.get(command);
+                             jobDescriptor.setJobSubmitter(commandVal);
+                         }
+                     }
+                 }
+             }
+         }
+         return jobDescriptor;
+     }
+ 
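+     // A job qualifies for email-based monitoring only when it is submitted over SSH and the
+     // submission interface's monitor mode is JOB_EMAIL_NOTIFICATION_MONITOR.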
+     public static boolean isEmailBasedJobMonitor(JobExecutionContext jobExecutionContext) throws AppCatalogException {
+         if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+             String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+             SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+             MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+             return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
+         } else {
+             return false;
+         }
+     }
+ 
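+     // Produces a random positive integer for the job name. Adding 99999999 keeps the value
+     // large but may overflow int, hence the sign flip back to positive below.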
+     private static int generateJobName() {
+         Random random = new Random();
+         int i = random.nextInt(Integer.MAX_VALUE);
+         i = i + 99999999;
+         if(i<0) {
+             i = i * (-1);
+         }
+         return i;
+     }
+ 
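+     /**
+      * Substitutes the $workingDir, $inputDir and $outputDir placeholders in pre/post job
+      * commands, e.g. "cp $inputDir/in.txt $workingDir" for a hypothetical input file.
+      * Note: replaceAll treats '$' and '\' in the replacement specially, so directory values
+      * containing those characters would need Matcher.quoteReplacement.
+      */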
+     private static String parseCommand(String value, JobExecutionContext jobExecutionContext) {
+         String parsedValue = value.replaceAll("\\$workingDir", jobExecutionContext.getWorkingDir());
+         parsedValue = parsedValue.replaceAll("\\$inputDir", jobExecutionContext.getInputDir());
+         parsedValue = parsedValue.replaceAll("\\$outputDir", jobExecutionContext.getOutputDir());
+         return parsedValue;
+     }
+     /**
+      * Sets the security context for the given connection if it is not already set, so it can be reused elsewhere.
+      * @param jobExecutionContext
+      * @param authenticationInfo
+      * @param userName
+      * @param hostName
+      * @param port
+      * @return the key under which the security context is registered
+      * @throws GFacException
+      */
+     public static String prepareSecurityContext(JobExecutionContext jobExecutionContext, AuthenticationInfo authenticationInfo
+             , String userName, String hostName, int port) throws GFacException {
+         ServerInfo serverInfo = new ServerInfo(userName, hostName);
+         String key = userName + hostName + port;
+         SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, authenticationInfo, key);
+         if (jobExecutionContext.getSecurityContext(key) == null) {
+             try {
+                 GFACSSHUtils.addSecurityContext(jobExecutionContext, sshAuthWrapper);
+             } catch (ApplicationSettingsException e) {
+                 logger.error(e.getMessage());
+                 try {
+                     StringWriter errors = new StringWriter();
+                     e.printStackTrace(new PrintWriter(errors));
+                     GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                 } catch (GFacException e1) {
+                     logger.error(e1.getLocalizedMessage());
+                 }
+                 throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+             }
+         }
+         return key;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
index 0000000,73a6e4a..38981aa
mode 000000,100644..100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
@@@ -1,0 -1,252 +1,252 @@@
+ ///*
+ // *
+ // * Licensed to the Apache Software Foundation (ASF) under one
+ // * or more contributor license agreements.  See the NOTICE file
+ // * distributed with this work for additional information
+ // * regarding copyright ownership.  The ASF licenses this file
+ // * to you under the Apache License, Version 2.0 (the
+ // * "License"); you may not use this file except in compliance
+ // * with the License.  You may obtain a copy of the License at
+ // *
+ // *   http://www.apache.org/licenses/LICENSE-2.0
+ // *
+ // * Unless required by applicable law or agreed to in writing,
+ // * software distributed under the License is distributed on an
+ // * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ // * KIND, either express or implied.  See the License for the
+ // * specific language governing permissions and limitations
+ // * under the License.
+ // *
+ //*/
+ //package org.apache.airavata.core.gfac.services.impl;
+ //
+ //import org.apache.airavata.commons.gfac.type.ActualParameter;
+ //import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+ //import org.apache.airavata.commons.gfac.type.HostDescription;
+ //import org.apache.airavata.commons.gfac.type.ServiceDescription;
+ //import org.apache.airavata.gfac.GFacConfiguration;
+ //import org.apache.airavata.gfac.GFacException;
+ //import org.apache.airavata.gfac.SecurityContext;
+ //import org.apache.airavata.gfac.core.context.ApplicationContext;
+ //import org.apache.airavata.gfac.core.context.JobExecutionContext;
+ //import org.apache.airavata.gfac.core.context.MessageContext;
+ //import org.apache.airavata.gfac.impl.BetterGfacImpl;
+ //import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+ //import org.apache.airavata.gfac.ssh.api.Cluster;
+ //import org.apache.airavata.gfac.ssh.api.SSHApiException;
+ //import org.apache.airavata.gfac.ssh.api.ServerInfo;
+ //import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+ //import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration;
+ //import org.apache.airavata.gfac.ssh.impl.PBSCluster;
+ //import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+ //import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+ //import org.apache.airavata.gfac.ssh.util.CommonUtils;
+ //import org.apache.airavata.model.workspace.experiment.TaskDetails;
 -//import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
++//import org.apache.airavata.experiment.registry.jpa.impl.RegistryFactory;
+ //import org.apache.airavata.schemas.gfac.*;
+ //import org.testng.annotations.BeforeClass;
+ //import org.testng.annotations.Test;
+ //
+ //import java.io.File;
+ //import java.net.URL;
+ //import java.util.ArrayList;
+ //import java.util.Date;
+ //import java.util.List;
+ //import java.util.UUID;
+ //
+ //public class BigRed2TestWithSSHAuth {
+ //    private JobExecutionContext jobExecutionContext;
+ //
+ //    private String userName;
+ //    private String password;
+ //    private String passPhrase;
+ //    private String hostName;
+ //    private String workingDirectory;
+ //    private String privateKeyPath;
+ //    private String publicKeyPath;
+ //
+ //    @BeforeClass
+ //    public void setUp() throws Exception {
+ //
+ //        System.out.println("Test case name " + this.getClass().getName());
+ ////        System.setProperty("ssh.host","bigred2.uits.iu.edu");        //default ssh host
+ ////        System.setProperty("ssh.user", "lginnali");
+ ////        System.setProperty("ssh.private.key.path", "/Users/lahirugunathilake/.ssh/id_dsa");
+ ////        System.setProperty("ssh.public.key.path", "/Users/lahirugunathilake/.ssh/id_dsa.pub");
+ ////        System.setProperty("ssh.working.directory", "/tmp");
+ //
+ //        this.hostName = "bigred2.uits.iu.edu";
+ //        this.hostName = System.getProperty("ssh.host");
+ //        this.userName = System.getProperty("ssh.username");
+ //        this.password = System.getProperty("ssh.password");
+ //        this.privateKeyPath = System.getProperty("private.ssh.key");
+ //        this.publicKeyPath = System.getProperty("public.ssh.key");
+ //        this.passPhrase = System.getProperty("ssh.keypass");
+ //        this.workingDirectory = System.getProperty("ssh.working.directory");
+ //
+ //
+ //         if (this.userName == null
+ //                || (this.password==null && (this.publicKeyPath == null || this.privateKeyPath == null)) || this.workingDirectory == null) {
+ //            System.out.println("########### In order to test you have to either username password or private,public keys");
+ //            System.out.println("Use -Dssh.username=xxx -Dssh.password=yyy -Dssh.keypass=zzz " +
+ //                    "-Dprivate.ssh.key -Dpublic.ssh.key -Dssh.working.directory ");
+ //        }
+ //        URL resource = BigRed2TestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ //        assert resource != null;
+ //        System.out.println(resource.getFile());
+ //        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null);
+ //
+ ////        gFacConfiguration.setMyProxyLifeCycle(3600);
+ ////        gFacConfiguration.setMyProxyServer("myproxy.teragrid.org");
+ ////        gFacConfiguration.setMyProxyUser("*****");
+ ////        gFacConfiguration.setMyProxyPassphrase("*****");
+ ////        gFacConfiguration.setTrustedCertLocation("./certificates");
+ ////        //have to set InFlwo Handlers and outFlowHandlers
+ ////        gFacConfiguration.setInHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GramDirectorySetupHandler","org.apache.airavata.gfac.handler.GridFTPInputHandler"}));
+ ////        gFacConfiguration.setOutHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GridFTPOutputHandler"}));
+ //
+ //        /*
+ //        * Host
+ //        */
+ //        HostDescription host = new HostDescription(SSHHostType.type);
+ //        host.getType().setHostAddress(hostName);
+ //        host.getType().setHostName(hostName);
+ //        ((SSHHostType)host.getType()).setHpcResource(true);
+ //        /*
+ //        * App
+ //        */
+ //        ApplicationDescription appDesc = new ApplicationDescription(HpcApplicationDeploymentType.type);
+ //        HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) appDesc.getType();
+ //        ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
+ //        name.setStringValue("EchoLocal");
+ //        app.setApplicationName(name);
+ //
+ //        app.setCpuCount(1);
+ //        app.setJobType(JobTypeType.SERIAL);
+ //        app.setNodeCount(1);
+ //        app.setProcessorsPerNode(1);
+ //
+ //        /*
+ //        * Use bat file if it is compiled on Windows
+ //        */
+ //        app.setExecutableLocation("/bin/echo");
+ //
+ //        /*
+ //        * Default tmp location
+ //        */
+ //        String tempDir = "/tmp";
+ //        String date = (new Date()).toString();
+ //        date = date.replaceAll(" ", "_");
+ //        date = date.replaceAll(":", "_");
+ //
+ //        tempDir = tempDir + File.separator
+ //                + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID();
+ //
+ //        System.out.println(tempDir);
+ //        app.setScratchWorkingDirectory(tempDir);
+ //        app.setStaticWorkingDirectory(tempDir);
+ //        app.setInputDataDirectory(tempDir + File.separator + "inputData");
+ //        app.setOutputDataDirectory(tempDir + File.separator + "outputData");
+ //        app.setStandardOutput(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stdout");
+ //        app.setStandardError(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stderr");
+ //        app.setMaxWallTime(5);
+ //        app.setJobSubmitterCommand("aprun -n 1");
+ //        app.setInstalledParentPath("/opt/torque/torque-4.2.3.1/bin/");
+ //
+ //        /*
+ //        * Service
+ //        */
+ //        ServiceDescription serv = new ServiceDescription();
+ //        serv.getType().setName("SimpleEcho");
+ //
+ //        List<InputParameterType> inputList = new ArrayList<InputParameterType>();
+ //
+ //        InputParameterType input = InputParameterType.Factory.newInstance();
+ //        input.setParameterName("echo_input");
+ //        input.setParameterType(StringParameterType.Factory.newInstance());
+ //        inputList.add(input);
+ //
+ //        InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
+ //
+ //                .size()]);
+ //        List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
+ //        OutputParameterType output = OutputParameterType.Factory.newInstance();
+ //        output.setParameterName("echo_output");
+ //        output.setParameterType(StringParameterType.Factory.newInstance());
+ //        outputList.add(output);
+ //
+ //        OutputParameterType[] outputParamList = outputList
+ //                .toArray(new OutputParameterType[outputList.size()]);
+ //
+ //        serv.getType().setInputParametersArray(inputParamList);
+ //        serv.getType().setOutputParametersArray(outputParamList);
+ //
+ //        jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName());
+ //        // Adding security context
+ //        jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, getSecurityContext(app));
+ //        ApplicationContext applicationContext = new ApplicationContext();
+ //        jobExecutionContext.setApplicationContext(applicationContext);
+ //        applicationContext.setServiceDescription(serv);
+ //        applicationContext.setApplicationDeploymentDescription(appDesc);
+ //        applicationContext.setHostDescription(host);
+ //
+ //        MessageContext inMessage = new MessageContext();
+ //        ActualParameter echo_input = new ActualParameter();
+ //        ((StringParameterType) echo_input.getType()).setValue("echo_output=hello");
+ //        inMessage.addParameter("echo_input", echo_input);
+ //
+ //
+ //        jobExecutionContext.setInMessageContext(inMessage);
+ //
+ //        MessageContext outMessage = new MessageContext();
+ //        ActualParameter echo_out = new ActualParameter();
+ ////		((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ //        outMessage.addParameter("echo_output", echo_out);
+ //        jobExecutionContext.setRegistry(RegistryFactory.getLoggingRegistry());
+ //        jobExecutionContext.setTaskData(new TaskDetails("11323"));
+ //        jobExecutionContext.setOutMessageContext(outMessage);
+ //
+ //    }
+ //
+ //
+ //    private SecurityContext getSecurityContext(HpcApplicationDeploymentType app) {
+ //         try {
+ //
+ //        AuthenticationInfo authenticationInfo = null;
+ //        if (password != null) {
+ //            authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ //        } else {
+ //            authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ //                    this.passPhrase);
+ //        }
+ //        // Server info
+ //        ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ //
+ //        Cluster pbsCluster = null;
+ //        SSHSecurityContext sshSecurityContext = null;
+ //
+ //            JobManagerConfiguration pbsJobManager = CommonUtils.getPBSJobManager(app.getInstalledParentPath());
+ //            pbsCluster = new PBSCluster(serverInfo, authenticationInfo, pbsJobManager);
+ //
+ //
+ //            sshSecurityContext = new SSHSecurityContext();
+ //            sshSecurityContext.setPbsCluster(pbsCluster);
+ //            sshSecurityContext.setUsername(userName);
+ //            sshSecurityContext.setKeyPass(passPhrase);
+ //            sshSecurityContext.setPrivateKeyLoc(privateKeyPath);
+ //             return sshSecurityContext;
+ //        } catch (SSHApiException e) {
+ //            e.printStackTrace();
+ //        }
+ //        return null;
+ //    }
+ //
+ //    @Test
+ //    public void testSSHProvider() throws GFacException {
+ //        BetterGfacImpl gFacAPI = new BetterGfacImpl();
+ //        gFacAPI.submitJob(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
+ //        org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobDescription());
+ //        org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobID());
+ //    }
+ //
+ //}