You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/04 22:16:33 UTC
[80/81] [abbrv] airavata git commit: Merge moduleRefactor branch
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 0000000,b716099..3756140
mode 000000,100644..100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@@ -1,0 -1,747 +1,747 @@@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ package org.apache.airavata.gfac.core;
+
+ import org.airavata.appcatalog.cpi.AppCatalog;
+ import org.airavata.appcatalog.cpi.AppCatalogException;
+ import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+ import org.apache.airavata.common.exception.ApplicationSettingsException;
+ import org.apache.airavata.common.utils.AiravataZKUtils;
+ import org.apache.airavata.common.utils.DBUtil;
+ import org.apache.airavata.common.utils.MonitorPublisher;
+ import org.apache.airavata.common.utils.ServerSettings;
+ import org.apache.airavata.credential.store.store.CredentialReader;
+ import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
+ import org.apache.airavata.gfac.Constants;
+ import org.apache.airavata.gfac.ExecutionMode;
+ import org.apache.airavata.gfac.GFacConfiguration;
+ import org.apache.airavata.gfac.GFacException;
+ import org.apache.airavata.gfac.core.context.JobExecutionContext;
+ import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+ import org.apache.airavata.gfac.core.states.GfacExperimentState;
+ import org.apache.airavata.gfac.core.states.GfacHandlerState;
+ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+ import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+ import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+ import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+ import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+ import org.apache.airavata.model.messaging.event.JobIdentifier;
+ import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+ import org.apache.airavata.model.messaging.event.TaskIdentifier;
+ import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+ import org.apache.airavata.model.workspace.experiment.ActionableGroup;
+ import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+ import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+ import org.apache.airavata.model.workspace.experiment.ErrorDetails;
+ import org.apache.airavata.model.workspace.experiment.Experiment;
+ import org.apache.airavata.model.workspace.experiment.ExperimentState;
+ import org.apache.airavata.model.workspace.experiment.JobDetails;
+ import org.apache.airavata.model.workspace.experiment.JobState;
+ import org.apache.airavata.model.workspace.experiment.JobStatus;
+ import org.apache.airavata.model.workspace.experiment.TaskState;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
++import org.apache.airavata.experiment.catalog.impl.RegistryFactory;
+ import org.apache.airavata.registry.cpi.ChildDataType;
+ import org.apache.airavata.registry.cpi.CompositeIdentifier;
+ import org.apache.airavata.registry.cpi.Registry;
+ import org.apache.airavata.registry.cpi.RegistryException;
+ import org.apache.airavata.registry.cpi.RegistryModelType;
+ import org.apache.curator.framework.CuratorFramework;
+ import org.apache.curator.utils.ZKPaths;
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.ZooDefs;
+ import org.apache.zookeeper.data.ACL;
+ import org.apache.zookeeper.data.Stat;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.w3c.dom.Document;
+ import org.w3c.dom.Element;
+ import org.w3c.dom.Node;
+ import org.w3c.dom.NodeList;
+
+ import javax.xml.xpath.XPath;
+ import javax.xml.xpath.XPathConstants;
+ import javax.xml.xpath.XPathExpression;
+ import javax.xml.xpath.XPathExpressionException;
+ import javax.xml.xpath.XPathFactory;
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.FileNotFoundException;
+ import java.io.FileReader;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.InetAddress;
+ import java.net.URISyntaxException;
+ import java.net.UnknownHostException;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Calendar;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+
+ //import org.apache.airavata.commons.gfac.type.ActualParameter;
+
+ public class GFacUtils {
+ private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+ public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ private GFacUtils() {
+ }
+
+ /**
+ * Read data from inputStream and convert it to String.
+ *
+ * @param in
+ * @return String read from inputStream
+ * @throws java.io.IOException
+ */
+ public static String readFromStream(InputStream in) throws IOException {
+ try {
+ StringBuffer wsdlStr = new StringBuffer();
+
+ int read;
+
+ byte[] buf = new byte[1024];
+ while ((read = in.read(buf)) > 0) {
+ wsdlStr.append(new String(buf, 0, read));
+ }
+ return wsdlStr.toString();
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ log.warn("Cannot close InputStream: "
+ + in.getClass().getName(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * This returns true if the give job is finished
+ * otherwise false
+ *
+ * @param job
+ * @return
+ */
+ public static boolean isJobFinished(JobDescriptor job) {
+ if (org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus())) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * This will read
+ *
+ * @param maxWalltime
+ * @return
+ */
+ public static String maxWallTimeCalculator(int maxWalltime) {
+ if (maxWalltime < 60) {
+ return "00:" + maxWalltime + ":00";
+ } else {
+ int minutes = maxWalltime % 60;
+ int hours = maxWalltime / 60;
+ return hours + ":" + minutes + ":00";
+ }
+ }
+ public static String maxWallTimeCalculatorForLSF(int maxWalltime) {
+ if (maxWalltime < 60) {
+ return "00:" + maxWalltime;
+ } else {
+ int minutes = maxWalltime % 60;
+ int hours = maxWalltime / 60;
+ return hours + ":" + minutes;
+ }
+ }
+ /**
+ * this can be used to do framework opertaions specific to different modes
+ *
+ * @param jobExecutionContext
+ * @return
+ */
+ public static boolean isSynchronousMode(
+ JobExecutionContext jobExecutionContext) {
+ GFacConfiguration gFacConfiguration = jobExecutionContext
+ .getGFacConfiguration();
+ if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
+ .getExecutionMode())) {
+ return false;
+ }
+ return true;
+ }
+
+ public static String readFileToString(String file)
+ throws FileNotFoundException, IOException {
+ BufferedReader instream = null;
+ try {
+
+ instream = new BufferedReader(new FileReader(file));
+ StringBuffer buff = new StringBuffer();
+ String temp = null;
+ while ((temp = instream.readLine()) != null) {
+ buff.append(temp);
+ buff.append(Constants.NEWLINE);
+ }
+ return buff.toString();
+ } finally {
+ if (instream != null) {
+ try {
+ instream.close();
+ } catch (IOException e) {
+ log.warn("Cannot close FileinputStream", e);
+ }
+ }
+ }
+ }
+
+ public static boolean isLocalHost(String appHost)
+ throws UnknownHostException {
+ String localHost = InetAddress.getLocalHost().getCanonicalHostName();
+ return (localHost.equals(appHost)
+ || Constants.LOCALHOST.equals(appHost) || Constants._127_0_0_1
+ .equals(appHost));
+ }
+
+ public static String createUniqueNameWithDate(String name) {
+ String date = new Date().toString();
+ date = date.replaceAll(" ", "_");
+ date = date.replaceAll(":", "_");
+ return name + "_" + date;
+ }
+
+ public static List<Element> getElementList(Document doc, String expression) throws XPathExpressionException {
+ XPathFactory xPathFactory = XPathFactory.newInstance();
+ XPath xPath = xPathFactory.newXPath();
+ XPathExpression expr = xPath.compile(expression);
+ NodeList nodeList = (NodeList) expr.evaluate(doc, XPathConstants.NODESET);
+ List<Element> elementList = new ArrayList<Element>();
+ for (int i = 0; i < nodeList.getLength(); i++) {
+ Node item = nodeList.item(i);
+ if (item instanceof Element) {
+ elementList.add((Element) item);
+ }
+ }
+ return elementList;
+ }
+
+ public static String createGsiftpURIAsString(String host, String localPath)
+ throws URISyntaxException {
+ StringBuffer buf = new StringBuffer();
+ if (!host.startsWith("gsiftp://"))
+ buf.append("gsiftp://");
+ buf.append(host);
+ if (!host.endsWith("/"))
+ buf.append("/");
+ buf.append(localPath);
+ return buf.toString();
+ }
+
+ public static void saveJobStatus(JobExecutionContext jobExecutionContext,
+ JobDetails details, JobState state) throws GFacException {
+ try {
+ // first we save job details to the registry for sa and then save the job status.
+ Registry registry = jobExecutionContext.getRegistry();
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ details.setJobStatus(status);
+ registry.add(ChildDataType.JOB_DETAIL, details,
+ new CompositeIdentifier(jobExecutionContext.getTaskData()
+ .getTaskID(), details.getJobID()));
+ JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
+ jobExecutionContext.getMonitorPublisher().publish(jobStatusChangeRequestEvent);
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public static void updateJobStatus(JobExecutionContext jobExecutionContext,
+ JobDetails details, JobState state) throws GFacException {
+ try {
+ Registry registry = jobExecutionContext.getRegistry();
+ JobStatus status = new JobStatus();
+ status.setJobState(state);
+ status.setTimeOfStateChange(Calendar.getInstance()
+ .getTimeInMillis());
+ details.setJobStatus(status);
+ registry.update(
+ org.apache.airavata.registry.cpi.RegistryModelType.JOB_DETAIL,
+ details, details.getJobID());
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public static void saveErrorDetails(
+ JobExecutionContext jobExecutionContext, String errorMessage,
+ CorrectiveAction action, ErrorCategory errorCatogory)
+ throws GFacException {
+ try {
+ Registry registry = jobExecutionContext.getRegistry();
+ ErrorDetails details = new ErrorDetails();
+ details.setActualErrorMessage(errorMessage);
+ details.setCorrectiveAction(action);
+ details.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS);
+ details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+ details.setErrorCategory(errorCatogory);
+ registry.add(ChildDataType.ERROR_DETAIL, details,
+ jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ throw new GFacException("Error persisting job status"
+ + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ for (InputDataObjectType objectType : experimentData) {
+ map.put(objectType.getName(), objectType);
+ }
+ return map;
+ }
+
+ public static Map<String, Object> getOuputParamMap(List<OutputDataObjectType> experimentData) throws GFacException {
+ Map<String, Object> map = new HashMap<String, Object>();
+ for (OutputDataObjectType objectType : experimentData) {
+ map.put(objectType.getName(), objectType);
+ }
+ return map;
+ }
+
+ public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
+ JobExecutionContext jobExecutionContext)
+ throws Exception {
+ String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
+ .getExperimentID());
+ if (expState == null || expState.isEmpty()) {
+ return GfacExperimentState.UNKNOWN;
+ }
+ return GfacExperimentState.findByValue(Integer.valueOf(expState));
+ }
+
+ public static boolean createHandlerZnode(CuratorFramework curatorClient,
+ JobExecutionContext jobExecutionContext, String className)
+ throws Exception {
+ String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(expState);
+ if (exists == null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ } else {
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists == null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ }
+ }
+
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists != null) {
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
+ }
+ return true;
+ }
+
+ public static boolean createHandlerZnode(CuratorFramework curatorClient,
+ JobExecutionContext jobExecutionContext, String className,
+ GfacHandlerState state) throws Exception {
+ String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(expState);
+ if (exists == null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState, new byte[0]);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ } else {
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists == null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+ }
+ }
+
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists != null) {
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(state.getValue()).getBytes());
+ }
+ return true;
+ }
+
+ public static boolean updateHandlerState(CuratorFramework curatorClient,
+ JobExecutionContext jobExecutionContext, String className,
+ GfacHandlerState state) throws Exception {
+ String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists != null) {
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
+ } else {
+ createHandlerZnode(curatorClient, jobExecutionContext, className, state);
+ }
+ return false;
+ }
+
+ public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
+ JobExecutionContext jobExecutionContext, String className) {
+ try {
+ String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists != null) {
+ String stateVal = new String(curatorClient.getData().storingStatIn(exists)
+ .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
+ return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
+ }
+ return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
+ // return false
+ } catch (Exception e) {
+ log.error("Error occured while getting zk node status", e);
+ return null;
+ }
+ }
+
+ // This method is dangerous because of moving the experiment data
+ public static boolean createExperimentEntryForPassive(String experimentID,
+ String taskID, CuratorFramework curatorClient, String experimentNode,
+ String pickedChild, String tokenId, long deliveryTag) throws Exception {
+ String experimentPath = experimentNode + File.separator + pickedChild;
+ String newExperimentPath = experimentPath + File.separator + experimentID;
+ Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
+ String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+ if (oldExperimentPath == null) { // this means this is a very new experiment
+ // are going to create a new node
+ log.info("This is a new Job, so creating all the experiment docs from the scratch");
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
+ String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + File.separator + "state",
+ String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
+
+ if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
+ log.info("Created the node: " + stateNodePath + " successfully !");
+ }else {
+ log.error("Error creating node: " + stateNodePath + " successfully !");
+ }
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
+ } else {
+ log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
+ removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
+ if(newExperimentPath.equals(oldExperimentPath)){
+ log.info("Re-launch experiment came to the same GFac instance");
+ }else {
+ log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
+ curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
+ copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+ String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
+ Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
+ if(exists!=null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+ curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
+ }
+ // After all the files are successfully transfered we delete the // old experiment,otherwise we do
+ // not delete a single file
+ log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+ log.info("Deleting experiment data: " + oldExperimentPath);
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
+ }
+ }
+ return true;
+ }
+
+ private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
+ Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+ if (exists != null) {
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
+ }
+ }
+
+ private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
+ for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
+ String oldChildPath = oldPath + File.separator + childNode;
+ Stat stat = curatorClient.checkExists().forPath(oldChildPath); // no need to check exists
+ String newChildPath = newPath + File.separator + childNode;
+ log.info("Creating new znode: " + newChildPath);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
+ if (--depth > 0) {
+ copyChildren(curatorClient , oldChildPath, newChildPath, depth );
+ }
+ }
+ }
+
+ /**
+ * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
+ * through gfac-server nodes
+ *
+ * @param experimentID
+ * @param curatorClient
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
+ String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ List<String> children = curatorClient.getChildren().forPath(experimentNode);
+ for (String pickedChild : children) {
+ String experimentPath = experimentNode + File.separator + pickedChild;
+ String newExpNode = experimentPath + File.separator + experimentID;
+ Stat exists = curatorClient.checkExists().forPath(newExpNode);
+ if (exists == null) {
+ continue;
+ } else {
+ return newExpNode;
+ }
+ }
+ return null;
+ }
+
+ public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
+ if (experimentEntry == null) {
+ // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
+ log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
+ "This happen when experiment completed and already removed from the zookeeper");
+ return false;
+ } else {
+ // check cancel operation is being processed for the same experiment.
+ Stat cancelState = curatorClient.checkExists().forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+ if (cancelState != null) {
+ // another cancel operation is being processed. only one cancel operation can exist for a given experiment.
+ return false;
+ }
+
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
+ return true;
+ }
+
+ }
+ public static boolean isCancelled(String experimentID, CuratorFramework curatorClient ) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+ if(experimentEntry == null){
+ return false;
+ }else {
+ Stat exists = curatorClient.checkExists().forPath(experimentEntry);
+ if (exists != null) {
+ String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
+ if ("cancel".equals(operation)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static void saveHandlerData(JobExecutionContext jobExecutionContext,
+ StringBuffer data, String className) throws GFacHandlerException {
+ try {
+ CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ if (curatorClient != null) {
+ String expZnodeHandlerPath = AiravataZKUtils
+ .getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(),
+ className);
+ Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+ if (exists != null) {
+ curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
+ } else {
+ log.error("Saving Handler data failed, Stat is null");
+ }
+ }
+ } catch (Exception e) {
+ throw new GFacHandlerException(e);
+ }
+ }
+
+ public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
+ CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ if (curatorClient != null) {
+ String expZnodeHandlerPath = AiravataZKUtils
+ .getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(),
+ className);
+ Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+ return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
+ }
+ return null;
+ }
+
+ public static CredentialReader getCredentialReader()
+ throws ApplicationSettingsException, IllegalAccessException,
+ InstantiationException {
+ try{
+ String jdbcUrl = ServerSettings.getCredentialStoreDBURL();
+ String jdbcUsr = ServerSettings.getCredentialStoreDBUser();
+ String jdbcPass = ServerSettings.getCredentialStoreDBPassword();
+ String driver = ServerSettings.getCredentialStoreDBDriver();
+ return new CredentialReaderImpl(new DBUtil(jdbcUrl, jdbcUsr, jdbcPass,
+ driver));
+ }catch(ClassNotFoundException e){
+ log.error("Not able to find driver: " + e.getLocalizedMessage());
+ return null;
+ }
+ }
+
+ public static LOCALSubmission getLocalJobSubmission (String submissionId) throws AppCatalogException{
+ try {
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ return appCatalog.getComputeResource().getLocalJobSubmission(submissionId);
+ }catch (Exception e){
+ String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
+ log.error(errorMsg, e);
+ throw new AppCatalogException(errorMsg, e);
+ }
+ }
+
+ public static UnicoreJobSubmission getUnicoreJobSubmission (String submissionId) throws AppCatalogException{
+ try {
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ return appCatalog.getComputeResource().getUNICOREJobSubmission(submissionId);
+ }catch (Exception e){
+ String errorMsg = "Error while retrieving UNICORE job submission with submission id : " + submissionId;
+ log.error(errorMsg, e);
+ throw new AppCatalogException(errorMsg, e);
+ }
+ }
+
+ public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
+ try {
+ AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
+ return appCatalog.getComputeResource().getSSHJobSubmission(submissionId);
+ }catch (Exception e){
+ String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
+ log.error(errorMsg, e);
+ throw new AppCatalogException(errorMsg, e);
+ }
+ }
+
+ /**
+ * To convert list to separated value
+ * @param listOfStrings
+ * @param separator
+ * @return
+ */
+ public static String listToCsv(List<String> listOfStrings, char separator) {
+ StringBuilder sb = new StringBuilder();
+
+ // all but last
+ for(int i = 0; i < listOfStrings.size() - 1 ; i++) {
+ sb.append(listOfStrings.get(i));
+ sb.append(separator);
+ }
+
+ // last string, no separator
+ if(listOfStrings.size() > 0){
+ sb.append(listOfStrings.get(listOfStrings.size()-1));
+ }
+
+ return sb.toString();
+ }
+
+ public static byte[] longToBytes(long x) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong(x);
+ return buffer.array();
+ }
+
+ public static long bytesToLong(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.put(bytes);
+ buffer.flip();//need flip
+ return buffer.getLong();
+ }
+
+ public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
+ Registry airavataRegistry = RegistryFactory.getDefaultRegistry();
+ Experiment details = (Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
+ if (details == null) {
+ details = new Experiment();
+ details.setExperimentID(experimentId);
+ }
+ org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+ status.setExperimentState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getExperimentState()) &&
+ !ExperimentState.CANCELING.equals(details.getExperimentStatus().getExperimentState())) {
+ status.setExperimentState(state);
+ } else {
+ status.setExperimentState(details.getExperimentStatus().getExperimentState());
+ }
+ details.setExperimentStatus(status);
+ log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
+ airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+ return details.getExperimentStatus().getExperimentState();
+ }
+
+ public static boolean isFailedJob (JobExecutionContext jec) {
+ JobStatus jobStatus = jec.getJobDetails().getJobStatus();
+ if (jobStatus.getJobState() == JobState.FAILED) {
+ return true;
+ }
+ return false;
+ }
+
+ public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
+ String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
+ if (experimentEntry == null) {
+ // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
+ log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
+ "This happen when experiment completed and already removed from the CuratorFramework");
+ } else {
+ // check cancel operation is being processed for the same experiment.
+ Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
+ if (cancelState != null) {
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static void publishTaskStatus (JobExecutionContext jobExecutionContext, MonitorPublisher publisher, TaskState state){
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
+ }
+ }
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 0000000,307d8c3..f28b6e4
mode 000000,100644..100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@@ -1,0 -1,562 +1,562 @@@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ package org.apache.airavata.gfac.ssh.util;
+
+ import org.airavata.appcatalog.cpi.AppCatalog;
+ import org.airavata.appcatalog.cpi.AppCatalogException;
+ import org.apache.airavata.common.exception.ApplicationSettingsException;
+ import org.apache.airavata.common.utils.ServerSettings;
+ import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
+ import org.apache.airavata.gfac.Constants;
+ import org.apache.airavata.gfac.GFacException;
+ import org.apache.airavata.gfac.RequestData;
+ import org.apache.airavata.gfac.core.JobDescriptor;
+ import org.apache.airavata.gfac.core.JobManagerConfiguration;
+ import org.apache.airavata.gfac.core.cluster.Cluster;
+ import org.apache.airavata.gfac.core.cluster.ServerInfo;
+ import org.apache.airavata.gfac.core.context.JobExecutionContext;
+ import org.apache.airavata.gfac.core.context.MessageContext;
+ import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+ import org.apache.airavata.gfac.core.GFacUtils;
+ import org.apache.airavata.gfac.gsi.ssh.impl.GSISSHAbstractCluster;
+ import org.apache.airavata.gfac.gsi.ssh.impl.PBSCluster;
+ import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+ import org.apache.airavata.gfac.gsi.ssh.util.CommonUtils;
+ import org.apache.airavata.gfac.ssh.context.SSHAuthWrapper;
+ import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+ import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo;
+ import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
+ import org.apache.airavata.model.appcatalog.appinterface.DataType;
+ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+ import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+ import org.apache.airavata.model.appcatalog.computeresource.*;
+ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+ import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+ import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+ import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+ import org.apache.airavata.model.workspace.experiment.TaskDetails;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.io.File;
+ import java.io.PrintWriter;
+ import java.io.StringWriter;
+ import java.util.*;
+
+ public class GFACSSHUtils {
+ private final static Logger logger = LoggerFactory.getLogger(GFACSSHUtils.class);
+
+ public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>();
+
+ public static final String PBS_JOB_MANAGER = "pbs";
+ public static final String SLURM_JOB_MANAGER = "slurm";
+ public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
+ public static final String LSF_JOB_MANAGER = "LSF";
+
+ public static int maxClusterCount = 5;
+
+ /**
+ * This method is to add computing resource specific authentication, if its a third party machine, use the other addSecurityContext
+ * @param jobExecutionContext
+ * @throws GFacException
+ * @throws ApplicationSettingsException
+ */
+ public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
+ JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+ JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ if (preferredJobSubmissionProtocol == JobSubmissionProtocol.GLOBUS || preferredJobSubmissionProtocol == JobSubmissionProtocol.UNICORE) {
+ logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml");
+ } else if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) {
+ try {
+ AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
+ SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
+ SecurityProtocol securityProtocol = sshJobSubmission.getSecurityProtocol();
- if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS) {
++ if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS || securityProtocol == SecurityProtocol.USERNAME_PASSWORD) {
+ SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
+ String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
+ RequestData requestData = new RequestData(jobExecutionContext.getGatewayID());
+ requestData.setTokenId(credentialStoreToken);
+
+ ServerInfo serverInfo = new ServerInfo(null, jobExecutionContext.getHostName());
+
+ Cluster pbsCluster = null;
+ try {
+ AuthenticationInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData);
+ String installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
+ if (installedParentPath == null) {
+ installedParentPath = "/";
+ }
+
+ SSHCredential credentials =((TokenizedSSHAuthInfo)tokenizedSSHAuthInfo).getCredentials();// this is just a call to get and set credentials in to this object,data will be used
- if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null){
++ if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null || securityProtocol == SecurityProtocol.USERNAME_PASSWORD){
+ // now we fall back to username password authentication
+ Properties configurationProperties = ServerSettings.getProperties();
+ tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(Constants.SSH_PASSWORD));
+ }
+ // This should be the login user name from compute resource preference
+ String loginUser = jobExecutionContext.getLoginUserName();
+ if (loginUser == null) {
+ loginUser = credentials.getPortalUserName();
+ }
+ serverInfo.setUserName(loginUser);
+ jobExecutionContext.getExperiment().setUserName(loginUser);
+
+
+ // inside the pbsCluser object
+
+ String key = loginUser + jobExecutionContext.getHostName() + serverInfo.getPort();
+ boolean recreate = false;
+ synchronized (clusters) {
+ if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
+ recreate = true;
+ } else if (clusters.containsKey(key)) {
+ int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+ if (clusters.get(key).get(i).getSession().isConnected()) {
+ pbsCluster = clusters.get(key).get(i);
+ } else {
+ clusters.get(key).remove(i);
+ recreate = true;
+ }
+ if (!recreate) {
+ try {
+ pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate
+ } catch (Exception e) {
+ clusters.get(key).remove(i);
+ logger.info("Connection found the connection map is expired, so we create from the scratch");
+ maxClusterCount++;
+ recreate = true; // we make the pbsCluster to create again if there is any exception druing connection
+ }
+ }
+ logger.info("Re-using the same connection used with the connection string:" + key);
+ } else {
+ recreate = true;
+ }
+ if (recreate) {
+ JobManagerConfiguration jConfig = null;
+ String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
+ if (jobManager == null) {
+ logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else {
+ if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+ } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getUGEJobManager(installedParentPath);
+ } else if (LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getLSFJobManager(installedParentPath);
+ }
+ }
+
+ pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo,jConfig);
+ List<Cluster> pbsClusters = null;
+ if (!(clusters.containsKey(key))) {
+ pbsClusters = new ArrayList<Cluster>();
+ } else {
+ pbsClusters = clusters.get(key);
+ }
+ pbsClusters.add(pbsCluster);
+ clusters.put(key, pbsClusters);
+ }
+ }
+ } catch (Exception e) {
+ throw new GFacException("Error occurred...", e);
+ }
+ sshSecurityContext.setPbsCluster(pbsCluster);
+ jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), sshSecurityContext);
+ }
+ } catch (AppCatalogException e) {
+ throw new GFacException("Error while getting SSH Submission object from app catalog", e);
+ }
+ }
+ }
+
+ /**
+ * This method can be used to add third party resource security contexts
+ * @param jobExecutionContext
+ * @param sshAuth
+ * @throws GFacException
+ * @throws ApplicationSettingsException
+ */
+ public static void addSecurityContext(JobExecutionContext jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, ApplicationSettingsException {
+ try {
+ if(sshAuth== null) {
+ throw new GFacException("Error adding security Context, because sshAuthWrapper is null");
+ }
+ SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
+ AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
+ JobSubmissionInterface preferredJobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+ SSHJobSubmission sshJobSubmission = null;
+ try {
+ sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId());
+ } catch (Exception e1) {
+ logger.error("Not able to get SSHJobSubmission from registry");
+ }
+
+ Cluster pbsCluster = null;
+ String key=sshAuth.getKey();
+ boolean recreate = false;
+ synchronized (clusters) {
+ if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
+ recreate = true;
+ } else if (clusters.containsKey(key)) {
+ int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+ if (clusters.get(key).get(i).getSession().isConnected()) {
+ pbsCluster = clusters.get(key).get(i);
+ } else {
+ clusters.get(key).remove(i);
+ recreate = true;
+ }
+ if (!recreate) {
+ try {
+ pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate
+ } catch (Exception e) {
+ clusters.get(key).remove(i);
+ logger.info("Connection found the connection map is expired, so we create from the scratch");
+ maxClusterCount++;
+ recreate = true; // we make the pbsCluster to create again if there is any exception druing connection
+ }
+ }
+ logger.info("Re-using the same connection used with the connection string:" + key);
+ } else {
+ recreate = true;
+ }
+ if (recreate) {
+ JobManagerConfiguration jConfig = null;
+ String installedParentPath = null;
+ if(jobExecutionContext.getResourceJobManager()!= null){
+ installedParentPath = jobExecutionContext.getResourceJobManager().getJobManagerBinPath();
+ }
+ if (installedParentPath == null) {
+ installedParentPath = "/";
+ }
+ if (sshJobSubmission != null) {
+ String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
+ if (jobManager == null) {
+ logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else {
+ if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+ } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+ } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+ jConfig = CommonUtils.getUGEJobManager(installedParentPath);
+ } else if (LSF_JOB_MANAGER.equals(jobManager)) {
+ jConfig = CommonUtils.getLSFJobManager(installedParentPath);
+ }
+ }
+ }
+ pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),jConfig);
+ key = sshAuth.getKey();
+ List<Cluster> pbsClusters = null;
+ if (!(clusters.containsKey(key))) {
+ pbsClusters = new ArrayList<Cluster>();
+ } else {
+ pbsClusters = clusters.get(key);
+ }
+ pbsClusters.add(pbsCluster);
+ clusters.put(key, pbsClusters);
+ }
+ }
+ sshSecurityContext.setPbsCluster(pbsCluster);
+ jobExecutionContext.addSecurityContext(key, sshSecurityContext);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new GFacException("Error adding security Context", e);
+ }
+ }
+
+
+ public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) throws AppCatalogException, ApplicationSettingsException {
+ JobDescriptor jobDescriptor = new JobDescriptor();
+ TaskDetails taskData = jobExecutionContext.getTaskData();
+
+
+ // set email based job monitoring email address if monitor mode is JOB_EMAIL_NOTIFICATION_MONITOR
+ boolean addJobNotifMail = isEmailBasedJobMonitor(jobExecutionContext);
+ String emailIds = null;
+ if (addJobNotifMail) {
+ emailIds = ServerSettings.getEmailBasedMonitorAddress();
+ }
+ // add all configured job notification email addresses.
+ if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+ String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
+ if (flags != null && jobExecutionContext.getApplicationContext().getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
+ flags = "ALL";
+ }
+ jobDescriptor.setMailOptions(flags);
+
+ String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+ if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds += ("," + userJobNotifEmailIds);
+ } else {
+ emailIds = userJobNotifEmailIds;
+ }
+ }
+
+ if (taskData.isEnableEmailNotification()) {
+ List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
+ String elist = GFacUtils.listToCsv(emailList, ',');
+ if (elist != null && !elist.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds = emailIds + "," + elist;
+ } else {
+ emailIds = elist;
+ }
+ }
+ }
+ }
+ if (emailIds != null && !emailIds.isEmpty()) {
+ logger.info("Email list: " + emailIds);
+ jobDescriptor.setMailAddress(emailIds);
+ }
+ // this is common for any application descriptor
+
+ jobDescriptor.setCallBackIp(ServerSettings.getIp());
+ jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
+ jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
+ jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
+ jobDescriptor.setExecutablePath(jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getExecutablePath());
+ jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
+ jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
+ String computationalProjectAccount = taskData.getTaskScheduling().getComputationalProjectAccount();
+ if (computationalProjectAccount == null){
+ ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference();
+ if (computeResourcePreference != null) {
+ computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber();
+ }
+ }
+ if (computationalProjectAccount != null) {
+ jobDescriptor.setAcountString(computationalProjectAccount);
+ }
+ // To make job name alpha numeric
+ jobDescriptor.setJobName("A" + String.valueOf(generateJobName()));
+ jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
+
+ List<String> inputValues = new ArrayList<String>();
+ MessageContext input = jobExecutionContext.getInMessageContext();
+
+ // sort the inputs first and then build the command ListR
+ Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+ return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+ }
+ };
+ Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+ for (Object object : input.getParameters().values()) {
+ if (object instanceof InputDataObjectType) {
+ InputDataObjectType inputDOT = (InputDataObjectType) object;
+ sortedInputSet.add(inputDOT);
+ }
+ }
+ for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+ if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+ continue;
+ }
+ if (inputDataObjectType.getApplicationArgument() != null
+ && !inputDataObjectType.getApplicationArgument().equals("")) {
+ inputValues.add(inputDataObjectType.getApplicationArgument());
+ }
+
+ if (inputDataObjectType.getValue() != null
+ && !inputDataObjectType.getValue().equals("")) {
+ if (inputDataObjectType.getType() == DataType.URI) {
+ // set only the relative path
+ String filePath = inputDataObjectType.getValue();
+ filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+ inputValues.add(filePath);
+ }else {
+ inputValues.add(inputDataObjectType.getValue());
+ }
+
+ }
+ }
+ Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters();
+ for (Object outputParam : outputParams.values()) {
+ if (outputParam instanceof OutputDataObjectType) {
+ OutputDataObjectType output = (OutputDataObjectType) outputParam;
+ if (output.getApplicationArgument() != null
+ && !output.getApplicationArgument().equals("")) {
+ inputValues.add(output.getApplicationArgument());
+ }
+ if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+ if (output.getType() == DataType.URI){
+ String filePath = output.getValue();
+ filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+ inputValues.add(filePath);
+ }
+ }
+ }
+ }
+
+ jobDescriptor.setInputValues(inputValues);
+ jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName());
+ jobDescriptor.setShellName("/bin/bash");
+ jobDescriptor.setAllEnvExport(true);
+ jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
+
+ ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+
+
+ ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling();
+ if (taskScheduling != null) {
+ int totalNodeCount = taskScheduling.getNodeCount();
+ int totalCPUCount = taskScheduling.getTotalCPUCount();
+
+
+ if (taskScheduling.getComputationalProjectAccount() != null) {
+ jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
+ }
+ if (taskScheduling.getQueueName() != null) {
+ jobDescriptor.setQueueName(taskScheduling.getQueueName());
+ }
+
+ if (totalNodeCount > 0) {
+ jobDescriptor.setNodes(totalNodeCount);
+ }
+ if (taskScheduling.getComputationalProjectAccount() != null) {
+ jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
+ }
+ if (taskScheduling.getQueueName() != null) {
+ jobDescriptor.setQueueName(taskScheduling.getQueueName());
+ }
+ if (totalCPUCount > 0) {
+ int ppn = totalCPUCount / totalNodeCount;
+ jobDescriptor.setProcessesPerNode(ppn);
+ jobDescriptor.setCPUCount(totalCPUCount);
+ }
+ if (taskScheduling.getWallTimeLimit() > 0) {
+ jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit()));
+ if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
+ jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit()));
+ }
+ }
+ if (taskScheduling.getTotalPhysicalMemory() > 0) {
+ jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + "");
+ }
+ } else {
+ logger.error("Task scheduling cannot be null at this point..");
+ }
+ ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+ List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
+ if (moduleCmds != null) {
+ for (String moduleCmd : moduleCmds) {
+ jobDescriptor.addModuleLoadCommands(moduleCmd);
+ }
+ }
+ List<String> preJobCommands = appDepDescription.getPreJobCommands();
+ if (preJobCommands != null) {
+ for (String preJobCommand : preJobCommands) {
+ jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext));
+ }
+ }
+
+ List<String> postJobCommands = appDepDescription.getPostJobCommands();
+ if (postJobCommands != null) {
+ for (String postJobCommand : postJobCommands) {
+ jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext));
+ }
+ }
+
+ ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+ if (parallelism != null){
+ if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){
+ Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+ if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
+ for (JobManagerCommand command : jobManagerCommands.keySet()) {
+ if (command == JobManagerCommand.SUBMISSION) {
+ String commandVal = jobManagerCommands.get(command);
+ jobDescriptor.setJobSubmitter(commandVal);
+ }
+ }
+ }
+ }
+ }
+ return jobDescriptor;
+ }
+
+ public static boolean isEmailBasedJobMonitor(JobExecutionContext jobExecutionContext) throws AppCatalogException {
+ if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+ SSHJobSubmission sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
+ } else {
+ return false;
+ }
+ }
+
+ private static int generateJobName() {
+ Random random = new Random();
+ int i = random.nextInt(Integer.MAX_VALUE);
+ i = i + 99999999;
+ if(i<0) {
+ i = i * (-1);
+ }
+ return i;
+ }
+
+ private static String parseCommand(String value, JobExecutionContext jobExecutionContext) {
+ String parsedValue = value.replaceAll("\\$workingDir", jobExecutionContext.getWorkingDir());
+ parsedValue = parsedValue.replaceAll("\\$inputDir", jobExecutionContext.getInputDir());
+ parsedValue = parsedValue.replaceAll("\\$outputDir", jobExecutionContext.getOutputDir());
+ return parsedValue;
+ }
+ /**
+ * This method can be used to set the Security Context if its not set and later use it in other places
+ * @param jobExecutionContext
+ * @param authenticationInfo
+ * @param userName
+ * @param hostName
+ * @param port
+ * @return
+ * @throws GFacException
+ */
+ public static String prepareSecurityContext(JobExecutionContext jobExecutionContext, AuthenticationInfo authenticationInfo
+ , String userName, String hostName, int port) throws GFacException {
+ ServerInfo serverInfo = new ServerInfo(userName, hostName);
+ String key = userName+hostName+port;
+ SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, authenticationInfo, key);
+ if (jobExecutionContext.getSecurityContext(key) == null) {
+ try {
+ GFACSSHUtils.addSecurityContext(jobExecutionContext, sshAuthWrapper);
+ } catch (ApplicationSettingsException e) {
+ logger.error(e.getMessage());
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ logger.error(e1.getLocalizedMessage());
+ }
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ return key;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/airavata/blob/b4ede9cb/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
index 0000000,73a6e4a..38981aa
mode 000000,100644..100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2TestWithSSHAuth.java
@@@ -1,0 -1,252 +1,252 @@@
+ ///*
+ // *
+ // * Licensed to the Apache Software Foundation (ASF) under one
+ // * or more contributor license agreements. See the NOTICE file
+ // * distributed with this work for additional information
+ // * regarding copyright ownership. The ASF licenses this file
+ // * to you under the Apache License, Version 2.0 (the
+ // * "License"); you may not use this file except in compliance
+ // * with the License. You may obtain a copy of the License at
+ // *
+ // * http://www.apache.org/licenses/LICENSE-2.0
+ // *
+ // * Unless required by applicable law or agreed to in writing,
+ // * software distributed under the License is distributed on an
+ // * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ // * KIND, either express or implied. See the License for the
+ // * specific language governing permissions and limitations
+ // * under the License.
+ // *
+ //*/
+ //package org.apache.airavata.core.gfac.services.impl;
+ //
+ //import org.apache.airavata.commons.gfac.type.ActualParameter;
+ //import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+ //import org.apache.airavata.commons.gfac.type.HostDescription;
+ //import org.apache.airavata.commons.gfac.type.ServiceDescription;
+ //import org.apache.airavata.gfac.GFacConfiguration;
+ //import org.apache.airavata.gfac.GFacException;
+ //import org.apache.airavata.gfac.SecurityContext;
+ //import org.apache.airavata.gfac.core.context.ApplicationContext;
+ //import org.apache.airavata.gfac.core.context.JobExecutionContext;
+ //import org.apache.airavata.gfac.core.context.MessageContext;
+ //import org.apache.airavata.gfac.impl.BetterGfacImpl;
+ //import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+ //import org.apache.airavata.gfac.ssh.api.Cluster;
+ //import org.apache.airavata.gfac.ssh.api.SSHApiException;
+ //import org.apache.airavata.gfac.ssh.api.ServerInfo;
+ //import AuthenticationInfo;
+ //import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration;
+ //import org.apache.airavata.gfac.ssh.impl.PBSCluster;
+ //import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+ //import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+ //import org.apache.airavata.gfac.ssh.util.CommonUtils;
+ //import org.apache.airavata.model.workspace.experiment.TaskDetails;
-//import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
++//import org.apache.airavata.experiment.registry.jpa.impl.RegistryFactory;
+ //import org.apache.airavata.schemas.gfac.*;
+ //import org.testng.annotations.BeforeClass;
+ //import org.testng.annotations.Test;
+ //
+ //import java.io.File;
+ //import java.net.URL;
+ //import java.util.ArrayList;
+ //import java.util.Date;
+ //import java.util.List;
+ //import java.util.UUID;
+ //
+ //public class BigRed2TestWithSSHAuth {
+ // private JobExecutionContext jobExecutionContext;
+ //
+ // private String userName;
+ // private String password;
+ // private String passPhrase;
+ // private String hostName;
+ // private String workingDirectory;
+ // private String privateKeyPath;
+ // private String publicKeyPath;
+ //
+ // @BeforeClass
+ // public void setUp() throws Exception {
+ //
+ // System.out.println("Test case name " + this.getClass().getName());
+ //// System.setProperty("ssh.host","bigred2.uits.iu.edu"); //default ssh host
+ //// System.setProperty("ssh.user", "lginnali");
+ //// System.setProperty("ssh.private.key.path", "/Users/lahirugunathilake/.ssh/id_dsa");
+ //// System.setProperty("ssh.public.key.path", "/Users/lahirugunathilake/.ssh/id_dsa.pub");
+ //// System.setProperty("ssh.working.directory", "/tmp");
+ //
+ // this.hostName = "bigred2.uits.iu.edu";
+ // this.hostName = System.getProperty("ssh.host");
+ // this.userName = System.getProperty("ssh.username");
+ // this.password = System.getProperty("ssh.password");
+ // this.privateKeyPath = System.getProperty("private.ssh.key");
+ // this.publicKeyPath = System.getProperty("public.ssh.key");
+ // this.passPhrase = System.getProperty("ssh.keypass");
+ // this.workingDirectory = System.getProperty("ssh.working.directory");
+ //
+ //
+ // if (this.userName == null
+ // || (this.password==null && (this.publicKeyPath == null || this.privateKeyPath == null)) || this.workingDirectory == null) {
+ // System.out.println("########### In order to test you have to either username password or private,public keys");
+ // System.out.println("Use -Dssh.username=xxx -Dssh.password=yyy -Dssh.keypass=zzz " +
+ // "-Dprivate.ssh.key -Dpublic.ssh.key -Dssh.working.directory ");
+ // }
+ // URL resource = BigRed2TestWithSSHAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ // assert resource != null;
+ // System.out.println(resource.getFile());
+ // GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null);
+ //
+ //// gFacConfiguration.setMyProxyLifeCycle(3600);
+ //// gFacConfiguration.setMyProxyServer("myproxy.teragrid.org");
+ //// gFacConfiguration.setMyProxyUser("*****");
+ //// gFacConfiguration.setMyProxyPassphrase("*****");
+ //// gFacConfiguration.setTrustedCertLocation("./certificates");
+ //// //have to set InFlwo Handlers and outFlowHandlers
+ //// gFacConfiguration.setInHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GramDirectorySetupHandler","org.apache.airavata.gfac.handler.GridFTPInputHandler"}));
+ //// gFacConfiguration.setOutHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GridFTPOutputHandler"}));
+ //
+ // /*
+ // * Host
+ // */
+ // HostDescription host = new HostDescription(SSHHostType.type);
+ // host.getType().setHostAddress(hostName);
+ // host.getType().setHostName(hostName);
+ // ((SSHHostType)host.getType()).setHpcResource(true);
+ // /*
+ // * App
+ // */
+ // ApplicationDescription appDesc = new ApplicationDescription(HpcApplicationDeploymentType.type);
+ // HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) appDesc.getType();
+ // ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
+ // name.setStringValue("EchoLocal");
+ // app.setApplicationName(name);
+ //
+ // app.setCpuCount(1);
+ // app.setJobType(JobTypeType.SERIAL);
+ // app.setNodeCount(1);
+ // app.setProcessorsPerNode(1);
+ //
+ // /*
+ // * Use bat file if it is compiled on Windows
+ // */
+ // app.setExecutableLocation("/bin/echo");
+ //
+ // /*
+ // * Default tmp location
+ // */
+ // String tempDir = "/tmp";
+ // String date = (new Date()).toString();
+ // date = date.replaceAll(" ", "_");
+ // date = date.replaceAll(":", "_");
+ //
+ // tempDir = tempDir + File.separator
+ // + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID();
+ //
+ // System.out.println(tempDir);
+ // app.setScratchWorkingDirectory(tempDir);
+ // app.setStaticWorkingDirectory(tempDir);
+ // app.setInputDataDirectory(tempDir + File.separator + "inputData");
+ // app.setOutputDataDirectory(tempDir + File.separator + "outputData");
+ // app.setStandardOutput(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stdout");
+ // app.setStandardError(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stderr");
+ // app.setMaxWallTime(5);
+ // app.setJobSubmitterCommand("aprun -n 1");
+ // app.setInstalledParentPath("/opt/torque/torque-4.2.3.1/bin/");
+ //
+ // /*
+ // * Service
+ // */
+ // ServiceDescription serv = new ServiceDescription();
+ // serv.getType().setName("SimpleEcho");
+ //
+ // List<InputParameterType> inputList = new ArrayList<InputParameterType>();
+ //
+ // InputParameterType input = InputParameterType.Factory.newInstance();
+ // input.setParameterName("echo_input");
+ // input.setParameterType(StringParameterType.Factory.newInstance());
+ // inputList.add(input);
+ //
+ // InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
+ //
+ // .size()]);
+ // List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
+ // OutputParameterType output = OutputParameterType.Factory.newInstance();
+ // output.setParameterName("echo_output");
+ // output.setParameterType(StringParameterType.Factory.newInstance());
+ // outputList.add(output);
+ //
+ // OutputParameterType[] outputParamList = outputList
+ // .toArray(new OutputParameterType[outputList.size()]);
+ //
+ // serv.getType().setInputParametersArray(inputParamList);
+ // serv.getType().setOutputParametersArray(outputParamList);
+ //
+ // jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName());
+ // // Adding security context
+ // jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, getSecurityContext(app));
+ // ApplicationContext applicationContext = new ApplicationContext();
+ // jobExecutionContext.setApplicationContext(applicationContext);
+ // applicationContext.setServiceDescription(serv);
+ // applicationContext.setApplicationDeploymentDescription(appDesc);
+ // applicationContext.setHostDescription(host);
+ //
+ // MessageContext inMessage = new MessageContext();
+ // ActualParameter echo_input = new ActualParameter();
+ // ((StringParameterType) echo_input.getType()).setValue("echo_output=hello");
+ // inMessage.addParameter("echo_input", echo_input);
+ //
+ //
+ // jobExecutionContext.setInMessageContext(inMessage);
+ //
+ // MessageContext outMessage = new MessageContext();
+ // ActualParameter echo_out = new ActualParameter();
+ //// ((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ // outMessage.addParameter("echo_output", echo_out);
+ // jobExecutionContext.setRegistry(RegistryFactory.getLoggingRegistry());
+ // jobExecutionContext.setTaskData(new TaskDetails("11323"));
+ // jobExecutionContext.setOutMessageContext(outMessage);
+ //
+ // }
+ //
+ //
+ // private SecurityContext getSecurityContext(HpcApplicationDeploymentType app) {
+ // try {
+ //
+ // AuthenticationInfo authenticationInfo = null;
+ // if (password != null) {
+ // authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+ // } else {
+ // authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+ // this.passPhrase);
+ // }
+ // // Server info
+ // ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
+ //
+ // Cluster pbsCluster = null;
+ // SSHSecurityContext sshSecurityContext = null;
+ //
+ // JobManagerConfiguration pbsJobManager = CommonUtils.getPBSJobManager(app.getInstalledParentPath());
+ // pbsCluster = new PBSCluster(serverInfo, authenticationInfo, pbsJobManager);
+ //
+ //
+ // sshSecurityContext = new SSHSecurityContext();
+ // sshSecurityContext.setPbsCluster(pbsCluster);
+ // sshSecurityContext.setUsername(userName);
+ // sshSecurityContext.setKeyPass(passPhrase);
+ // sshSecurityContext.setPrivateKeyLoc(privateKeyPath);
+ // return sshSecurityContext;
+ // } catch (SSHApiException e) {
+ // e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ // }
+ // return null;
+ // }
+ //
+ // @Test
+ // public void testSSHProvider() throws GFacException {
+ // BetterGfacImpl gFacAPI = new BetterGfacImpl();
+ // gFacAPI.submitJob(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
+ // org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobDescription());
+ // org.junit.Assert.assertNotNull(jobExecutionContext.getJobDetails().getJobID());
+ // }
+ //
+ //}