You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/05/02 17:50:21 UTC

[1/3] airavata git commit: Add implementation for BESJobSubmissionTask

Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 9f0e45b25 -> d231956e8


http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
new file mode 100644
index 0000000..adacb21
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
@@ -0,0 +1,195 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.security.ProxyCertOutHandler;
+import eu.emi.security.authn.x509.X509Credential;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+public class UNICORESecurityContext extends X509SecurityContext {
+
+	private static final long serialVersionUID = 1L;
+	// Class logger. X509SecurityContext declares a static field with the same name; this one hides it here.
+	protected static final Logger log = LoggerFactory.getLogger(UNICORESecurityContext.class);
+	// Client configuration most recently built by one of the get*Configuration methods.
+	private DefaultClientConfiguration secProperties;
+	
+	
+	/**
+	 * Creates the context by handing both arguments to the X509SecurityContext constructor.
+	 *
+	 * @param credentialReader reader passed to the superclass
+	 * @param requestData request data passed to the superclass
+	 */
+	public UNICORESecurityContext(CredentialReader credentialReader, RequestData requestData) {
+		super(credentialReader, requestData);
+	}
+	
+	
+
+	public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging) throws WorkerException, ApplicationSettingsException {
+		try{
+			X509Credential cred = getX509Credentials();
+			secProperties = new DefaultClientConfiguration(dcValidator, cred);
+			setExtraSettings();
+		}
+		catch (Exception e) {
+			throw new WorkerException(e.getMessage(), e);
+		} 
+		if(enableMessageLogging) secProperties.setMessageLogging(true);
+		
+		return secProperties;
+	}
+
+	public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging,
+															  UserConfigurationDataModel userDataModel)
+			throws WorkerException, ApplicationSettingsException {
+
+		X509Credential cred = null;
+		try{
+			boolean genCert = userDataModel.isGenerateCert();
+				if(genCert) {
+					String userDN = userDataModel.getUserDN();
+					if (userDN == null || "".equals(userDN)){
+						log.warn("Cannot generate cert, falling back to GFAC configured MyProxy credentials");
+						return getDefaultConfiguration(enableMessageLogging);
+					}
+					else {
+						log.info("Generating X.509 certificate for: "+userDN);
+						try {
+							String caCertPath = ServerSettings.getSetting(BESConstants.PROP_CA_CERT_PATH, "");
+							String caKeyPath = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PATH, "");
+							String caKeyPass = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PASS, "");
+							
+							if(caCertPath.equals("") || caKeyPath.equals("")) {
+								throw new Exception("CA certificate or key file path missing in the properties file. "
+										+ "Please make sure " + BESConstants.PROP_CA_CERT_PATH + " or "
+										+ BESConstants.PROP_CA_KEY_PATH + " are not empty.");
+							}
+							
+							if("".equals(caKeyPass)) {
+								log.warn("Caution: CA key has no password. For security reasons it is highly recommended to set a CA key password");
+							}
+							cred = generateShortLivedCredential(userDN, caCertPath, caKeyPath, caKeyPass);
+						}catch (Exception e){
+							throw new WorkerException("Error occured while generating a short lived credential for user:"+userDN, e);
+						}
+						
+					}
+				}else  {
+					return getDefaultConfiguration(enableMessageLogging);
+				}
+				
+			secProperties = new DefaultClientConfiguration(dcValidator, cred);
+			setExtraSettings();
+		}
+		catch (Exception e) {
+			throw new WorkerException(e.getMessage(), e);
+		} 
+		secProperties.getETDSettings().setExtendTrustDelegation(true);
+		if(enableMessageLogging) secProperties.setMessageLogging(true);
+//		secProperties.setDoSignMessage(true);
+		secProperties.getETDSettings()
+				.setIssuerCertificateChain(secProperties.getCredential().getCertificateChain());
+		
+		return secProperties;
+	}
+
+	
+	/**
+	 * Builds a client configuration around a certificate obtained from
+	 * {@code SecurityUtils.generateShortLivedCertificate}. A new certificate is
+	 * requested on every call, and the resulting configuration replaces the one
+	 * held by this context.
+	 *
+	 * @param userID not read by this method
+	 * @param userDN passed to the certificate generator as the user DN
+	 * @param caCertPath passed to the certificate generator as the CA certificate path
+	 * @param caKeyPath passed to the certificate generator as the CA key path
+	 * @param caKeyPwd passed to the certificate generator as the CA key password
+	 * @return the newly built configuration
+	 * @throws WorkerException if generating the certificate or building the configuration fails
+	 */
+	public DefaultClientConfiguration getServerSignedConfiguration(String userID, String userDN, String caCertPath, String caKeyPath, String caKeyPwd) throws WorkerException {
+		try {
+			final KeyAndCertCredential signedCredential =
+					SecurityUtils.generateShortLivedCertificate(userDN, caCertPath, caKeyPath, caKeyPwd);
+			final DefaultClientConfiguration configuration =
+					new DefaultClientConfiguration(dcValidator, signedCredential);
+			secProperties = configuration;
+			setExtraSettings();
+		} catch (Exception failure) {
+			throw new WorkerException(failure.getMessage(), failure);
+		}
+
+		return secProperties;
+	}
+
+	
+	
+
+	
+	/**
+	 * Applies the settings shared by every configuration this class builds to the
+	 * current {@code secProperties}: extended trust delegation, message signing,
+	 * HTTP connection/socket timeouts and the {@link ProxyCertOutHandler} out-handler.
+	 */
+	private void setExtraSettings(){
+		secProperties.getETDSettings().setExtendTrustDelegation(true);
+
+		secProperties.setDoSignMessage(true);
+
+		// timeout in milliseconds
+		Properties p = secProperties.getExtraSettings();
+		if (p == null) {
+			p = new Properties();
+			// Attach the new instance; otherwise the timeouts set below would be dropped.
+			secProperties.setExtraSettings(p);
+		}
+		p.setProperty("http.connection.timeout", "5000");
+		p.setProperty("http.socket.timeout", "5000");
+
+		// Keep the handlers that are already configured and add the proxy-cert handler once.
+		String[] outHandlers = secProperties.getOutHandlerClassNames();
+		Set<String> outHandlerLst = (outHandlers == null)
+				? new HashSet<String>()
+				: new HashSet<String>(Arrays.asList(outHandlers));
+		outHandlerLst.add(ProxyCertOutHandler.class.getName());
+
+		secProperties.setOutHandlerClassNames(outHandlerLst
+				.toArray(new String[outHandlerLst.size()]));
+	}
+
+
+    private String getCNFromUserDN(String userDN) {
+        return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
new file mode 100644
index 0000000..1f83370
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
+
+import java.net.URISyntaxException;
+
+public class URIUtils {
+
+	/**
+	 * Encodes the authority, path, query and fragment of a URI string, in that order.
+	 * Each stage works on the output of the previous one.
+	 *
+	 * @param uri URI string to encode
+	 * @return the URI string with all four components encoded
+	 * @throws URIException if a component cannot be encoded
+	 */
+	public static String encodeAll(String uri) throws URIException
+	{
+		String result = encodeAuthority(uri);
+		// Feed the previous result forward; passing the raw input here would discard the authority encoding.
+		result = encodePath(result);
+		result = encodeQuery(result);
+		result = encodeFragment(result);
+		return result;
+	}
+
+	/**
+	 * Encodes the authority component, i.e. the text between the first "//" and the
+	 * first following '/', '?' or '#' (or the end of the string).
+	 *
+	 * @param uri URI string to encode
+	 * @return the URI string with its authority encoded; the input itself when it has no "//"
+	 * @throws URIException if the authority cannot be encoded
+	 */
+	public static String encodeAuthority(String uri) throws URIException
+	{
+		int doubleSlashIndex = uri.indexOf("//");
+		if(doubleSlashIndex == -1) return uri;
+		int start = doubleSlashIndex + 2;
+		// The authority stops at whichever delimiter comes first, not at the first '/' anywhere in the string.
+		int end = indexOfAny(uri, "/?#", start);
+		if(end == -1) end = uri.length();
+		String before = uri.substring(0, start);
+		String authority = uri.substring(start, end);
+		String after = uri.substring(end);
+		authority = URIUtil.encode(authority, URI.allowed_authority);
+
+		return before+authority+after;
+	}
+
+	/**
+	 * Encodes the path component. With an authority present the path starts at the '/'
+	 * that terminates the authority; otherwise it starts after the first ':'. The path
+	 * ends at the first '?' or '#' (or the end of the string).
+	 *
+	 * @param uri URI string to encode
+	 * @return the URI string with its path encoded; the input itself when no path start is found
+	 * @throws URIException if the path cannot be encoded
+	 */
+	public static String encodePath(String uri) throws URIException
+	{
+		int doubleSlashIndex = uri.indexOf("//");
+		boolean hasAuthority =  doubleSlashIndex >= 0;
+		int start = -1;
+		if(hasAuthority)
+		{
+			int authorityEnd = indexOfAny(uri, "/?#", doubleSlashIndex+2);
+			// A '?' or '#' right after the authority means there is no path; a later '/' belongs to the query or fragment.
+			if(authorityEnd != -1 && uri.charAt(authorityEnd) == '/') start = authorityEnd;
+		}
+		else
+		{
+			start = uri.indexOf(":");
+		}
+		if(start == -1) return uri;
+
+		// The path stops at whichever of '?' and '#' comes first.
+		int end = indexOfAny(uri, "?#", start+1);
+		if(end == -1) end = uri.length();
+		String before = uri.substring(0, start+1);
+		String path= uri.substring(start+1,end);
+		String after = uri.substring(end);
+		path = URIUtil.encode(path, URI.allowed_abs_path);
+		return before+path+after;
+	}
+
+
+	/**
+	 * Encodes the query component, i.e. the text between the first '?' and the first '#'
+	 * (or the end of the string).
+	 *
+	 * @param uri URI string to encode
+	 * @return the URI string with its query encoded; the input itself when it has no query
+	 * @throws URIException if the query cannot be encoded
+	 */
+	public static String encodeQuery(String uri) throws URIException
+	{
+		int queryStart = uri.indexOf("?");
+		if(queryStart == -1) return uri;
+		int queryEnd = uri.indexOf("#");
+		// A '?' after the '#' is part of the fragment; slicing with it would throw StringIndexOutOfBoundsException.
+		if(queryEnd != -1 && queryEnd < queryStart) return uri;
+		if(queryEnd == -1) queryEnd = uri.length();
+
+		String beforeQuery = uri.substring(0, queryStart+1);
+		String query = uri.substring(queryStart+1,queryEnd);
+		String afterQuery = uri.substring(queryEnd);
+		query = URIUtil.encode(query, URI.allowed_query);
+		return beforeQuery+query+afterQuery;
+	}
+
+
+	/**
+	 * Encodes the fragment component, i.e. everything after the first '#'.
+	 *
+	 * @param uri URI string to encode
+	 * @return the URI string with its fragment encoded; the input itself when it has no '#'
+	 * @throws URIException if the fragment cannot be encoded
+	 */
+	public static String encodeFragment(String uri) throws URIException
+	{
+		int fragmentStart = uri.indexOf("#");
+		if(fragmentStart == -1) return uri;
+
+		String beforeFragment = uri.substring(0, fragmentStart+1);
+		String fragment = uri.substring(fragmentStart+1);
+		fragment = URIUtil.encode(fragment, URI.allowed_fragment);
+		return beforeFragment+fragment;
+	}
+
+	/**
+	 * Builds a {@code gsiftp://} URI from a host and a path. The scheme prefix is added
+	 * when {@code host} does not already start with it, and a '/' separator is added when
+	 * {@code host} does not end with one. {@code localPath} is appended unchanged, so a
+	 * leading '/' in it is kept.
+	 *
+	 * @param host host name, optionally already prefixed with {@code gsiftp://}
+	 * @param localPath path appended after the host
+	 * @return the parsed URI
+	 * @throws URISyntaxException if the assembled string is not a valid URI
+	 */
+	public static java.net.URI createGsiftpURI(String host, String localPath) throws URISyntaxException {
+		// Local, single-threaded use: StringBuilder avoids StringBuffer's synchronization.
+		StringBuilder buf = new StringBuilder();
+		if (!host.startsWith("gsiftp://"))
+			buf.append("gsiftp://");
+		buf.append(host);
+		if (!host.endsWith("/"))
+			buf.append("/");
+		buf.append(localPath);
+		return new java.net.URI(buf.toString());
+	}
+
+	/**
+	 * Returns the index of the first character at or after {@code from} that occurs in
+	 * {@code delimiters}, or -1 when there is none.
+	 */
+	private static int indexOfAny(String text, String delimiters, int from)
+	{
+		for (int i = Math.max(from, 0); i < text.length(); i++) {
+			if (delimiters.indexOf(text.charAt(i)) >= 0) return i;
+		}
+		return -1;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
new file mode 100644
index 0000000..5be5bc1
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
@@ -0,0 +1,340 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import eu.emi.security.authn.x509.X509Credential;
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.context.AbstractSecurityContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.cxf.interceptor.security.AbstractSecurityContextInInterceptor;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.x500.X500Principal;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Handles X509 Certificate based security.
+ */
+public class X509SecurityContext extends AbstractSecurityContext {
+
+	private static final long serialVersionUID = 1L;
+
+	// Class logger.
+	protected static final Logger log = LoggerFactory.getLogger(X509SecurityContext.class);
+    
+    /*
+     * Name of this security context.
+     */
+    public static final String X509_SECURITY_CONTEXT = "x509.security.context";
+    
+    // Compared in getX509Credentials() with the credential's remaining validity, which is
+    // computed in milliseconds. NOTE(review): the intended unit of this constant is not stated - confirm.
+    public static final int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90;
+    
+    // Validator created once in the static initializer below; stays null if that initialization fails.
+    protected static DirectoryCertChainValidator dcValidator;
+    
+    // Credential cached by getX509Credentials().
+    private X509Credential x509Credentials= null;
+
+    // Publishes the trusted certificate path first, then builds the validator that reads it.
+    // Failures are logged and not rethrown, so the class still loads with dcValidator left null.
+    static {
+        try {
+			setUpTrustedCertificatePath();
+			// set up directory based trust validator
+			dcValidator = getTrustedCerts();
+		} catch (Exception e) {
+			log.error(e.getLocalizedMessage(), e);
+		}
+    }
+    
+
+    /**
+     * Publishes the absolute form of the given path in the
+     * {@code Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY} system property.
+     *
+     * @param trustedCertificatePath location of the trusted certificates
+     * @throws RuntimeException if the path does not exist or cannot be read
+     */
+    public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
+
+        final File certLocation = new File(trustedCertificatePath);
+        final boolean readable = certLocation.exists() && certLocation.canRead();
+
+        if (readable) {
+            System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, certLocation.getAbsolutePath());
+            return;
+        }
+
+        // Log the working directory to help diagnose a relative path that did not resolve.
+        final File workingDir = new File(".");
+        log.info("Current directory " + workingDir.getAbsolutePath());
+        throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
+    }
+
+    /**
+     * Reads the {@code Constants.TRUSTED_CERT_LOCATION} server setting and publishes it
+     * through {@link #setUpTrustedCertificatePath(String)}.
+     *
+     * @throws ApplicationSettingsException if the setting cannot be read
+     */
+    private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
+        setUpTrustedCertificatePath(ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+    }
+
+    /**
+     * Returns the trusted certificate path, read from the system property named by
+     * {@code Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY}.
+     *
+     * @return the trusted certificate path, or {@code null} when the property is not set
+     */
+    public static String getTrustedCertificatePath() {
+        final String propertyName = Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY;
+        return System.getProperty(propertyName);
+    }
+
+
+    /**
+     * Creates the context by handing both arguments to the AbstractSecurityContext constructor.
+     *
+     * @param credentialReader reader passed to the superclass
+     * @param requestData request data passed to the superclass
+     */
+    public X509SecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        super(credentialReader, requestData);
+    }
+
+
+    /**
+     * Returns the X.509 credential of this context.
+     * <ul>
+     *   <li>Without a credential reader, default (MyProxy) credentials are fetched on every call.</li>
+     *   <li>On the first call with a reader, the credential store is tried first and the default
+     *       credentials are used when the store yields nothing; the result is cached.</li>
+     *   <li>On later calls the cached credential is returned, unless its certificate fails the
+     *       validity check, in which case it is replaced by new default credentials.</li>
+     * </ul>
+     *
+     * @return x509credentials (from CANL security API)
+     * @throws WorkerException if no credential can be obtained or the cached certificate cannot be inspected
+     * @throws ApplicationSettingsException if a setting needed for the default credentials cannot be read
+     */
+    public X509Credential getX509Credentials() throws WorkerException, ApplicationSettingsException {
+
+        if (getCredentialReader() == null) {
+            return getDefaultCredentials();
+        }
+
+        if (x509Credentials == null) {
+
+            try {
+                x509Credentials = getCredentialsFromStore();
+            } catch (Exception e) {
+                log.error("An exception occurred while retrieving credentials from the credential store. " +
+                        "Will continue with my proxy user name and password.", e);
+            }
+
+            // If store does not have credentials try to get from user name and password
+            if (x509Credentials == null) {
+                x509Credentials = getDefaultCredentials();
+            }
+
+            // if still null, throw an exception
+            if (x509Credentials == null) {
+                throw new WorkerException("Unable to retrieve my proxy credentials to continue operation.");
+            }
+
+            return x509Credentials;
+        }
+
+        long remainingTime;
+        try {
+            remainingTime = x509Credentials.getCertificate().getNotAfter().getTime() - System.currentTimeMillis();
+        } catch (Exception e) {
+            throw new WorkerException("Unable to retrieve remaining life time from credentials.", e);
+        }
+
+        if (remainingTime < CREDENTIAL_RENEWING_THRESH_HOLD) {
+            log.warn("Do not support credentials renewal");
+        }
+
+        try {
+            x509Credentials.getCertificate().checkValidity();
+        } catch (Exception e) {
+            // Log the fallback only when it really happens. A failure of the fallback logon
+            // propagates with its own message instead of being reported as a life-time error.
+            log.info("Fall back to get new default credentials");
+            x509Credentials = getDefaultCredentials();
+        }
+
+        return x509Credentials;
+    }
+
+        /**
+     * Looks up the credential stored for this request's gateway id and token id.
+     *
+     * @return a key-and-certificate credential when the store holds a
+     *         {@code CertificateCredential} for the token; {@code null} when there is no
+     *         credential reader, no stored credential, or the stored credential has another type
+     * @throws Exception If an error occurred while retrieving credentials.
+     */
+    public  X509Credential getCredentialsFromStore() throws Exception {
+
+        if (getCredentialReader() == null) {
+            return null;
+        }
+
+        final String gatewayId = getRequestData().getGatewayId();
+        final String tokenId = getRequestData().getTokenId();
+
+        final Credential stored = getCredentialReader().getCredential(gatewayId, tokenId);
+
+        if (stored == null) {
+            log.info("Could not find credentials for token - " + tokenId + " and "
+                    + "gateway id - " + gatewayId);
+            return null;
+        }
+
+        if (!(stored instanceof CertificateCredential)) {
+            log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " +
+                    "Credential type - " + stored.getClass().getName());
+            return null;
+        }
+
+        log.info("Successfully found credentials for token id - " + tokenId +
+                " gateway id - " + gatewayId);
+
+        final CertificateCredential storedCertificate = (CertificateCredential) stored;
+        final X509Certificate[] chain = storedCertificate.getCertificates();
+
+        return new KeyAndCertCredential(storedCertificate.getPrivateKey(), chain);
+    }
+
+    /**
+     * Gets the default proxy certificate.
+     * @return Default my proxy credentials.
+     * @throws ApplicationSettingsException
+     */
+    public X509Credential getDefaultCredentials() throws WorkerException, ApplicationSettingsException {
+    	MyProxyLogon logon = new MyProxyLogon();
+    	logon.setValidator(dcValidator);
+    	logon.setHost(getRequestData().getMyProxyServerUrl());
+    	logon.setPort(getRequestData().getMyProxyPort());
+    	logon.setUsername(getRequestData().getMyProxyUserName());
+    	logon.setPassphrase(getRequestData().getMyProxyPassword().toCharArray());
+    	logon.setLifetime(getRequestData().getMyProxyLifeTime());
+    	
+        	try {
+				logon.connect();
+				logon.logon();
+				logon.getCredentials();
+				logon.disconnect();
+				PrivateKey pk=logon.getPrivateKey();
+				return new KeyAndCertCredential(pk, new X509Certificate[]{logon.getCertificate()});
+			}  catch (Exception e) {
+				throw new WorkerException("An error occurred while retrieving default security credentials.", e);
+			}
+    }
+
+	private static  DirectoryCertChainValidator getTrustedCerts() throws Exception{
+		String certLocation = getTrustedCertificatePath();
+		List<String> trustedCert = new ArrayList<String>();
+		trustedCert.add(certLocation + "/*.0");
+		trustedCert.add(certLocation + "/*.pem");
+		DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1, 60000, null);
+		return dcValidator;
+	}
+	
+	private String getCNFromUserDN(String userDN) {
+		return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+	}
+	
+	/**
+	 * Generates a new key pair and a certificate for {@code userDN}, signed with the CA
+	 * credential loaded from the given files. The certificate is valid from 15 minutes
+	 * before the call until 30 hours after that start time.
+	 *
+	 * @param userDN subject DN of the certificate to issue
+	 * @param caCertPath path of the PEM encoded CA certificate
+	 * @param caKeyPath path of the PEM encoded CA private key
+	 * @param caPwd password of the CA private key
+	 * @return credential holding the new private key and the chain {new certificate, CA certificate}
+	 * @throws Exception if the CA credential cannot be loaded, or the certificate cannot be
+	 *             built, validated or verified against the CA public key
+	 */
+	public KeyAndCertCredential generateShortLivedCredential(String userDN, String caCertPath, String caKeyPath,
+			String caPwd) throws Exception {
+		// Validity starts 15 minutes in the past (presumably to tolerate clock skew between hosts).
+		final long credentialGoodFromOffset = 1000L * 60L * 15L;
+		final long credentialLifetime = 30L * 3600L * 1000L; // 30 hours
+
+		final long startTime = System.currentTimeMillis() - credentialGoodFromOffset;
+		final long endTime = startTime + credentialLifetime;
+
+		// 1024-bit keys are no longer considered adequate; 2048 bits is the common minimum.
+		final int keyLength = 2048;
+		// Number of random bits in the certificate serial number.
+		final int serialNumberBits = 64;
+		// NOTE(review): SHA-1 signatures are deprecated; the name is kept unchanged because it is
+		// passed together with the algorithm id taken from the CA certificate - confirm before changing.
+		String signatureAlgorithm = "SHA1withRSA";
+
+		KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
+
+		KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
+		kpg.initialize(keyLength);
+		KeyPair pair = kpg.generateKeyPair();
+
+		X500Principal subjectDN = new X500Principal(userDN);
+		// Serial numbers must be hard to predict and unlikely to collide, so use a cryptographic source.
+		Random rand = new java.security.SecureRandom();
+
+		SubjectPublicKeyInfo publicKeyInfo;
+		try {
+			ASN1InputStream keyStream = new ASN1InputStream(pair.getPublic().getEncoded());
+			try {
+				publicKeyInfo = SubjectPublicKeyInfo.getInstance(keyStream.readObject());
+			} finally {
+				keyStream.close();
+			}
+		} catch (IOException e) {
+			throw new InvalidKeyException("Can not parse the public key"
+					+ "being included in the short lived certificate", e);
+		}
+
+		X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
+
+		X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+		X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name,
+				new BigInteger(serialNumberBits, rand), new Date(startTime), new Date(endTime), subjectX500Name,
+				publicKeyInfo);
+
+		AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
+
+		X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null);
+
+		// Sanity checks: the certificate must be valid now and must verify against the CA key.
+		certificate.checkValidity(new Date());
+		certificate.verify(caCred.getCertificate().getPublicKey());
+
+		return new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
+				caCred.getCertificate() });
+	}
+	
+	/**
+	 * Loads the CA private key and certificate from PEM files.
+	 *
+	 * @param caCertPath path of the PEM encoded CA certificate
+	 * @param caKeyPath path of the PEM encoded CA private key
+	 * @param password password of the private key; {@code null} is treated like an empty password
+	 * @return credential holding the CA key and certificate
+	 * @throws Exception if a file cannot be read or parsed
+	 */
+	private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception {
+		// Callers pass "" for a key without password; treat null the same way instead of failing with an NPE.
+		char[] keyPassword = (password == null) ? new char[0] : password.toCharArray();
+
+		InputStream isKey = null;
+		InputStream isCert = null;
+		try {
+			isKey = new FileInputStream(caKeyPath);
+			PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, keyPassword);
+
+			isCert = new FileInputStream(caCertPath);
+			X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
+			return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+		} finally {
+			// Nested so that a failure while closing the key stream cannot leak the certificate stream.
+			try {
+				if (isKey != null) {
+					isKey.close();
+				}
+			} finally {
+				if (isCert != null) {
+					isCert.close();
+				}
+			}
+		}
+	}
+	 
+    
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/pom.xml b/modules/worker/worker-core/pom.xml
index 8f00821..32d37d1 100644
--- a/modules/worker/worker-core/pom.xml
+++ b/modules/worker/worker-core/pom.xml
@@ -63,6 +63,12 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <!-- zookeeper dependencies -->
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
new file mode 100644
index 0000000..8a961c5
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 6/28/13
+ * Time: 3:28 PM
+ */
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.worker.core.utils.WorkerConstants;
+
+/**
+ * Encapsulates GFac specific data that are coming in the request.
+ */
+public class RequestData {
+
+    private static final int DEFAULT_LIFE_TIME = 3600;
+    private static final int DEFAULT_MY_PROXY_PORT = 7512;
+
+    private String tokenId;
+    private String requestUser;
+    private String gatewayId;
+
+    private String myProxyServerUrl = null;
+    private int myProxyPort = 0;
+    private String myProxyUserName = null;
+    private String myProxyPassword = null;
+    private int myProxyLifeTime = DEFAULT_LIFE_TIME;
+    // True once setMyProxyLifeTime has been called; an explicit value then wins over the server setting.
+    private boolean myProxyLifeTimeExplicit = false;
+
+    /** Creates an empty instance; all values are supplied through the setters or the server settings. */
+    public RequestData() {
+    }
+
+    /**
+     * @param gatewayId gateway id of the request
+     */
+    public RequestData(String gatewayId) {
+        this.gatewayId = gatewayId;
+    }
+
+    /**
+     * @param tokenId token id of the request
+     * @param requestUser user of the request
+     * @param gatewayId gateway id of the request
+     */
+    public RequestData(String tokenId, String requestUser, String gatewayId) {
+        this.tokenId = tokenId;
+        this.requestUser = requestUser;
+        this.gatewayId = gatewayId;
+    }
+
+    public String getTokenId() {
+        return tokenId;
+    }
+
+    public void setTokenId(String tokenId) {
+        this.tokenId = tokenId;
+    }
+
+    public String getRequestUser() {
+        return requestUser;
+    }
+
+    public void setRequestUser(String requestUser) {
+        this.requestUser = requestUser;
+    }
+
+    public String getGatewayId() {
+        return gatewayId;
+    }
+
+    public void setGatewayId(String gatewayId) {
+        this.gatewayId = gatewayId;
+    }
+
+    /**
+     * Returns the MyProxy server URL, read from the {@code WorkerConstants.MYPROXY_SERVER}
+     * setting (and then cached) when no value has been set.
+     *
+     * @throws ApplicationSettingsException if the setting cannot be read
+     */
+    public String getMyProxyServerUrl() throws ApplicationSettingsException {
+        if (myProxyServerUrl == null) {
+            myProxyServerUrl = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER);
+        }
+        return myProxyServerUrl;
+    }
+
+    public void setMyProxyServerUrl(String myProxyServerUrl) {
+        this.myProxyServerUrl = myProxyServerUrl;
+    }
+
+    /**
+     * Returns the MyProxy port. While the port is 0 it is read from the
+     * {@code WorkerConstants.MYPROXY_SERVER_PORT} setting, with 7512 as the default.
+     *
+     * @throws NumberFormatException if the configured value is not an integer
+     */
+    public int getMyProxyPort() {
+
+        if (myProxyPort == 0) {
+            String sPort = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER_PORT, Integer.toString(DEFAULT_MY_PROXY_PORT));
+            myProxyPort = Integer.parseInt(sPort);
+        }
+
+        return myProxyPort;
+    }
+
+    public void setMyProxyPort(int myProxyPort) {
+        this.myProxyPort = myProxyPort;
+    }
+
+    /**
+     * Returns the MyProxy user name, read from the {@code WorkerConstants.MYPROXY_USER}
+     * setting (and then cached) when no value has been set.
+     *
+     * @throws ApplicationSettingsException if the setting cannot be read
+     */
+    public String getMyProxyUserName() throws ApplicationSettingsException {
+        if (myProxyUserName == null) {
+            myProxyUserName = ServerSettings.getSetting(WorkerConstants.MYPROXY_USER);
+        }
+
+        return myProxyUserName;
+    }
+
+    public void setMyProxyUserName(String myProxyUserName) {
+        this.myProxyUserName = myProxyUserName;
+    }
+
+    /**
+     * Returns the MyProxy password, read from the {@code WorkerConstants.MYPROXY_PASS}
+     * setting (and then cached) when no value has been set.
+     *
+     * @throws ApplicationSettingsException if the setting cannot be read
+     */
+    public String getMyProxyPassword() throws ApplicationSettingsException {
+
+        if (myProxyPassword == null) {
+            myProxyPassword = ServerSettings.getSetting(WorkerConstants.MYPROXY_PASS);
+        }
+
+        return myProxyPassword;
+    }
+
+    /**
+     * Returns the MyProxy credential lifetime. A value given to
+     * {@link #setMyProxyLifeTime(int)} is returned as is; otherwise the
+     * {@code WorkerConstants.MYPROXY_LIFE} setting is read on every call, falling back to
+     * the last known value (initially 3600).
+     *
+     * @throws NumberFormatException if the configured value is not an integer
+     */
+    public int getMyProxyLifeTime() {
+        // Consistent with the other MyProxy getters: an explicitly set value is not
+        // overwritten by the server setting.
+        if (!myProxyLifeTimeExplicit) {
+            String life = ServerSettings.getSetting(WorkerConstants.MYPROXY_LIFE,Integer.toString(myProxyLifeTime));
+            myProxyLifeTime = Integer.parseInt(life);
+        }
+        return myProxyLifeTime;
+    }
+
+    public void setMyProxyLifeTime(int myProxyLifeTime) {
+        this.myProxyLifeTime = myProxyLifeTime;
+        this.myProxyLifeTimeExplicit = true;
+    }
+
+    public void setMyProxyPassword(String myProxyPassword) {
+        this.myProxyPassword = myProxyPassword;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
new file mode 100644
index 0000000..aec1d83
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core;
+
+/**
+ * Common type for worker security contexts. The interface declares no methods; it only
+ * gives implementations a shared type to be referenced by.
+ */
+public interface SecurityContext {
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
new file mode 100644
index 0000000..c4798e2
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core.context;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 6/26/13
+ * Time: 4:33 PM
+ */
+
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.SecurityContext;
+
+import java.io.Serializable;
+
+/**
+ * Skeleton {@link SecurityContext} that holds a {@link CredentialReader} and a
+ * {@link RequestData} and exposes both through read-only accessors.
+ */
+public abstract class AbstractSecurityContext implements SecurityContext, Serializable {
+
+    private CredentialReader credentialReader;
+    private RequestData requestData;
+
+    /**
+     * Creates a context without a credential reader or request data; both accessors
+     * return {@code null} for such an instance.
+     */
+    public AbstractSecurityContext() {
+        this(null, null);
+    }
+
+    /**
+     * Creates a context around the given collaborators.
+     *
+     * @param credentialReader reader returned by {@link #getCredentialReader()}
+     * @param requestData      request data returned by {@link #getRequestData()}
+     */
+    public AbstractSecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        this.credentialReader = credentialReader;
+        this.requestData = requestData;
+    }
+
+    /**
+     * @return the credential reader given at construction, or {@code null} if none was given
+     */
+    public CredentialReader getCredentialReader() {
+        return this.credentialReader;
+    }
+
+    /**
+     * @return the request data given at construction, or {@code null} if none was given
+     */
+    public RequestData getRequestData() {
+        return this.requestData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
new file mode 100644
index 0000000..705b379
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
@@ -0,0 +1,524 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.core.utils;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.worker.core.exceptions.SSHApiException;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class to do all ssh and scp related things.
+ */
+public class SSHUtils {
+	private static final Logger log = LoggerFactory.getLogger(SSHUtils.class);
+
+
+	/**
+	 * Copies a single local file to a remote path by running {@code scp -p -t} on the remote
+	 * host and speaking the scp sink protocol over the exec channel.
+	 *
+	 * <p>The exec channel is disconnected and the local file stream is closed on every exit
+	 * path, including failures part-way through the transfer.
+	 *
+	 * @param localFile  path of the local file to send; it is opened as a regular file
+	 * @param remoteFile remote destination handed verbatim to {@code scp -t}; if it names a
+	 *                   directory the file is copied into it, but the directory path itself is
+	 *                   what gets returned
+	 *                   todo handle the directory name as input and return the proper final output file name
+	 * @param session    JSch session used to open the exec channel; this method does not close it
+	 * @return {@code remoteFile}, unchanged
+	 * @throws IOException     if reading the local file or talking to the channel fails
+	 * @throws JSchException   if the exec channel cannot be opened or connected
+	 * @throws SSHApiException if the remote side does not acknowledge a protocol step, or its
+	 *                         standard error contains {@code "scp:"}
+	 */
+	public static String scpTo(String localFile, String remoteFile, Session session) throws IOException,
+			JSchException, SSHApiException {
+		boolean ptimestamp = true;
+
+		// exec 'scp -t rfile' remotely
+		String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
+		Channel channel = session.openChannel("exec");
+		try {
+			StandardOutReader stdOutReader = new StandardOutReader();
+			((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+			((ChannelExec) channel).setCommand(command);
+
+			// get I/O streams for remote scp
+			OutputStream out = channel.getOutputStream();
+			InputStream in = channel.getInputStream();
+
+			channel.connect();
+			requireScpToAck(in);
+
+			File _lfile = new File(localFile);
+
+			if (ptimestamp) {
+				// "T<mtime> 0 <atime> 0": the access time is not available through java.io.File,
+				// so the modification time is sent in both positions
+				long modifiedSeconds = _lfile.lastModified() / 1000;
+				command = "T" + modifiedSeconds + " 0 " + modifiedSeconds + " 0\n";
+				out.write(command.getBytes());
+				out.flush();
+				requireScpToAck(in);
+			}
+
+			// send "C0644 filesize filename"; File.getName() never contains a directory part,
+			// which the record requires
+			command = "C0644 " + _lfile.length() + " " + _lfile.getName() + "\n";
+			out.write(command.getBytes());
+			out.flush();
+			requireScpToAck(in);
+
+			// send the content of localFile
+			byte[] buf = new byte[1024];
+			try (FileInputStream fis = new FileInputStream(_lfile)) {
+				int len;
+				while ((len = fis.read(buf, 0, buf.length)) > 0) {
+					out.write(buf, 0, len);
+				}
+			}
+			// send '\0'
+			buf[0] = 0;
+			out.write(buf, 0, 1);
+			out.flush();
+			requireScpToAck(in);
+			out.close();
+			stdOutReader.onOutput(channel);
+
+			channel.disconnect();
+			if (stdOutReader.getStdErrorString().contains("scp:")) {
+				throw new SSHApiException(stdOutReader.getStdErrorString());
+			}
+			//since remote file is always a file  we just return the file
+			return remoteFile;
+		} finally {
+			// releases the channel on every failure path; harmless after the disconnect above
+			channel.disconnect();
+		}
+	}
+
+	/**
+	 * Reads one scp acknowledgement for {@link #scpTo} and fails unless it is the success byte 0.
+	 */
+	private static void requireScpToAck(InputStream in) throws IOException, SSHApiException {
+		if (checkAck(in) != 0) {
+			String error = "Error Reading input Stream";
+			log.error(error);
+			throw new SSHApiException(error);
+		}
+	}
+
+	/**
+	 * This method will copy a remote file to a local directory
+	 *
+	 * @param remoteFile remote file path, this has to be a full qualified path
+	 * @param localFile  This is the local file to copy, this can be a directory too
+	 * @return returns the final local file path of the new file came from the remote resource
+	 */
+	public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException,
+			JSchException, SSHApiException {
+		FileOutputStream fos = null;
+		try {
+			String prefix = null;
+			if (new File(localFile).isDirectory()) {
+				prefix = localFile + File.separator;
+			}
+
+			// exec 'scp -f remotefile' remotely
+			String command = "scp -f " + remoteFile;
+			Channel channel = session.openChannel("exec");
+			((ChannelExec) channel).setCommand(command);
+
+			StandardOutReader stdOutReader = new StandardOutReader();
+			((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+			// get I/O streams for remote scp
+			OutputStream out = channel.getOutputStream();
+			InputStream in = channel.getInputStream();
+
+            if (!channel.isClosed()){
+                channel.connect();
+            }
+
+			byte[] buf = new byte[1024];
+
+			// send '\0'
+			buf[0] = 0;
+			out.write(buf, 0, 1);
+			out.flush();
+
+			while (true) {
+				int c = checkAck(in);
+				if (c != 'C') {
+					break;
+				}
+
+				// read '0644 '
+				in.read(buf, 0, 5);
+
+				long filesize = 0L;
+				while (true) {
+					if (in.read(buf, 0, 1) < 0) {
+						// error
+						break;
+					}
+					if (buf[0] == ' ') break;
+					filesize = filesize * 10L + (long) (buf[0] - '0');
+				}
+
+				String file = null;
+				for (int i = 0; ; i++) {
+					in.read(buf, i, 1);
+					if (buf[i] == (byte) 0x0a) {
+						file = new String(buf, 0, i);
+						break;
+					}
+				}
+
+				//System.out.println("filesize="+filesize+", file="+file);
+
+				// send '\0'
+				buf[0] = 0;
+				out.write(buf, 0, 1);
+				out.flush();
+
+				// read a content of lfile
+				fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+				int foo;
+				while (true) {
+					if (buf.length < filesize) foo = buf.length;
+					else foo = (int) filesize;
+					foo = in.read(buf, 0, foo);
+					if (foo < 0) {
+						// error
+						break;
+					}
+					fos.write(buf, 0, foo);
+					filesize -= foo;
+					if (filesize == 0L) break;
+				}
+				fos.close();
+				fos = null;
+
+				if (checkAck(in) != 0) {
+					String error = "Error transfering the file content";
+					log.error(error);
+					throw new SSHApiException(error);
+				}
+
+				// send '\0'
+				buf[0] = 0;
+				out.write(buf, 0, 1);
+				out.flush();
+			}
+			stdOutReader.onOutput(channel);
+			if (stdOutReader.getStdErrorString().contains("scp:")) {
+				throw new SSHApiException(stdOutReader.getStdErrorString());
+			}
+
+		} catch (Exception e) {
+			log.error(e.getMessage(), e);
+		} finally {
+			try {
+				if (fos != null) fos.close();
+			} catch (Exception ee) {
+			}
+		}
+	}
+
+    /**
+     * This method will copy a remote file to a local directory
+     *
+     * @param sourceFile remote file path, this has to be a full qualified path
+     * @param sourceSession JSch session for source
+     * @param destinationFile This is the local file to copy, this can be a directory too
+     * @param destinationSession JSch Session for target
+     * @return returns the final local file path of the new file came from the remote resource
+     */
+    public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile, Session destinationSession, boolean ignoreEmptyFile) throws
+            IOException, JSchException {
+        OutputStream sout = null;
+        InputStream sin = null;
+        OutputStream dout = null;
+        InputStream din = null;
+        try {
+            String prefix = null;
+
+            // exec 'scp -f sourceFile'
+            String sourceCommand = "scp -f " + sourceFile;
+            Channel sourceChannel = sourceSession.openChannel("exec");
+            ((ChannelExec) sourceChannel).setCommand(sourceCommand);
+            StandardOutReader sourceStdOutReader = new StandardOutReader();
+            ((ChannelExec) sourceChannel).setErrStream(sourceStdOutReader.getStandardError());
+            // get I/O streams for remote scp
+            sout = sourceChannel.getOutputStream();
+            sin = sourceChannel.getInputStream();
+            sourceChannel.connect();
+
+            boolean ptimestamp = true;
+            // exec 'scp -t destinationFile'
+            String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + destinationFile;
+            Channel targetChannel = destinationSession.openChannel("exec");
+            StandardOutReader targetStdOutReader = new StandardOutReader();
+            ((ChannelExec) targetChannel).setErrStream(targetStdOutReader.getStandardError());
+            ((ChannelExec) targetChannel).setCommand(command);
+            // get I/O streams for remote scp
+            dout = targetChannel.getOutputStream();
+            din = targetChannel.getInputStream();
+            targetChannel.connect();
+
+            if (checkAck(din) != 0) {
+                String error = "Error Reading input Stream";
+                log.error(error);
+                throw new Exception(error);
+            }
+
+
+            byte[] buf = new byte[1024];
+
+            // send '\0'
+            buf[0] = 0;
+            sout.write(buf, 0, 1);
+            sout.flush();
+
+			log.info("Initiating transfer from:" + sourceFile + " To: " + destinationFile + ", Ignore Empty file : " + ignoreEmptyFile);
+
+            while (true) {
+                int c = checkAck(sin);
+                if (c != 'C') {
+                    break;
+                }
+
+                // read '0644 '
+                sin.read(buf, 0, 5);
+
+                long fileSize = 0L;
+                while (true) {
+                    if (sin.read(buf, 0, 1) < 0) {
+                        // error
+                        break;
+                    }
+                    if (buf[0] == ' ') break;
+                    fileSize = fileSize * 10L + (long) (buf[0] - '0');
+                }
+
+                String fileName = null;
+                for (int i = 0; ; i++) {
+                    sin.read(buf, i, 1);
+                    if (buf[i] == (byte) 0x0a) {
+                        fileName = new String(buf, 0, i);
+                        break;
+                    }
+                }
+
+				//FIXME: Remove me after fixing file transfer issue
+				if(fileSize == 0L){
+					log.warn("*****Zero byte file*****. Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile);
+				}else{
+					log.info("Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile);
+				}
+
+                if (fileSize == 0L && !ignoreEmptyFile){
+                    String error = "Input file is empty...";
+                    log.error(error);
+                    throw new JSchException(error);
+                }
+                String initData = "C0644 " + fileSize + " " + fileName + "\n";
+                assert dout != null;
+                dout.write(initData.getBytes());
+                dout.flush();
+
+                // send '\0' to source
+                buf[0] = 0;
+                sout.write(buf, 0, 1);
+                sout.flush();
+
+                int rLength;
+                while (true) {
+                    if (buf.length < fileSize) rLength = buf.length;
+                    else rLength = (int) fileSize;
+                    rLength = sin.read(buf, 0, rLength); // read content of the source File
+                    if (rLength < 0) {
+                        // error
+                        break;
+                    }
+                    dout.write(buf, 0, rLength); // write to destination file
+                    fileSize -= rLength;
+                    if (fileSize == 0L) break;
+                }
+
+                // send '\0' to target
+                buf[0] = 0;
+                dout.write(buf, 0, 1);
+                dout.flush();
+                if (checkAck(din) != 0) {
+                    String error = "Error Reading input Stream";
+                    log.error(error);
+                    throw new Exception(error);
+                }
+                dout.close();
+                dout = null;
+
+                if (checkAck(sin) != 0) {
+                    String error = "Error transfering the file content";
+                    log.error(error);
+                    throw new Exception(error);
+                }
+
+                // send '\0'
+                buf[0] = 0;
+                sout.write(buf, 0, 1);
+                sout.flush();
+            }
+
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            throw new JSchException(e.getMessage());
+        } finally {
+            try {
+                if (dout != null) dout.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (din != null) din.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (sout != null) sout.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+            try {
+                if (din != null) din.close();
+            } catch (Exception ee) {
+                log.error("", ee);
+            }
+        }
+    }
+
+	public static void makeDirectory(String path, Session session) throws IOException, JSchException, WorkerException {
+
+		// exec 'scp -t rfile' remotely
+		String command = "mkdir -p " + path;
+		Channel channel = session.openChannel("exec");
+		StandardOutReader stdOutReader = new StandardOutReader();
+
+		((ChannelExec) channel).setCommand(command);
+
+
+		((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+		try {
+			channel.connect();
+		} catch (JSchException e) {
+
+			channel.disconnect();
+//            session.disconnect();
+			log.error("Unable to retrieve command output. Command - " + command +
+					" on server - " + session.getHost() + ":" + session.getPort() +
+					" connecting user name - "
+					+ session.getUserName());
+			throw e;
+		}
+		stdOutReader.onOutput(channel);
+		if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+			throw new WorkerException(stdOutReader.getStdErrorString());
+		}
+
+		channel.disconnect();
+	}
+
+	public static List<String> listDirectory(String path, Session session) throws IOException, JSchException,
+			WorkerException {
+
+		// exec 'scp -t rfile' remotely
+		String command = "ls " + path;
+		Channel channel = session.openChannel("exec");
+		StandardOutReader stdOutReader = new StandardOutReader();
+
+		((ChannelExec) channel).setCommand(command);
+
+
+		((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+		try {
+			channel.connect();
+		} catch (JSchException e) {
+
+			channel.disconnect();
+//            session.disconnect();
+
+			throw new WorkerException("Unable to retrieve command output. Command - " + command +
+					" on server - " + session.getHost() + ":" + session.getPort() +
+					" connecting user name - "
+					+ session.getUserName(), e);
+		}
+		stdOutReader.onOutput(channel);
+		stdOutReader.getStdOutputString();
+		if (stdOutReader.getStdErrorString().contains("ls:")) {
+			throw new WorkerException(stdOutReader.getStdErrorString());
+		}
+		channel.disconnect();
+		return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+	}
+
+
+	/**
+	 * Reads one scp protocol response from {@code in}.
+	 *
+	 * <p>A first byte of 1 (error) or 2 (fatal error) is followed by a message that runs up to
+	 * and including the next {@code '\n'}; that message is consumed and logged as a warning.
+	 * Reading the message also stops at end of stream, so a connection that drops in the
+	 * middle of an error message cannot make this method loop forever. The message is no
+	 * longer echoed to {@code System.out}; the log entry carries the same text.
+	 *
+	 * @param in stream connected to the remote scp process
+	 * @return the first byte read: 0 for success, 1 for error, 2 for fatal error, -1 at end of
+	 *         stream, or any other byte unchanged (callers compare it against {@code 'C'})
+	 * @throws IOException if reading from {@code in} fails
+	 */
+	static int checkAck(InputStream in) throws IOException {
+		int b = in.read();
+
+		if (b == 1 || b == 2) {
+			StringBuilder sb = new StringBuilder();
+			int c;
+			while ((c = in.read()) != -1) {
+				sb.append((char) c);
+				if (c == '\n') {
+					break;
+				}
+			}
+			log.warn(sb.toString());
+		}
+		return b;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
index 75a8062..b5ce833 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
@@ -36,6 +36,7 @@ import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
 import org.apache.airavata.worker.core.cluster.ServerInfo;
 import org.apache.airavata.worker.core.config.ResourceConfig;
 import org.apache.airavata.worker.core.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.core.context.ProcessContext;
 import org.apache.airavata.worker.core.exceptions.WorkerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,6 +90,17 @@ public class WorkerFactory {
         return resources.get(resourceJobManagerType);
     }
 
+    /**
+     * Builds the SSH key authentication for the storage resource of the given process by
+     * passing its gateway id, storage login user name and storage credential token to
+     * {@code getSshKeyAuthentication}.
+     *
+     * @param pc process context supplying the three values
+     * @return the authentication object produced by {@code getSshKeyAuthentication}
+     * @throws WorkerException          wrapping an {@code ApplicationSettingsException},
+     *                                  {@code IllegalAccessException} or {@code InstantiationException}
+     * @throws CredentialStoreException declared by this method; not caught here
+     */
+    public static SSHKeyAuthentication getStorageSSHKeyAuthentication(ProcessContext pc)
+            throws WorkerException, CredentialStoreException {
+        try {
+            final String gatewayId = pc.getGatewayId();
+            final String storageLoginUser = pc.getStorageResourceLoginUserName();
+            final String storageCredentialToken = pc.getStorageResourceCredentialToken();
+            return getSshKeyAuthentication(gatewayId, storageLoginUser, storageCredentialToken);
+        } catch (ApplicationSettingsException | IllegalAccessException | InstantiationException e) {
+            throw new WorkerException("Couldn't build ssh authentication object", e);
+        }
+    }
+
     public static SSHKeyAuthentication getSshKeyAuthentication(String gatewayId,
                                                                 String loginUserName,
                                                                 String credentialStoreToken)

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
index a53d736..663428a 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
@@ -8,10 +8,14 @@ import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.status.*;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.airavata.worker.core.context.ProcessContext;
@@ -20,6 +24,9 @@ import org.apache.airavata.worker.core.exceptions.WorkerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -232,4 +239,61 @@ public class WorkerUtils {
             return null;
         }
     }
+
+    /**
+     * Registers a data product for every experiment output named {@code outputName}, sets
+     * the registered product URI as that output's value, and then updates the experiment in
+     * the experiment catalog. The update is issued even when no output matched.
+     *
+     * @param processContext supplies the experiment catalog, ids, owner and storage resource
+     * @param outputName     name of the experiment output to fill in; also used as product name
+     * @param outputVal      file path recorded on the replica location
+     * @throws WorkerException wrapping any {@code RegistryException} raised along the way
+     */
+    public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+        try {
+            ExperimentCatalog catalog = processContext.getExperimentCatalog();
+            String expId = processContext.getExperimentId();
+            ExperimentModel experimentModel = (ExperimentModel) catalog.get(ExperimentCatalogModelType.EXPERIMENT, expId);
+            List<OutputDataObjectType> outputs = experimentModel.getExperimentOutputs();
+            if (outputs != null) {
+                for (OutputDataObjectType candidate : outputs) {
+                    if (!candidate.getName().equals(outputName)) {
+                        continue;
+                    }
+                    // describe the product itself
+                    DataProductModel product = new DataProductModel();
+                    product.setGatewayId(processContext.getGatewayId());
+                    product.setOwnerName(processContext.getProcessModel().getUserName());
+                    product.setProductName(outputName);
+                    product.setDataProductType(DataProductType.FILE);
+
+                    // describe where its copy lives
+                    DataReplicaLocationModel replica = new DataReplicaLocationModel();
+                    replica.setStorageResourceId(processContext.getStorageResource().getStorageResourceId());
+                    replica.setReplicaName(outputName + " gateway data store copy");
+                    replica.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+                    replica.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+                    replica.setFilePath(outputVal);
+                    product.addToReplicaLocations(replica);
+
+                    // register it and point the experiment output at the resulting URI
+                    ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+                    candidate.setValue(replicaCatalog.registerDataProduct(product));
+                }
+            }
+            catalog.update(ExperimentCatalogModelType.EXPERIMENT, experimentModel, expId);
+        } catch (RegistryException e) {
+            throw new WorkerException("expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+                    + " : - Error while updating experiment outputs", e);
+        }
+    }
+
+    /**
+     * Builds the {@code file} URI under which an output file is stored on the given host.
+     *
+     * <p>The path is chosen from the experiment data directory of the parent process:
+     * <ul>
+     *   <li>not set or empty: {@code inputPath + processId + separator + fileName};</li>
+     *   <li>starting with the separator: {@code experimentDataDir + fileName};</li>
+     *   <li>otherwise: {@code inputPath + experimentDataDir + fileName}.</li>
+     * </ul>
+     * A trailing separator is appended to the experiment data directory when it is missing.
+     * The separator is {@link File#separator} and the port is fixed at 22.
+     *
+     * @param taskContext task whose parent process supplies the data directory, process id
+     *                    and storage login user name
+     * @param hostName    host component of the URI
+     * @param inputPath   base path that the relative forms are appended to
+     * @param fileName    name of the file, appended last
+     * @return the assembled URI, with the storage login user name as user info
+     * @throws URISyntaxException if the components do not form a valid URI
+     */
+    public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName) throws URISyntaxException {
+        final String configuredDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+        final String filePath;
+        if (configuredDir == null || configuredDir.isEmpty()) {
+            filePath = inputPath + taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+        } else {
+            final String dataDir = configuredDir.endsWith(File.separator)
+                    ? configuredDir
+                    : configuredDir + File.separator;
+            filePath = dataDir.startsWith(File.separator)
+                    ? dataDir + fileName
+                    : inputPath + dataDir + fileName;
+        }
+        //FIXME
+        return new URI("file", taskContext.getParentProcessContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
+    }
 }


[2/3] airavata git commit: Add implementation for BESJobSubmissionTask

Posted by go...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java
new file mode 100644
index 0000000..48e6986
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java
@@ -0,0 +1,517 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import org.apache.commons.httpclient.URIException;
+import org.apache.xmlbeans.XmlCursor;
+import org.apache.xmlbeans.XmlObject;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.*;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.EnvironmentType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.POSIXApplicationDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.POSIXApplicationType;
+import org.ggf.schemas.jsdl.x2006.x07.jsdlHpcpa.HPCProfileApplicationDocument;
+import org.ggf.schemas.jsdl.x2006.x07.jsdlHpcpa.HPCProfileApplicationType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.SPMDApplicationDocument;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.SPMDApplicationType;
+
+import javax.xml.namespace.QName;
+
+
+/**
+ * 
+ * @author shahbaz memon, bastian demuth
+ *
+ */
+public class JSDLUtils
+{
+
+	public static final int FLAG_OVERWRITE = 1;
+	public static final int FLAG_APPEND = 2;
+	public static final int FLAG_DELETE_ON_TERMINATE = 32;
+
+	public static final QName POSIX_APPLICATION=POSIXApplicationDocument.type.getDocumentElementName();
+	
+	public static final QName HPC_PROFILE_APPLICATION=HPCProfileApplicationDocument.type.getDocumentElementName();
+	
+	public static final QName SPMD_APPLICATION=SPMDApplicationDocument.type.getDocumentElementName();
+	
+	public static final String PROCESSESPERHOST = "ProcessesPerHost"; 
+	public static final String NUMBEROFPROCESSES = "NumberOfProcesses";
+	public static final String THREADSPERHOST = "ThreadsPerHost";
+
+
+	
+	/**
+	 * Appends an environment entry to the POSIX application element of the job.
+	 *
+	 * @param def   job definition to modify
+	 * @param name  name set on the new environment entry
+	 * @param value string value set on the new environment entry
+	 * @return the newly added environment entry
+	 */
+	public static EnvironmentType addEnvVariable(JobDefinitionType def,String name, String value) {
+		EnvironmentType envEntry = getOrCreatePOSIXApplication(def).addNewEnvironment();
+		envEntry.setName(name);
+		envEntry.setStringValue(value);
+		return envEntry;
+	}
+
+	public static void setApplicationName(JobDefinitionType value, String applicationName) {
+		getOrCreateApplication(value).setApplicationName(applicationName);
+	}
+
+	public static void setApplicationVersion(JobDefinitionType value, String applicationVersion) {
+		getOrCreateApplication(value).setApplicationVersion(applicationVersion);
+	}
+
+	public static void addProjectName(JobDefinitionType value, String projectName) {
+		getOrCreateJobIdentification(value).addNewJobProject().setStringValue(projectName);
+	}
+	
+	public static void addMultipleProjectNames(JobDefinitionType value, String[] projectNames) {
+		for (String name : projectNames) {
+			getOrCreateJobIdentification(value).addNewJobProject().setStringValue(name);
+		}
+	}
+
+	public static void addCandidateHost(JobDefinitionType value, String host) {
+		getOrCreateCandidateHosts(value).addHostName(host);
+
+	}
+	/**
+	 * Adds a stage-out (target) data staging element using the overwrite
+	 * creation flag.
+	 *
+	 * @param value      job definition to modify
+	 * @param fileSystem file system name, see the five-argument overload
+	 * @param file       file name of the staging element
+	 * @param uri        target URI, may be {@code null}
+	 */
+	public static void addDataStagingTargetElement(JobDefinitionType value, String fileSystem, String file, String uri) {
+		final int defaultFlags = FLAG_OVERWRITE;
+		addDataStagingTargetElement(value, fileSystem, file, uri, defaultFlags);
+	}
+
+	/**
+	 * Adds a stage-out (target) data staging element to the job description.
+	 * <p>
+	 * Creation flag: {@link #FLAG_APPEND} wins over {@link #FLAG_OVERWRITE};
+	 * with neither bit set the element is marked "don't overwrite".
+	 * {@link #FLAG_DELETE_ON_TERMINATE} controls the delete-on-termination flag.
+	 * <p>
+	 * The URI is stored in its encoded form, matching
+	 * {@code addDataStagingSourceElement}. Previously the encoded result was
+	 * computed and thrown away, so the raw string was written.
+	 *
+	 * @param value      job definition to modify
+	 * @param fileSystem file system name; not written when {@code null} or {@code "Work"}
+	 * @param file       file name of the staging element
+	 * @param uri        target URI; when {@code null}, or when it cannot be encoded,
+	 *                   the Target element is left without a URI
+	 * @param flags      bitwise OR of the {@code FLAG_*} constants
+	 */
+	public static void addDataStagingTargetElement(JobDefinitionType value, String fileSystem, String file, String uri, int flags) {
+		JobDescriptionType jobDescr = getOrCreateJobDescription(value);
+		DataStagingType newDS = jobDescr.addNewDataStaging();
+		CreationFlagEnumeration.Enum creationFlag = CreationFlagEnumeration.DONT_OVERWRITE;
+		if((flags & FLAG_OVERWRITE) != 0) creationFlag = CreationFlagEnumeration.OVERWRITE;
+		if((flags & FLAG_APPEND) != 0) creationFlag = CreationFlagEnumeration.APPEND;
+		boolean deleteOnTerminate = (flags & FLAG_DELETE_ON_TERMINATE) != 0;
+		newDS.setCreationFlag(creationFlag);
+		newDS.setDeleteOnTermination(deleteOnTerminate);
+		SourceTargetType target = newDS.addNewTarget();
+
+		if (uri != null) {
+			try {
+				target.setURI(URIUtils.encodeAll(uri));
+			} catch (URIException ignored) {
+				// Best effort, unchanged from before: an unencodable URI leaves
+				// the Target element without a URI instead of failing the call.
+			}
+		}
+		newDS.setFileName(file);
+		if (fileSystem != null && !fileSystem.equals("Work")) {  //$NON-NLS-1$
+			newDS.setFilesystemName(fileSystem);
+		}
+	}
+
+	/**
+	 * Adds a stage-in (source) data staging element using the overwrite
+	 * creation flag.
+	 *
+	 * @param value      job definition to modify
+	 * @param uri        source URI, may be {@code null}
+	 * @param fileSystem file system name, see the five-argument overload
+	 * @param file       file name of the staging element
+	 */
+	public static void addDataStagingSourceElement(JobDefinitionType value, String uri, String fileSystem, String file) {
+		final int defaultFlags = FLAG_OVERWRITE;
+		addDataStagingSourceElement(value, uri, fileSystem, file, defaultFlags);
+	}
+
+	/**
+	 * Adds a stage-in (source) data staging element to the job description.
+	 * <p>
+	 * Creation flag: {@link #FLAG_APPEND} wins over {@link #FLAG_OVERWRITE};
+	 * with neither bit set the element is marked "don't overwrite".
+	 * {@link #FLAG_DELETE_ON_TERMINATE} controls the delete-on-termination flag.
+	 *
+	 * @param value      job definition to modify
+	 * @param uri        source URI; stored encoded, or as given when encoding fails
+	 * @param fileSystem file system name; not written when {@code null} or {@code "Work"}
+	 * @param file       file name of the staging element
+	 * @param flags      bitwise OR of the {@code FLAG_*} constants
+	 */
+	public static void addDataStagingSourceElement(JobDefinitionType value, String uri, String fileSystem, String file, int flags) {
+		JobDescriptionType description = getOrCreateJobDescription(value);
+
+		String effectiveUri = uri;
+		if (uri != null) {
+			try {
+				effectiveUri = URIUtils.encodeAll(uri);
+			} catch (URIException ignored) {
+				// Best effort: fall back to the URI exactly as supplied.
+			}
+		}
+
+		final CreationFlagEnumeration.Enum mode;
+		if ((flags & FLAG_APPEND) != 0) {
+			mode = CreationFlagEnumeration.APPEND;
+		} else if ((flags & FLAG_OVERWRITE) != 0) {
+			mode = CreationFlagEnumeration.OVERWRITE;
+		} else {
+			mode = CreationFlagEnumeration.DONT_OVERWRITE;
+		}
+
+		DataStagingType staging = description.addNewDataStaging();
+		staging.setCreationFlag(mode);
+		staging.setDeleteOnTermination((flags & FLAG_DELETE_ON_TERMINATE) != 0);
+		staging.addNewSource().setURI(effectiveUri);
+		staging.setFileName(file);
+		if (fileSystem == null || fileSystem.equals("Work")) {  //$NON-NLS-1$
+			return;
+		}
+		staging.setFilesystemName(fileSystem);
+	}
+
+
+	/**
+	 * Returns the Application element of the job description, adding it first
+	 * when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added application element
+	 */
+	public static ApplicationType getOrCreateApplication(JobDefinitionType value) {
+		JobDescriptionType descr = getOrCreateJobDescription(value);
+		return descr.isSetApplication() ? descr.getApplication() : descr.addNewApplication();
+	}
+
+	/**
+	 * Returns the CandidateHosts element of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added candidate hosts element
+	 */
+	public static CandidateHostsType getOrCreateCandidateHosts(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetCandidateHosts() ? res.getCandidateHosts() : res.addNewCandidateHosts();
+	}
+
+	/**
+	 * Returns the CPUArchitecture element of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added CPU architecture element
+	 */
+	public static CPUArchitectureType getOrCreateCPUArchitecture(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetCPUArchitecture() ? res.getCPUArchitecture() : res.addNewCPUArchitecture();
+	}
+
+	/**
+	 * Returns the IndividualCPUCount range of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added range element
+	 */
+	public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualCPUCount(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetIndividualCPUCount() ? res.getIndividualCPUCount() : res.addNewIndividualCPUCount();
+	}
+
+
+	/**
+	 * Returns the IndividualCPUSpeed range of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added range element
+	 */
+	public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualCPUSpeed(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetIndividualCPUSpeed() ? res.getIndividualCPUSpeed() : res.addNewIndividualCPUSpeed();
+	}
+
+	/**
+	 * Returns the IndividualCPUTime range of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added range element
+	 */
+	public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualCPUTime(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetIndividualCPUTime() ? res.getIndividualCPUTime() : res.addNewIndividualCPUTime();
+	}
+
+	/**
+	 * Returns the IndividualDiskSpace range of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added range element
+	 */
+	public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualDiskSpace(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetIndividualDiskSpace() ? res.getIndividualDiskSpace() : res.addNewIndividualDiskSpace();
+	}
+
+	/**
+	 * Returns the IndividualPhysicalMemory range of the job's resources,
+	 * adding it first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added range element
+	 */
+	public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualPhysicalMemory(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetIndividualPhysicalMemory() ? res.getIndividualPhysicalMemory() : res.addNewIndividualPhysicalMemory();
+	}
+
+	public static JobDescriptionType getOrCreateJobDescription(JobDefinitionType value) {
+		if (value.getJobDescription() == null) {
+			return value.addNewJobDescription();
+		}
+		return value.getJobDescription();
+	}
+
+	/**
+	 * Returns the JobIdentification of the job description, adding it first
+	 * when the description has none.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added job identification
+	 */
+	public static JobIdentificationType getOrCreateJobIdentification(JobDefinitionType value) {
+		JobDescriptionType descr = getOrCreateJobDescription(value);
+		JobIdentificationType existing = descr.getJobIdentification();
+		return existing != null ? existing : descr.addNewJobIdentification();
+	}
+
+	/**
+	 * Returns the OperatingSystem element of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added operating system element
+	 */
+	public static OperatingSystemType getOrCreateOperatingSystem(JobDefinitionType value)
+	{
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetOperatingSystem() ? res.getOperatingSystem() : res.addNewOperatingSystem();
+	}
+
+	/**
+	 * Returns the Resources element of the job description, adding it first
+	 * when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added resources element
+	 */
+	public static ResourcesType getOrCreateResources(JobDefinitionType value) {
+		JobDescriptionType descr = getOrCreateJobDescription(value);
+		return descr.isSetResources() ? descr.getResources() : descr.addNewResources();
+	}
+
+
+	/**
+	 * Returns the TotalCPUCount range of the job's resources, adding it first
+	 * when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added range element
+	 */
+	public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateTotalCPUCount(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetTotalCPUCount() ? res.getTotalCPUCount() : res.addNewTotalCPUCount();
+	}
+
+
+	/**
+	 * Returns the TotalResourceCount range of the job's resources, adding it
+	 * first when it is not set.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly added range element
+	 */
+	public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateTotalResourceCount(JobDefinitionType value) {
+		ResourcesType res = getOrCreateResources(value);
+		return res.isSetTotalResourceCount() ? res.getTotalResourceCount() : res.addNewTotalResourceCount();
+	}
+
+	/**
+	 * Returns the POSIXApplication child of the Application element, inserting
+	 * an empty one at the end of the Application element when none exists.
+	 * <p>
+	 * When an HPCProfileApplication child is already present no POSIX element
+	 * is created; the method then returns whatever {@link #getPOSIXApplication}
+	 * finds, which may be {@code null}.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the POSIX application element, or {@code null} in the HPC-profile case above
+	 */
+	public static POSIXApplicationType getOrCreatePOSIXApplication(JobDefinitionType value) {
+
+		ApplicationType application = getOrCreateApplication(value);
+
+		if(getHPCProfileApplication(value) != null){
+			//TODO handle: not creating POSIX element if HPCProfile already exists
+			return getPOSIXApplication(value);
+		}
+
+		if (getPOSIXApplication(value) == null) {
+			XmlCursor acursor = application.newCursor();
+			try {
+				acursor.toEndToken();
+				acursor.insertElement(POSIX_APPLICATION);
+			} finally {
+				// Release the cursor even if the insertion fails.
+				acursor.dispose();
+			}
+		}
+		return getPOSIXApplication(value);
+	}
+
+
+	/**
+	 * Returns the SPMDApplication child of the Application element, inserting
+	 * an empty one at the end of the Application element when none exists.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the existing or newly inserted SPMD application element
+	 */
+	public static SPMDApplicationType getOrCreateSPMDApplication(JobDefinitionType value) {
+
+		ApplicationType application = getOrCreateApplication(value);
+
+		if (getSPMDApplication(value) == null) {
+			XmlCursor acursor = application.newCursor();
+			try {
+				acursor.toEndToken();
+				acursor.insertElement(SPMD_APPLICATION);
+			} finally {
+				// Release the cursor even if the insertion fails.
+				acursor.dispose();
+			}
+		}
+		return getSPMDApplication(value);
+	}
+
+	/**
+	 * Looks up the SPMDApplication child of the job's Application element.
+	 *
+	 * @param value job definition to inspect, may be {@code null}
+	 * @return the first child named {@link #SPMD_APPLICATION}, or {@code null}
+	 *         when the definition, its description, its application or such a
+	 *         child is missing
+	 */
+	public static SPMDApplicationType getSPMDApplication(JobDefinitionType value) {
+		if (value == null
+				|| value.getJobDescription() == null
+				|| !value.getJobDescription().isSetApplication()) {
+			return null;
+		}
+		XmlCursor acursor = value.getJobDescription().getApplication().newCursor();
+		try {
+			if (!acursor.toFirstChild()) {
+				return null;
+			}
+			do {
+				if (SPMD_APPLICATION.equals(acursor.getName())) {
+					return (SPMDApplicationType) acursor.getObject();
+				}
+			} while (acursor.toNextSibling());
+			return null;
+		} finally {
+			// Single release point for every exit path, including exceptions.
+			acursor.dispose();
+		}
+	}
+
+
+
+	/**
+	 * Looks up the POSIXApplication child of the job's Application element.
+	 *
+	 * @param value job definition to inspect, may be {@code null}
+	 * @return the first child named {@link #POSIX_APPLICATION}, or {@code null}
+	 *         when the definition, its description, its application or such a
+	 *         child is missing
+	 */
+	public static POSIXApplicationType getPOSIXApplication(JobDefinitionType value) {
+		if (value == null
+				|| value.getJobDescription() == null
+				|| !value.getJobDescription().isSetApplication()) {
+			return null;
+		}
+		XmlCursor acursor = value.getJobDescription().getApplication().newCursor();
+		try {
+			if (!acursor.toFirstChild()) {
+				return null;
+			}
+			do {
+				if (POSIX_APPLICATION.equals(acursor.getName())) {
+					return (POSIXApplicationType) acursor.getObject();
+				}
+			} while (acursor.toNextSibling());
+			return null;
+		} finally {
+			// Single release point for every exit path, including exceptions.
+			acursor.dispose();
+		}
+	}
+
+
+
+	/**
+	 * Returns the HPCProfileApplication child of the Application element,
+	 * inserting an empty one at the end of the Application element when none
+	 * exists.
+	 * <p>
+	 * When a POSIXApplication child is already present no HPC profile element
+	 * is created; the method then returns whatever
+	 * {@link #getHPCProfileApplication} finds, which may be {@code null}.
+	 *
+	 * @param value job definition to inspect or extend
+	 * @return the HPC profile application element, or {@code null} in the POSIX case above
+	 */
+	public static HPCProfileApplicationType getOrCreateHPCProfileApplication(JobDefinitionType value) {
+
+		ApplicationType application = getOrCreateApplication(value);
+
+		if(getPOSIXApplication(value) != null){
+			//TODO handle: creating HPC element if POSIX already exists
+			return getHPCProfileApplication(value);
+		}
+
+		if (getHPCProfileApplication(value) == null) {
+			XmlCursor acursor = application.newCursor();
+			try {
+				acursor.toEndToken();
+				acursor.insertElement(HPC_PROFILE_APPLICATION);
+			} finally {
+				// Release the cursor even if the insertion fails.
+				acursor.dispose();
+			}
+		}
+		return getHPCProfileApplication(value);
+	}
+
+
+	/**
+	 * Looks up the HPCProfileApplication child of the job's Application element.
+	 *
+	 * @param value job definition to inspect, may be {@code null}
+	 * @return the first child named {@link #HPC_PROFILE_APPLICATION}, or
+	 *         {@code null} when the definition, its description, its
+	 *         application or such a child is missing
+	 */
+	public static HPCProfileApplicationType getHPCProfileApplication(JobDefinitionType value) {
+		if (value == null
+				|| value.getJobDescription() == null
+				|| !value.getJobDescription().isSetApplication()) {
+			return null;
+		}
+		XmlCursor acursor = value.getJobDescription().getApplication().newCursor();
+		try {
+			if (!acursor.toFirstChild()) {
+				return null;
+			}
+			do {
+				if (HPC_PROFILE_APPLICATION.equals(acursor.getName())) {
+					return (HPCProfileApplicationType) acursor.getObject();
+				}
+			} while (acursor.toNextSibling());
+			return null;
+		} finally {
+			// Single release point for every exit path, including exceptions.
+			acursor.dispose();
+		}
+	}
+
+
+
+
+	public static RangeValueType getTotalCPUCountRequirements(JobDefinitionType value) {
+		if(value != null && value.getJobDescription() != null && value.getJobDescription().isSetResources() &&
+				value.getJobDescription().getResources().isSetTotalCPUCount()){
+			return toU6RangeValue(value.getJobDescription().getResources().getTotalCPUCount());
+		}
+		else
+			return null;
+	}
+
+	public static RangeValueType getTotalResourceCountRequirements(JobDefinitionType value) {
+		if(value != null && value.getJobDescription() != null && value.getJobDescription().isSetResources() &&
+				value.getJobDescription().getResources().isSetTotalResourceCount()){
+			return toU6RangeValue(value.getJobDescription().getResources().getTotalResourceCount());
+		}
+		else
+			return null;
+	}
+
+
+	public static RangeValueType toU6RangeValue(org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType jsdlType) {
+		RangeValueType result = new RangeValueType();
+		if(jsdlType.getExactArray().length > 0){
+			result.setExact(jsdlType.getExactArray(0).getDoubleValue());
+		}
+		if(jsdlType.isSetLowerBoundedRange()){
+			result.setLowerBound(jsdlType.getLowerBoundedRange().getDoubleValue());
+		}
+		if(jsdlType.isSetUpperBoundedRange()){
+			result.setUpperBound(jsdlType.getUpperBoundedRange().getDoubleValue());
+		}
+		return result;
+	}
+
+
+
+	public static void setCPUArchitectureRequirements(JobDefinitionType value, ProcessorRequirement cpuArchitecture) {
+		if(cpuArchitecture == null || cpuArchitecture.getValue() == null) return;
+		CPUArchitectureType cpuArch = getOrCreateCPUArchitecture(value);
+		cpuArch.setCPUArchitectureName(ProcessorArchitectureEnumeration.Enum.forString(cpuArchitecture.getValue()));
+	}
+
+	/**
+	 * Writes the given range into the job's IndividualCPUCount element,
+	 * creating the element when absent.
+	 *
+	 * @param value    job definition to modify
+	 * @param cpuCount range to store
+	 */
+	public static void setIndividualCPUCountRequirements(JobDefinitionType value, RangeValueType cpuCount) {
+		setRangeValue(cpuCount, getOrCreateIndividualCPUCount(value));
+	}
+
+	/**
+	 * Writes the given range into the job's IndividualCPUSpeed element,
+	 * creating the element when absent.
+	 *
+	 * @param value    job definition to modify
+	 * @param cpuSpeed range to store
+	 */
+	public static void setIndividualCPUSpeedRequirements(JobDefinitionType value, RangeValueType cpuSpeed) {
+		setRangeValue(cpuSpeed, getOrCreateIndividualCPUSpeed(value));
+	}
+
+	/**
+	 * Writes the given range into the job's IndividualCPUTime element,
+	 * creating the element when absent.
+	 *
+	 * @param value   job definition to modify
+	 * @param cpuTime range to store
+	 */
+	public static void setIndividualCPUTimeRequirements(JobDefinitionType value, RangeValueType cpuTime) {
+		setRangeValue(cpuTime, getOrCreateIndividualCPUTime(value));
+	}
+
+	/**
+	 * Writes the given range into the job's IndividualDiskSpace element,
+	 * creating the element when absent.
+	 *
+	 * @param value     job definition to modify
+	 * @param diskSpace range to store
+	 */
+	public static void setIndividualDiskSpaceRequirements(JobDefinitionType value, RangeValueType diskSpace) {
+		setRangeValue(diskSpace, getOrCreateIndividualDiskSpace(value));
+	}
+
+	/**
+	 * Writes the given range into the job's IndividualPhysicalMemory element,
+	 * creating the element when absent.
+	 *
+	 * @param value          job definition to modify
+	 * @param physicalMemory range to store
+	 */
+	public static void setIndividualPhysicalMemoryRequirements(JobDefinitionType value, RangeValueType physicalMemory) {
+		setRangeValue(physicalMemory, getOrCreateIndividualPhysicalMemory(value));
+	}
+
+
+	public static void setName(JobDefinitionType value, String name) {
+		getOrCreateJobIdentification(value).setJobName(name);
+	}
+
+
+	public static void setRangeValue(RangeValueType u6Type, org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType jsdlType) {
+		Double exact = u6Type.getExact();
+		Double epsilon = u6Type.getEpsilon();
+		Double lower = u6Type.getLowerBound();
+		Double upper = u6Type.getUpperBound();
+
+
+		if(lower.isNaN() && upper.isNaN())
+		{
+			ExactType exactType = jsdlType.getExactArray().length > 0 ? jsdlType.getExactArray(0) : jsdlType.addNewExact();
+			exactType.setDoubleValue(exact);
+			if(!epsilon.isNaN() && epsilon != 0)
+			{
+				exactType.setEpsilon(epsilon);
+			}
+		}
+		else
+		{
+			if(!lower.isNaN())
+			{
+				BoundaryType lowerBound = jsdlType.isSetLowerBoundedRange() ? jsdlType.getLowerBoundedRange() : jsdlType.addNewLowerBoundedRange();
+				lowerBound.setDoubleValue(lower);
+				lowerBound.setExclusiveBound(!u6Type.isIncludeLowerBound());
+			}
+
+			if(!upper.isNaN())
+			{
+				BoundaryType upperBound = jsdlType.isSetUpperBoundedRange() ? jsdlType.getUpperBoundedRange() : jsdlType.addNewUpperBoundedRange();
+				upperBound.setDoubleValue(upper);
+				upperBound.setExclusiveBound(!u6Type.isIncludeUpperBound());
+			}
+		}
+	}
+
+	/**
+	 * Writes the given range into the job's TotalCPUCount element, creating
+	 * the element when absent.
+	 *
+	 * @param value    job definition to modify
+	 * @param cpuCount range to store
+	 */
+	public static void setTotalCPUCountRequirements(JobDefinitionType value, RangeValueType cpuCount) {
+		setRangeValue(cpuCount, getOrCreateTotalCPUCount(value));
+	}
+
+	/**
+	 * Writes the given range into the job's TotalResourceCount element,
+	 * creating the element when absent.
+	 *
+	 * @param value         job definition to modify
+	 * @param resourceCount range to store
+	 */
+	public static void setTotalResourceCountRequirements(JobDefinitionType value, RangeValueType resourceCount) {
+		setRangeValue(resourceCount, getOrCreateTotalResourceCount(value));
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java
new file mode 100644
index 0000000..3694eea
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+/**
+ * File creation modes: what to do when the file to be written already exists.
+ */
+public enum Mode {
+
+	/**
+	 * Overwrite any existing file.
+	 */
+	overwrite,
+	
+	/**
+	 * Append to an existing file.
+	 */
+	append,
+	
+	/**
+	 * Do NOT overwrite; fail if the file exists.
+	 */
+	nooverwrite
+	
+	
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java
new file mode 100644
index 0000000..0794caf
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java
@@ -0,0 +1,465 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import eu.emi.security.authn.x509.CommonX509TrustManager;
+import eu.emi.security.authn.x509.X509CertChainValidator;
+import org.bouncycastle.asn1.oiw.OIWObjectIdentifiers;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
+import org.bouncycastle.crypto.util.PrivateKeyFactory;
+import org.bouncycastle.crypto.util.PublicKeyFactory;
+import org.bouncycastle.crypto.util.SubjectPublicKeyInfoFactory;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
+import org.bouncycastle.pkcs.PKCS10CertificationRequestBuilder;
+import org.bouncycastle.util.encoders.Base64;
+
+import javax.net.ssl.*;
+import javax.security.auth.login.FailedLoginException;
+import java.io.*;
+import java.net.ProtocolException;
+import java.security.*;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * The MyProxyLogon class provides an interface for retrieving credentials from
+ * a MyProxy server.
+ * <p/>
+ * First, use <code>setHost</code>, <code>setPort</code>,
+ * <code>setUsername</code>, <code>setPassphrase</code>,
+ * <code>setCredentialName</code>, <code>setLifetime</code> and
+ * <code>requestTrustRoots</code> to configure. Then call <code>connect</code>,
+ * <code>logon</code>, <code>getCredentials</code>, then
+ * <code>disconnect</code>. Use <code>getCertificates</code> and
+ * <code>getPrivateKey</code> to access the retrieved credentials, or
+ * <code>writeProxyFile</code> or <code>saveCredentialsToFile</code> to
+ * write them to a file. Use <code>writeTrustRoots</code>,
+ * <code>getTrustedCAs</code>, <code>getCRLs</code>,
+ * <code>getTrustRootData</code>, and <code>getTrustRootFilenames</code>
+ * for trust root information.
+ *
+ * (modified for use with UNICORE)
+ *
+ * @version 1.1
+ * @see <a href="http://myproxy.ncsa.uiuc.edu/">MyProxy Project Home Page</a>
+ * 
+ */
+public class MyProxyLogon {
+    
+	// Version string of this class (not the protocol version, see VERSION below).
+	public final static String version = "1.1";
+
+    // Connection state. connect() moves to CONNECTED and disconnect() back to
+    // READY; the remaining states are set by code outside this section.
+    private enum State {
+        READY, CONNECTED, LOGGEDON, DONE
+    }
+
+    // Line prefixes / literals of the MyProxy wire protocol. logon() writes
+    // VERSION, GETCOMMAND, USERNAME, PASSPHRASE, LIFETIME and (optionally)
+    // CREDNAME, and parses VERSION, RESPONSE, ERROR and TRUSTROOTS lines.
+    public final static String VERSION = "VERSION=MYPROXYv2";
+    private final static String GETCOMMAND = "COMMAND=0";
+    private final static String TRUSTROOTS = "TRUSTED_CERTS=";
+    private final static String USERNAME = "USERNAME=";
+    private final static String PASSPHRASE = "PASSPHRASE=";
+    private final static String LIFETIME = "LIFETIME=";
+    private final static String CREDNAME = "CRED_NAME=";
+    public final static String RESPONSE = "RESPONSE=";
+    private final static String ERROR = "ERROR=";
+    // presumably the subject used for the certificate request - TODO confirm
+    private final static String DN = "CN=ignore";
+
+    // NOTE(review): declared as an instance field (no 'static'), so it is only
+    // reachable through an instance.
+    public final int DEFAULT_KEY_SIZE = 2048;
+    private int keySize = DEFAULT_KEY_SIZE;
+    private final static String keyAlg = "RSA";
+    private State state = State.READY;
+    // Initial value only: the constructor replaces it with MYPROXY_SERVER or
+    // "myproxy.teragrid.org".
+    private String host = "localhost";
+    private String username;
+    private String credname;
+    private char[] passphrase;
+    // Default port; the constructor overrides it from MYPROXY_SERVER_PORT when set.
+    private int port = 7512;
+    // Requested credential lifetime in seconds (12 hours).
+    private int lifetime = 43200;
+    // Socket and its buffered streams; assigned by connect(), cleared by disconnect().
+    private SSLSocket socket;
+    private BufferedInputStream socketIn;
+    private BufferedOutputStream socketOut;
+    private KeyPair keypair;
+    private Collection<X509Certificate> certificateChain;
+    private String[] trustrootFilenames;
+    private String[] trustrootData;
+    private KeyManagerFactory keyManagerFactory;
+    // Must be set (setTrustManager / setValidator) before connect() is called.
+    private TrustManager trustManager;
+    
+    /**
+     * Constructs a MyProxyLogon object.
+     */
+    public MyProxyLogon() {
+        super();
+        host = System.getenv("MYPROXY_SERVER");
+        if (host == null) {
+            host = "myproxy.teragrid.org";
+        }
+        String portString = System.getenv("MYPROXY_SERVER_PORT");
+        if (portString != null) {
+            port = Integer.parseInt(portString);
+        }
+        username = System.getProperty("user.name");
+    }
+
+    
+    /**
+     * Sets the internal trust manager to a {@code CommonX509TrustManager}
+     * wrapping the supplied validator.
+     *
+     * @param validator certificate chain validator to wrap
+     */
+    public void setValidator(X509CertChainValidator validator){
+        setTrustManager(new CommonX509TrustManager(validator));
+    }
+
+    /**
+     * Sets the hostname of the MyProxy server. When not called, the host
+     * chosen by the constructor is used: the {@code MYPROXY_SERVER}
+     * environment variable, or {@code myproxy.teragrid.org} when it is unset.
+     *
+     * @param host MyProxy server hostname
+     */
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    /**
+     * Sets the port of the MyProxy server. Defaults to 7512, or to the value
+     * of the {@code MYPROXY_SERVER_PORT} environment variable when that is set.
+     *
+     * @param port MyProxy server port
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets the key size. Defaults to {@code DEFAULT_KEY_SIZE} (2048).
+     *
+     * @param keySize key size to use
+     */
+    public void setKeySize(int keySize) {
+        this.keySize = keySize;
+    }
+
+    /**
+     * Gets the MyProxy username.
+     *
+     * @return MyProxy username
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * Sets the MyProxy username. Defaults to the {@code user.name} system
+     * property, as assigned by the constructor.
+     *
+     * @param username MyProxy username
+     */
+    public void setUsername(String username) {
+    	this.username = username;
+    }
+
+    /**
+     * Sets the optional MyProxy credential name. When left {@code null},
+     * logon() sends no credential name line.
+     *
+     * @param credname credential name
+     */
+    public void setCredentialName(String credname) {
+       this.credname = credname;
+    }
+
+    /**
+     * Sets the MyProxy passphrase. The array is stored by reference, not
+     * copied, so later changes to it by the caller are visible here.
+     *
+     * @param passphrase MyProxy passphrase
+     */
+    public void setPassphrase(char[] passphrase) {
+       this.passphrase = passphrase;
+    }
+
+    /**
+     * Sets the requested credential lifetime. Defaults to 43200 seconds (12
+     * hours). The value is sent to the server as-is by logon().
+     *
+     * @param seconds Credential lifetime in seconds
+     */
+    public void setLifetime(int seconds) {
+        lifetime = seconds;
+    }
+
+    /**
+     * Gets the certificates returned from the MyProxy server by
+     * getCredentials().
+     *
+     * @return Collection of java.security.cert.X509Certificate objects;
+     *         {@code null} until the certificate chain has been assigned
+     */
+    public Collection<X509Certificate> getCertificates() {
+        return certificateChain;
+    }
+
+    
+    // for unit testing
+    static PrivateKey testingPrivateKey;
+    
+    /**
+     * Gets the private key of the key pair held by this object.
+     * <p>
+     * When the static {@code testingPrivateKey} hook is set (unit tests), that
+     * key is returned instead.
+     *
+     * @return PrivateKey
+     */
+    public PrivateKey getPrivateKey() {
+        // Unit-test override takes precedence over the real key pair.
+        return testingPrivateKey != null ? testingPrivateKey : keypair.getPrivate();
+    }
+
+    /**
+     * Connects to the MyProxy server at the configured host and port and
+     * performs the SSL handshake. The server certificate is checked by the
+     * trust manager supplied via {@code setTrustManager} or
+     * {@code setValidator}; one of them must be called first.
+     * <p>
+     * If the handshake or the stream setup fails, the freshly opened socket is
+     * closed again and this object's socket fields are left untouched, so a
+     * failed attempt does not leak a connection.
+     *
+     * @throws IllegalStateException    if no trust manager has been set
+     * @throws IOException              if the connection or the handshake fails
+     * @throws GeneralSecurityException if the SSL context cannot be set up
+     */
+    public void connect() throws IOException, GeneralSecurityException {
+        SSLContext sc = SSLContext.getInstance("SSL");
+        if(trustManager==null){
+        	throw new IllegalStateException("No trust manager has been set!");
+        }
+        TrustManager[] trustManagers = new TrustManager[]{trustManager};
+        sc.init(getKeyManagers(), trustManagers, new SecureRandom());
+        SSLSocketFactory sf = sc.getSocketFactory();
+        SSLSocket newSocket = (SSLSocket) sf.createSocket(host, port);
+        BufferedInputStream in;
+        BufferedOutputStream out;
+        try {
+            newSocket.startHandshake();
+            in = new BufferedInputStream(newSocket.getInputStream());
+            out = new BufferedOutputStream(newSocket.getOutputStream());
+        } catch (IOException | RuntimeException e) {
+            // Do not keep a half-open connection around after a failed attempt.
+            try {
+                newSocket.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        // Publish the connection only once it is fully usable.
+        socket = newSocket;
+        socketIn = in;
+        socketOut = out;
+        state = State.CONNECTED;
+    }
+
+    /**
+     * Sets the key manager factory for use in client-side SSLSocket
+     * certificate-based authentication to the MyProxy server.
+     * Call this before connect().
+     *
+     * @param keyManagerFactory Key manager factory to use
+     */
+    public void setKeyManagerFactory(KeyManagerFactory keyManagerFactory) {
+        this.keyManagerFactory = keyManagerFactory;
+    }
+
+
+	/**
+	 * Sets the trust manager used by connect() to initialise the SSL context.
+	 * connect() throws an IllegalStateException while this is unset.
+	 *
+	 * @param trustManager trust manager to use
+	 */
+	public void setTrustManager(TrustManager trustManager) {
+		this.trustManager = trustManager;
+	}
+
+	/**
+     * Disconnects from the MyProxy server.
+     * <p>
+     * Safe to call when no connection is open (it is then a no-op apart from
+     * resetting the state). The socket fields and the state are reset even if
+     * closing the socket fails.
+     *
+     * @throws IOException if closing the socket fails
+     */
+    public void disconnect() throws IOException {
+        try {
+            if (socket != null) {
+                socket.close();
+            }
+        } finally {
+            // Always return to a clean READY state, whatever close() did.
+            socket = null;
+            socketIn = null;
+            socketOut = null;
+            state = State.READY;
+        }
+    }
+
+    /**
+     * Logs on to the MyProxy server by issuing the MyProxy GET command.
+     * <p>
+     * Connects first when the state is not CONNECTED, sends the request
+     * (version, command, username, passphrase, lifetime and, when set, the
+     * credential name) and then checks the VERSION and RESPONSE lines of the
+     * reply. On success, any TRUSTROOTS line and its FILEDATA_ lines are
+     * decoded into trustrootFilenames / trustrootData and the state becomes
+     * LOGGEDON.
+     *
+     * @throws IOException if the reply ends early (EOFException) or another I/O error occurs
+     * @throws GeneralSecurityException declared by this method; a FailedLoginException
+     *         is thrown when the server answers with response code 1
+     */
+    public void logon() throws IOException, GeneralSecurityException {
+        String line;
+        char response;
+
+        if (state != State.CONNECTED) {
+            connect();
+        }
+
+        // NOTE(review): a single '0' byte is sent and flushed on its own before the
+        // request; presumably required by the server-side handshake - confirm.
+        socketOut.write('0');
+        socketOut.flush();
+        // Request: one "KEY=value" style line per field, each terminated by '\n'.
+        socketOut.write(VERSION.getBytes());
+        socketOut.write('\n');
+        socketOut.write(GETCOMMAND.getBytes());
+        socketOut.write('\n');
+        socketOut.write(USERNAME.getBytes());
+        socketOut.write(username.getBytes());
+        socketOut.write('\n');
+        socketOut.write(PASSPHRASE.getBytes());
+        socketOut.write(new String(passphrase).getBytes());
+        socketOut.write('\n');
+        socketOut.write(LIFETIME.getBytes());
+        socketOut.write(Integer.toString(lifetime).getBytes());
+        socketOut.write('\n');
+        // The credential name is optional and only sent when configured.
+        if (credname != null) {
+            socketOut.write(CREDNAME.getBytes());
+            socketOut.write(credname.getBytes());
+            socketOut.write('\n');
+        }
+        socketOut.flush();
+
+        // First reply line must echo the protocol version exactly.
+        // readLine() returns null when nothing could be read, treated here as EOF.
+        line = readLine(socketIn);
+        if (line == null) {
+            throw new EOFException();
+        }
+        if (!line.equals(VERSION)) {
+            throw new ProtocolException("bad MyProxy protocol VERSION string: "
+                    + line);
+        }
+        // Second reply line: the RESPONSE prefix followed by exactly one status character.
+        line = readLine(socketIn);
+        if (line == null) {
+            throw new EOFException();
+        }
+        if (!line.startsWith(RESPONSE)
+                || line.length() != RESPONSE.length() + 1) {
+            throw new ProtocolException(
+                    "bad MyProxy protocol RESPONSE string: " + line);
+        }
+        response = line.charAt(RESPONSE.length());
+        if (response == '1') {
+            // Failure: gather every ERROR line the server sent into one message.
+            StringBuffer errString;
+
+            errString = new StringBuffer("MyProxy logon failed");
+            while ((line = readLine(socketIn)) != null) {
+                if (line.startsWith(ERROR)) {
+                    errString.append('\n');
+                    errString.append(line.substring(ERROR.length()));
+                }
+            }
+            throw new FailedLoginException(errString.toString());
+        } else if (response == '2') {
+            throw new ProtocolException(
+                    "MyProxy authorization RESPONSE not implemented");
+        } else if (response != '0') {
+            throw new ProtocolException(
+                    "unknown MyProxy protocol RESPONSE string: " + line);
+        }
+        // Success: consume the remaining lines; only TRUSTROOTS is interpreted.
+        while ((line = readLine(socketIn)) != null) {
+            if (line.startsWith(TRUSTROOTS)) {
+                // Comma-separated file names, each followed by one FILEDATA_<name>= line
+                // carrying the Base64-encoded file content, in the same order.
+                String filenameList = line.substring(TRUSTROOTS.length());
+                trustrootFilenames = filenameList.split(",");
+                trustrootData = new String[trustrootFilenames.length];
+                for (int i = 0; i < trustrootFilenames.length; i++) {
+                    String lineStart = "FILEDATA_" + trustrootFilenames[i]
+                            + "=";
+                    line = readLine(socketIn);
+                    if (line == null) {
+                        throw new EOFException();
+                    }
+                    if (!line.startsWith(lineStart)) {
+                        throw new ProtocolException(
+                                "bad MyProxy protocol RESPONSE: expecting "
+                                        + lineStart + " but received " + line);
+                    }
+                    trustrootData[i] = new String(Base64.decode(line
+                            .substring(lineStart.length())));
+                }
+            }
+        }
+        state = State.LOGGEDON;
+    }
+
+   
+    /**
+     * Retrieves credentials from the MyProxy server.
+     */
+    public void getCredentials() throws IOException, GeneralSecurityException {
+
+        KeyPairGenerator keyGenerator = KeyPairGenerator.getInstance(keyAlg);
+        keyGenerator.initialize(keySize);
+        keypair = keyGenerator.genKeyPair();
+        Security.addProvider(new BouncyCastleProvider());
+        
+        org.bouncycastle.pkcs.PKCS10CertificationRequest pkcs10 = null;
+        try{
+        	pkcs10 = generateCertificationRequest(DN, keypair);
+        }
+        catch(Exception ex){
+        	throw new GeneralSecurityException(ex);
+        }
+        getCredentials(pkcs10.getEncoded());
+    }
+
+    
+    /**
+     * Returns the first certificate of the retrieved chain.
+     *
+     * @return the first certificate, or {@code null} when no chain has been
+     *         retrieved yet or the chain holds no certificates
+     */
+    public X509Certificate getCertificate() {
+        if (certificateChain == null) {
+           return null;
+        }
+        Iterator<X509Certificate> iter = this.certificateChain.iterator();
+        // An empty chain is reported like a missing one instead of throwing NoSuchElementException.
+        return iter.hasNext() ? iter.next() : null;
+    }
+    
+    
+    /**
+     * Returns the key managers of the configured factory.
+     *
+     * @return the factory's key managers, or {@code null} when no factory was set
+     */
+    private KeyManager[] getKeyManagers() {
+        if (keyManagerFactory == null) {
+            return null;
+        }
+        return keyManagerFactory.getKeyManagers();
+    }
+
+    /**
+     * Sends a DER-encoded certification request and reads the issued chain.
+     * <p>
+     * Logs on first when the state is not LOGGEDON. The server's reply starts
+     * with one byte holding the number of certificates, followed by the
+     * certificates themselves. The chain is published to the field only after
+     * every certificate has been parsed, so a failure part-way through never
+     * leaves a truncated chain behind.
+     *
+     * @param derEncodedCertRequest DER encoding of the PKCS#10 request
+     * @throws IOException if the connection ends before the count byte arrives
+     * @throws GeneralSecurityException if the server announces zero certificates
+     *         or a certificate cannot be parsed
+     */
+    private void getCredentials(byte[] derEncodedCertRequest) throws IOException, GeneralSecurityException {
+        if (state != State.LOGGEDON) {
+            logon();
+        }
+        socketOut.write(derEncodedCertRequest);
+        socketOut.flush();
+        // read() yields -1 at end of stream, otherwise an unsigned byte (0..255).
+        int numCertificates = socketIn.read();
+        if (numCertificates == -1) {
+            throw new IOException("Error: connection aborted");
+        } else if (numCertificates == 0) {
+            throw new GeneralSecurityException("Error: bad number of certificates sent by server");
+        }
+        CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
+        ArrayList<X509Certificate> chain = new ArrayList<X509Certificate>(numCertificates);
+        for (int i = 0; i < numCertificates; i++) {
+            chain.add((X509Certificate) certFactory.generateCertificate(socketIn));
+        }
+        certificateChain = chain;
+        state = State.DONE;
+    }
+
+    /**
+     * Reads characters up to, but not including, the next line feed.
+     * <p>
+     * Reading also stops at end of stream and at a zero byte. Each byte is
+     * widened to a char as-is.
+     *
+     * @param is stream to read from
+     * @return the characters read, or {@code null} when none were read
+     * @throws IOException if reading from the stream fails
+     */
+    private String readLine(InputStream is) throws IOException {
+        StringBuffer collected = new StringBuffer();
+        int next = is.read();
+        while (next > 0 && next != '\n') {
+            collected.append((char) next);
+            next = is.read();
+        }
+        if (collected.length() == 0) {
+            return null;
+        }
+        return collected.toString();
+    }
+
+	/**
+	 * Builds a PKCS#10 certification request for the given subject and key pair.
+	 * The request carries the pair's public key and is signed with its private
+	 * key using the sha1WithRSA algorithm identifier.
+	 * NOTE(review): the signer builder is RSA-specific, so this presumably
+	 * expects an RSA key pair - confirm against the configured keyAlg.
+	 * NOTE(review): SHA-1 signatures are considered weak; check whether the
+	 * server accepts a stronger digest before changing it.
+	 *
+	 * @param dn subject distinguished name for the request
+	 * @param kp key pair whose public key is certified and whose private key signs
+	 * @return the signed certification request
+	 * @throws Exception if the keys cannot be converted or the signer cannot be built
+	 */
+	private org.bouncycastle.pkcs.PKCS10CertificationRequest generateCertificationRequest(String dn, KeyPair kp)
+			throws Exception{
+		X500Name subject=new X500Name(dn);
+		PublicKey pubKey=kp.getPublic();
+		PrivateKey privKey=kp.getPrivate();
+		// Convert the JCA public key into BouncyCastle's lightweight representation.
+		AsymmetricKeyParameter pubkeyParam = PublicKeyFactory.createKey(pubKey.getEncoded());
+		SubjectPublicKeyInfo publicKeyInfo=SubjectPublicKeyInfoFactory.createSubjectPublicKeyInfo(pubkeyParam);
+		PKCS10CertificationRequestBuilder builder=new PKCS10CertificationRequestBuilder(subject, publicKeyInfo);
+		// Signature and digest algorithm identifiers for the content signer.
+		AlgorithmIdentifier signatureAi = new AlgorithmIdentifier(OIWObjectIdentifiers.sha1WithRSA);
+		BcRSAContentSignerBuilder signerBuilder=new BcRSAContentSignerBuilder(
+				signatureAi, AlgorithmIdentifier.getInstance(OIWObjectIdentifiers.idSHA1));
+		AsymmetricKeyParameter pkParam = PrivateKeyFactory.createKey(privKey.getEncoded());
+		ContentSigner signer=signerBuilder.build(pkParam);
+		return builder.build(signer);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java
new file mode 100644
index 0000000..54481df
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+/**
+ * Operating system types, each paired with the string value used to
+ * represent it. {@link #toString()} yields that string value rather than
+ * the constant name.
+ */
+public enum OSType {
+
+	unknown("Unknown"), //$NON-NLS-1$
+	linux("LINUX"), //$NON-NLS-1$
+	mac_os("MACOS"), //$NON-NLS-1$
+	win95("WIN95"), //$NON-NLS-1$
+	win98("WIN98"), //$NON-NLS-1$
+	windows_R_Me("Windows_R_Me"), //$NON-NLS-1$
+	winNT("WINNT"), //$NON-NLS-1$
+	windows_2000("Windows_2000"), //$NON-NLS-1$
+	windows_XP("Windows_XP"), //$NON-NLS-1$
+	msdos("MSDOS"), //$NON-NLS-1$
+	solaris("Solaris"), //$NON-NLS-1$
+	sunOS("SunOS"), //$NON-NLS-1$
+	freeBSD("FreeBSD"), //$NON-NLS-1$
+	netBSD("NetBSD"), //$NON-NLS-1$
+	openBSD("OpenBSD"), //$NON-NLS-1$
+	bsdunix("BSDUNIX"), //$NON-NLS-1$
+	aix("AIX"), //$NON-NLS-1$
+	z_OS("z_OS"), //$NON-NLS-1$
+	os_2("OS_2"), //$NON-NLS-1$
+	os9("OS9"), //$NON-NLS-1$
+	netWare("NetWare"), //$NON-NLS-1$
+	tru64_unix("Tru64_UNIX"), //$NON-NLS-1$
+	irix("IRIX"), //$NON-NLS-1$
+	osf("OSF"), //$NON-NLS-1$
+
+	mvs("MVS"), //$NON-NLS-1$
+	os400("OS400"), //$NON-NLS-1$
+	javaVM("JavaVM"), //$NON-NLS-1$
+	win3x("WIN3x"), //$NON-NLS-1$
+	winCE("WINCE"), //$NON-NLS-1$
+	NCR3000("NCR3000"), //$NON-NLS-1$
+	dc_os("DC_OS"), //$NON-NLS-1$
+	reliant_unix("Reliant_UNIX"), //$NON-NLS-1$
+	sco_unixWare("SCO_UnixWare"), //$NON-NLS-1$
+	sco_openServer("SCO_OpenServer"), //$NON-NLS-1$
+	sequent("Sequent"), //$NON-NLS-1$
+	u6000("U6000"), //$NON-NLS-1$
+	aseries("ASERIES"), //$NON-NLS-1$
+	tandemNSK("TandemNSK"), //$NON-NLS-1$
+	tandemNT("TandemNT"), //$NON-NLS-1$
+	bs2000("BS2000"), //$NON-NLS-1$
+	lynx("Lynx"), //$NON-NLS-1$
+	xenix("XENIX"), //$NON-NLS-1$
+	vm("VM"), //$NON-NLS-1$
+	interactive_unix("Interactive_UNIX"), //$NON-NLS-1$
+	gnu_hurd("GNU_Hurd"), //$NON-NLS-1$
+	mach_kernel("MACH_Kernel"), //$NON-NLS-1$
+	inferno("Inferno"), //$NON-NLS-1$
+	qnx("QNX"), //$NON-NLS-1$
+	epoc("EPOC"), //$NON-NLS-1$
+	ixWorks("IxWorks"), //$NON-NLS-1$
+	vxWorks("VxWorks"), //$NON-NLS-1$
+	mint("MiNT"), //$NON-NLS-1$
+	beOS("BeOS"), //$NON-NLS-1$
+	hp_mpe("HP_MPE"), //$NON-NLS-1$
+	nextStep("NextStep"), //$NON-NLS-1$
+	palmPilot("PalmPilot"), //$NON-NLS-1$
+	rhapsody("Rhapsody"), //$NON-NLS-1$
+	dedicated("Dedicated"), //$NON-NLS-1$
+	os_390("OS_390"), //$NON-NLS-1$
+	vse("VSE"), //$NON-NLS-1$
+	tpf("TPF"), //$NON-NLS-1$
+	caldera_open_unix("Caldera_Open_UNIX"), //$NON-NLS-1$
+	attunix("ATTUNIX"), //$NON-NLS-1$
+	dgux("DGUX"), //$NON-NLS-1$
+	decnt("DECNT"), //$NON-NLS-1$
+	openVMS("OpenVMS"), //$NON-NLS-1$
+	hpux("HPUX"), //$NON-NLS-1$
+	other("other"); //$NON-NLS-1$
+
+	/** String value associated with this constant. */
+	private final String value;
+
+	private OSType(String value) {
+		this.value = value;
+	}
+
+	/**
+	 * @return the string value associated with this constant
+	 */
+	public String getValue() {
+		return value;
+	}
+
+	/**
+	 * Looks up the constant whose string value equals the argument.
+	 * The comparison is case-sensitive.
+	 *
+	 * @param value string value to look up
+	 * @return the matching constant, or {@code null} when none matches
+	 */
+	public static OSType fromString(String value)
+	{
+		OSType match = null;
+		for (OSType candidate : values()) {
+			if (candidate.value.equals(value)) {
+				match = candidate;
+				break;
+			}
+		}
+		return match;
+	}
+
+	/**
+	 * @return the string value, not the constant name
+	 */
+	@Override
+	public String toString()
+	{
+		return value;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java
new file mode 100644
index 0000000..1a26c57
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+/**
+ * Processor architectures, each paired with its string value.
+ * {@link #toString()} yields that string value.
+ */
+public enum ProcessorRequirement{
+	sparc("sparc"), //$NON-NLS-1$
+	powerpc("powerpc"), //$NON-NLS-1$
+	x86("x86"), //$NON-NLS-1$
+	x86_32("x86_32"), //$NON-NLS-1$
+	x86_64("x86_64"), //$NON-NLS-1$
+	parisc("parisc"), //$NON-NLS-1$
+	mips("mips"), //$NON-NLS-1$
+	ia64("ia64"), //$NON-NLS-1$
+	arm("arm"), //$NON-NLS-1$
+	other("other"); //$NON-NLS-1$
+
+	/** String value associated with this constant. */
+	private final String value;
+
+	ProcessorRequirement(String value) {
+		this.value = value;
+	}
+
+	/**
+	 * @return the string value associated with this constant
+	 */
+	public String getValue() {
+		return value;
+	}
+
+	/**
+	 * Looks up the constant whose string value equals the argument.
+	 * The comparison is case-sensitive.
+	 *
+	 * @param value string value to look up
+	 * @return the matching constant, or {@link #other} when none matches
+	 */
+	public static ProcessorRequirement fromString(String value)
+	{
+		ProcessorRequirement match = other;
+		for (ProcessorRequirement candidate : values()) {
+			if (candidate.value.equals(value)) {
+				match = candidate;
+				break;
+			}
+		}
+		return match;
+	}
+
+	/**
+	 * @return the string value of this constant
+	 */
+	@Override
+	public String toString()
+	{
+		return value;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java
new file mode 100644
index 0000000..a18b85a
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+/**
+ * A numeric resource requirement expressed as an exact value (optionally
+ * with an epsilon) and/or a lower and upper bound, plus an enabled flag.
+ * <p>
+ * Unset numeric parts are represented by {@link Double#NaN} (exact, epsilon)
+ * or by the infinities (bounds). Instances are mutable; because
+ * {@link #hashCode()} depends on the current field values, do not modify an
+ * instance while it is used as a key in a hash-based collection.
+ */
+public class RangeValueType implements ResourceRequirement {
+
+    private double exact = Double.NaN;
+    private double lowerBound = Double.NEGATIVE_INFINITY;
+    private double upperBound = Double.POSITIVE_INFINITY;
+
+    private double epsilon = Double.NaN;
+    private boolean includeLowerBound = true;
+    private boolean includeUpperBound = true;
+
+    private boolean enabled = false;
+
+    /**
+     * Create a requirement with every property given explicitly.
+     *
+     * @param exact - the exact value
+     * @param epsilon - the epsilon around exact
+     * @param lowerBound - the lower bound
+     * @param includeLowerBound - true, if lowerBound should be included in range
+     * @param upperBound - the upper bound
+     * @param includeUpperBound - true, if upperBound should be included in range
+     * @param enabled - whether the requirement is enabled
+     */
+    public RangeValueType(double exact, double epsilon, double lowerBound, boolean includeLowerBound, double upperBound, boolean includeUpperBound, boolean enabled) {
+        this.exact = exact;
+        this.epsilon = epsilon;
+        this.lowerBound = lowerBound;
+        this.includeLowerBound = includeLowerBound;
+        this.upperBound = upperBound;
+        this.includeUpperBound = includeUpperBound;
+        this.enabled = enabled;
+    }
+
+    /**
+     * Create the range requirements; the requirement starts disabled.
+     *
+     * @param exact - the exact value
+     * @param epsilon - the epsilon around exact
+     * @param lowerBound - the lower bound
+     * @param includeLowerBound - true, if lowerBound should be included in range
+     * @param upperBound - the upper bound
+     * @param includeUpperBound - true, if upperBound should be included in range
+     */
+    public RangeValueType(double exact, double epsilon, double lowerBound, boolean includeLowerBound, double upperBound, boolean includeUpperBound) {
+        this(exact, epsilon, lowerBound, includeLowerBound, upperBound, includeUpperBound, false);
+    }
+
+    /**
+     * Create the range requirements with both bounds included.
+     *
+     * @param exact - the exact value
+     * @param epsilon - the epsilon around exact
+     * @param lowerBound - the lower bound
+     * @param upperBound - the upper bound
+     */
+    public RangeValueType(double exact, double epsilon, double lowerBound, double upperBound) {
+        this(exact, epsilon, lowerBound, true, upperBound, true);
+    }
+
+    /**
+     * Create the range requirements with both bounds included and no epsilon
+     * (epsilon is NaN).
+     *
+     * @param exact - the exact value
+     * @param lowerBound - the lower bound
+     * @param upperBound - the upper bound
+     */
+    public RangeValueType(double exact, double lowerBound, double upperBound) {
+        this(exact, Double.NaN, lowerBound, true, upperBound, true);
+    }
+
+    /**
+     * Create the exact requirements; both bounds are set to NaN.
+     *
+     * @param exact - the exact value
+     * @param epsilon - the epsilon around exact
+     */
+    public RangeValueType(double exact, double epsilon) {
+        this(exact, epsilon, Double.NaN, Double.NaN);
+    }
+
+    /**
+     * Create the exact requirements without epsilon; both bounds are set to NaN.
+     *
+     * @param exact - the exact value
+     */
+    public RangeValueType(double exact) {
+        this(exact, Double.NaN);
+    }
+
+    /**
+     * Create a disabled requirement with the field defaults: NaN exact and
+     * epsilon, infinite bounds, both bounds included.
+     */
+    public RangeValueType() {
+    }
+
+    /**
+     * Get exact requirements
+     *
+     * @return the exact requirements
+     */
+    public double getExact() {
+        return exact;
+    }
+
+    /**
+     * Set exact requirements
+     *
+     * @param exact - the exact requirements
+     */
+    public void setExact(double exact) {
+        this.exact = exact;
+    }
+
+    /**
+     * Get epsilon
+     *
+     * @return the epsilon
+     */
+    public double getEpsilon() {
+        return epsilon;
+    }
+
+    /**
+     * Set epsilon
+     *
+     * @param epsilon - epsilon belonging to the exact requirements
+     */
+    public void setEpsilon(double epsilon) {
+        this.epsilon = epsilon;
+    }
+
+    /**
+     * Get lower bound
+     *
+     * @return the lower bound
+     */
+    public double getLowerBound() {
+        return lowerBound;
+    }
+
+    /**
+     * Set lower bound
+     *
+     * @param lowerBound - the lower bound
+     */
+    public void setLowerBound(double lowerBound) {
+        this.lowerBound = lowerBound;
+    }
+
+    /**
+     * Get upper bound
+     *
+     * @return the upper bound
+     */
+    public double getUpperBound() {
+        return upperBound;
+    }
+
+    /**
+     * Set upper bound
+     *
+     * @param upperBound - the upper bound
+     */
+    public void setUpperBound(double upperBound) {
+        this.upperBound = upperBound;
+    }
+
+    /**
+     * Test if requirements are met
+     * <p>
+     * Both bounds are always treated as inclusive here, and the result is
+     * {@code false} whenever exact or either bound is NaN, because every
+     * comparison with NaN is false.
+     *
+     * @param value - the tested value
+     * @return <code>true</code> if value is in the range and not less than
+     *         the exact value
+     */
+    public boolean lowerThanDouble(double value) {
+        return value >= exact && value >= lowerBound && value <= upperBound;
+    }
+
+    /**
+     * @return the exact value alone when both bounds are still infinite,
+     *         otherwise a "(e=..,l=..,u=..)" triple
+     */
+    @Override
+    public String toString() {
+        if (lowerBound == Double.NEGATIVE_INFINITY && upperBound == Double.POSITIVE_INFINITY) {
+            return Double.toString(exact);
+        }
+        else {
+            return "(e=" + Double.toString(exact) + ",l=" + Double.toString(lowerBound) + ",u=" //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+                    + Double.toString(upperBound) + ")"; //$NON-NLS-1$
+        }
+    }
+
+    public boolean isIncludeLowerBound() {
+        return includeLowerBound;
+    }
+
+    public void setIncludeLowerBound(boolean includeLowerBound) {
+        this.includeLowerBound = includeLowerBound;
+    }
+
+    public boolean isIncludeUpperBound() {
+        return includeUpperBound;
+    }
+
+    public void setIncludeUpperBound(boolean includeUpperBound) {
+        this.includeUpperBound = includeUpperBound;
+    }
+
+    /**
+     * @return a new instance carrying the same field values
+     */
+    @Override
+    public RangeValueType clone(){
+        return new RangeValueType(this.exact, this.epsilon, this.lowerBound, this.includeLowerBound, this.upperBound, this.includeUpperBound, this.enabled);
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    @Override
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    /**
+     * Two requirements are equal when all seven properties match. Doubles are
+     * compared the way {@link Double#equals(Object)} does, so NaN equals NaN
+     * and 0.0 differs from -0.0.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if(! (o instanceof RangeValueType)) return false;
+        RangeValueType other = (RangeValueType) o;
+        return doublesEqual(getExact(),other.getExact())
+        && doublesEqual(getEpsilon(), other.getEpsilon())
+        && doublesEqual(getLowerBound(), other.getLowerBound())
+        && doublesEqual(getUpperBound(), other.getUpperBound())
+        && isIncludeLowerBound() == other.isIncludeLowerBound()
+        && isIncludeUpperBound() == other.isIncludeUpperBound()
+        && isEnabled() == other.isEnabled();
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: built from the same
+     * seven properties, hashing doubles like {@link Double#hashCode()}.
+     */
+    @Override
+    public int hashCode()
+    {
+        int result = 17;
+        result = 31 * result + Double.valueOf(getExact()).hashCode();
+        result = 31 * result + Double.valueOf(getEpsilon()).hashCode();
+        result = 31 * result + Double.valueOf(getLowerBound()).hashCode();
+        result = 31 * result + Double.valueOf(getUpperBound()).hashCode();
+        result = 31 * result + (isIncludeLowerBound() ? 1 : 0);
+        result = 31 * result + (isIncludeUpperBound() ? 1 : 0);
+        result = 31 * result + (isEnabled() ? 1 : 0);
+        return result;
+    }
+
+    // Double.compare is 0 exactly when Double.equals would be true (NaN == NaN, 0.0 != -0.0).
+    private boolean doublesEqual(double a, double b)
+    {
+        return Double.compare(a, b) == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java
new file mode 100644
index 0000000..8723d85
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.unicore.jsdl.extensions.ResourceRequestDocument;
+import eu.unicore.jsdl.extensions.ResourceRequestType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.ResourcesType;
+
+/**
+ * Translates the scheduling settings of a process into resource
+ * requirement elements of a JSDL job definition.
+ */
+public class ResourceProcessor {
+
+    /**
+     * Writes the resource requirements of the process into the job definition.
+     * <p>
+     * Does nothing when the context has no process model. Memory, node count,
+     * wall time and CPU count are only written when positive; a queue name is
+     * only written when non-empty and not "default" (ignoring case and
+     * surrounding whitespace).
+     *
+     * @param value the job definition to populate
+     * @param context the process context supplying the process model
+     * @throws Exception if a NullPointerException occurs while reading the
+     *         scheduling data or writing the requirements
+     */
+    public static void generateResourceElements(JobDefinitionType value, ProcessContext context) throws Exception {
+        ProcessModel model = context.getProcessModel();
+        if (model == null) {
+            return;
+        }
+        try {
+            ComputationalResourceSchedulingModel schedule = model.getProcessResourceSchedule();
+
+            if (schedule.getTotalPhysicalMemory() > 0) {
+                JSDLUtils.setIndividualPhysicalMemoryRequirements(value,
+                        exactOnly(schedule.getTotalPhysicalMemory()));
+            }
+
+            if (schedule.getNodeCount() > 0) {
+                JSDLUtils.setTotalResourceCountRequirements(value,
+                        exactOnly(schedule.getNodeCount()));
+            }
+
+            if (schedule.getWallTimeLimit() > 0) {
+                // The limit is multiplied by 60 before being written as the CPU time requirement.
+                long wallTime = schedule.getWallTimeLimit() * 60;
+                JSDLUtils.setIndividualCPUTimeRequirements(value, exactOnly(wallTime));
+            }
+
+            // the total cpu count is total cpus per node
+            if (schedule.getTotalCPUCount() > 0) {
+                int nodes = schedule.getNodeCount();
+                if (nodes <= 0) {
+                    nodes = 1;
+                }
+                JSDLUtils.setIndividualCPUCountRequirements(value,
+                        exactOnly(schedule.getTotalCPUCount() / nodes));
+            }
+
+            String queue = schedule.getQueueName();
+            boolean hasQueue = queue != null && !"".equals(queue);
+            // ignore "default" queue names
+            if (hasQueue && !queue.trim().equalsIgnoreCase("default")) {
+                ResourceRequestDocument requestDoc = ResourceRequestDocument.Factory.newInstance();
+                ResourceRequestType request = requestDoc.addNewResourceRequest();
+                request.setName("Queue");
+                request.setValue(queue);
+                ResourcesType resources = JSDLUtils.getOrCreateResources(value);
+                WSUtilities.insertAny(requestDoc, resources);
+            }
+        } catch (NullPointerException npe) {
+            throw new Exception("No value set for resource requirements.", npe);
+        }
+    }
+
+    /** Builds a requirement that carries only an exact value; both bounds are set to NaN. */
+    private static RangeValueType exactOnly(double exact) {
+        RangeValueType requirement = new RangeValueType();
+        requirement.setLowerBound(Double.NaN);
+        requirement.setUpperBound(Double.NaN);
+        requirement.setExact(exact);
+        return requirement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java
new file mode 100644
index 0000000..d5708f3
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+/**
+ * A resource requirement that can be switched on or off.
+ */
+public interface ResourceRequirement extends Cloneable {
+	
+	/**
+	 * States whether this resource requirement is active
+	 * and should be written into the job description.
+	 * @return <code>true</code> if the requirement is enabled
+	 */
+	public boolean isEnabled();
+	
+	/**
+	 * Enables or disables this resource requirement.
+	 * @param enabled <code>true</code> to enable the requirement
+	 */
+	public void setEnabled(boolean enabled);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java
new file mode 100644
index 0000000..46414eb
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+/**
+ * SPMD variations, each paired with the URI string that identifies it.
+ */
+public enum SPMDVariations {
+
+	MPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPI"), 
+	GridMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/GridMPI"),
+	IntelMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/IntelMPI"),
+	LAMMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/LAM-MPI"), 
+	MPICH1 ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH1"),
+	MPICH2 ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH2"),
+	MPICHGM ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH-GM"),
+	MPICHMX ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH-MX"),
+	MVAPICH ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MVAPICH"),
+	MVAPICH2 ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MVAPICH2"),
+	OpenMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/OpenMPI"),
+	POE ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/POE"),
+	PVM ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/PVM");
+	
+	// URI string associated with this variation.
+	private final String variation;
+	
+	private SPMDVariations(String variation) {
+		this.variation = variation;
+	}
+	
+	/**
+	 * @return the URI string associated with this variation
+	 */
+	public String value(){
+		return variation;
+	}
+	
+}
+
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java
new file mode 100644
index 0000000..00fa472
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java
@@ -0,0 +1,160 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.x500.X500Principal;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * Static helpers for the BES/UNICORE job submission path: building the
+ * {@link UNICORESecurityContext} of a process and issuing short-lived X.509
+ * credentials signed with a CA key that is read from local PEM files.
+ */
+public class SecurityUtils {
+
+	private final static Logger logger = LoggerFactory.getLogger(SecurityUtils.class);
+
+	/**
+	 * Creates the security context used for a UNICORE job submission.
+	 *
+	 * @param processContext process being submitted; supplies the job submission protocol,
+	 *                       the credential store token and the user DN
+	 * @return a security context wrapping the credential reader and the request data
+	 * @throws WorkerException if the protocol is not UNICORE, if obtaining the credential
+	 *                         reader fails, or if the obtained reader is {@code null}
+	 */
+	public static UNICORESecurityContext getSecurityContext(ProcessContext processContext) throws WorkerException {
+
+		if (!processContext.getJobSubmissionProtocol().equals(JobSubmissionProtocol.UNICORE)) {
+			throw new WorkerException("Only support UNICORE job submissions, invalid job submission protocol " +
+					processContext.getJobSubmissionProtocol().name());
+		}
+
+		String credentialStoreToken = processContext.getTokenId(); // set by the framework
+		RequestData requestData = new RequestData(processContext.getProcessModel().getUserDn());
+		requestData.setTokenId(credentialStoreToken);
+
+		CredentialReader credentialReader;
+		try {
+			credentialReader = WorkerUtils.getCredentialReader();
+		} catch (Exception e) {
+			// Log the underlying failure so it is not lost when the exception is translated.
+			logger.error("Error while initializing credential reader", e);
+			throw new WorkerException("Error while initializing credential reader");
+		}
+		// Checked outside the try block so this specific message reaches the caller
+		// instead of being caught and replaced by the generic one above.
+		if (credentialReader == null) {
+			throw new WorkerException("Credential reader returns null");
+		}
+		return new UNICORESecurityContext(credentialReader, requestData);
+	}
+
+	/**
+	 * Generates a new key pair and a certificate for {@code userDN}, signed by the CA whose
+	 * certificate and private key are read from the given PEM files.
+	 * <p>
+	 * The certificate's validity starts 15 minutes before the current time and ends
+	 * 30 hours after that start instant.
+	 *
+	 * @param userDN     subject DN of the certificate to issue
+	 * @param caCertPath path of the PEM encoded CA certificate
+	 * @param caKeyPath  path of the PEM encoded CA private key
+	 * @param caPwd      password of the CA private key
+	 * @return the generated private key with the chain {issued certificate, CA certificate}
+	 * @throws Exception if the CA credential cannot be loaded, or the certificate cannot be
+	 *                   built, is not currently valid, or does not verify against the CA key
+	 */
+	public static final KeyAndCertCredential generateShortLivedCertificate(String userDN,
+			String caCertPath, String caKeyPath, String caPwd) throws Exception {
+		// Validity starts 15 minutes in the past (presumably to tolerate clock skew
+		// between this host and the verifying service).
+		final long credentialGoodFromOffset = 1000L * 60L * 15L;
+		final long startTime = System.currentTimeMillis() - credentialGoodFromOffset;
+		// long arithmetic throughout, so changing the lifetime can never wrap around in int
+		final long endTime = startTime + 30L * 3600L * 1000L;
+
+		// NOTE(review): a 1024 bit key and a SHA-1 based signature are weak by current
+		// standards; left unchanged because the values accepted by the target services
+		// cannot be determined from this class.
+		final int keyLength = 1024;
+		final String signatureAlgorithm = "SHA1withRSA";
+
+		KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath,
+				caPwd);
+
+		// The new key pair uses the same algorithm as the CA key.
+		KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey()
+				.getAlgorithm());
+		kpg.initialize(keyLength);
+		KeyPair pair = kpg.generateKeyPair();
+
+		X500Principal subjectDN = new X500Principal(userDN);
+		Random rand = new Random();
+
+		SubjectPublicKeyInfo publicKeyInfo;
+		try (ASN1InputStream encodedKey = new ASN1InputStream(pair.getPublic().getEncoded())) {
+			publicKeyInfo = SubjectPublicKeyInfo.getInstance(encodedKey.readObject());
+		} catch (IOException e) {
+			throw new InvalidKeyException("Can not parse the public key "
+					+ "being included in the short lived certificate", e);
+		}
+
+		X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred
+				.getCertificate().getSubjectX500Principal());
+
+		X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+		// serial number: random value of at most 20 bits
+		X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(
+				issuerX500Name, new BigInteger(20, rand), new Date(startTime),
+				new Date(endTime), subjectX500Name, publicKeyInfo);
+
+		AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder
+				.extractAlgorithmId(caCred.getCertificate());
+
+		X509Certificate certificate = certBuilder.build(caCred.getKey(),
+				sigAlgId, signatureAlgorithm, null, null);
+
+		// Sanity checks: valid right now and really signed by the CA key.
+		certificate.checkValidity(new Date());
+		certificate.verify(caCred.getCertificate().getPublicKey());
+
+		return new KeyAndCertCredential(
+				pair.getPrivate(), new X509Certificate[] { certificate,
+						caCred.getCertificate() });
+	}
+
+	/**
+	 * Loads the CA private key and certificate from PEM files.
+	 *
+	 * @param caCertPath path of the PEM encoded CA certificate
+	 * @param caKeyPath  path of the PEM encoded CA private key
+	 * @param password   password of the private key; {@code null} is passed through to the
+	 *                   key loader as "no password"
+	 * @return credential holding the private key and a one-element chain with the CA certificate
+	 * @throws Exception if either file cannot be opened or parsed
+	 */
+	public static KeyAndCertCredential getCACredential(String caCertPath,
+			String caKeyPath, String password) throws Exception {
+		char[] keyPassword = (password == null) ? null : password.toCharArray();
+
+		// try-with-resources closes each stream even when loading fails
+		PrivateKey pk;
+		try (InputStream isKey = new FileInputStream(caKeyPath)) {
+			pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, keyPassword);
+		}
+
+		X509Certificate caCert;
+		try (InputStream isCert = new FileInputStream(caCertPath)) {
+			caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
+		}
+
+		return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java
new file mode 100644
index 0000000..85da1f2
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.StorageFactory;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.StorageFactoryClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import de.fzj.unicore.wsrflite.xmlbeans.client.RegistryClient;
+import de.fzj.unicore.wsrflite.xmlbeans.sg.Registry;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.oasisOpen.docs.wsrf.sg2.EntryType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.unigrids.services.atomic.types.PropertyType;
+import org.unigrids.x2006.x04.services.smf.CreateSMSDocument;
+import org.unigrids.x2006.x04.services.smf.StorageBackendParametersDocument.StorageBackendParameters;
+import org.unigrids.x2006.x04.services.smf.StorageDescriptionType;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import javax.security.auth.x500.X500Principal;
+import java.util.Calendar;
+
+public class StorageCreator {
+	 protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	/**
+	 * the initial lifetime (in days) for newly created SMSs
+	 */
+	private int initialLifeTime;
+
+	/**
+	 * factory URL to use
+	 */
+	private String factoryUrl;
+
+	/**
+	 * site where to create the storage
+	 */
+	private String siteName;
+
+	/**
+	 * storage type to create
+	 */
+	private String storageType;
+
+	private DefaultClientConfiguration secProps;
+	
+	private String userName;
+	
+	public StorageCreator(DefaultClientConfiguration secProps, String besUrl, int initialLifetime, String storageType, String userName) {
+		this.secProps = secProps; 
+		this.factoryUrl = getStorageFactoryUrl(besUrl);
+		this.storageType = storageType;
+		this.initialLifeTime = initialLifetime;
+		this.userName = userName;
+	}
+	
+	
+	public StorageCreator(DefaultClientConfiguration secProps, String besUrl, int initialLifetime, String userName) {
+		this.secProps = secProps; 
+		this.factoryUrl = getStorageFactoryUrl(besUrl);
+		this.initialLifeTime = initialLifetime;
+		this.userName = userName;
+	}
+
+	
+	// The target site must have storage factory deployed with bes factory
+	public StorageClient createStorage() throws Exception{
+		
+		if(factoryUrl == null) {
+			throw new Exception("Cannot create Storage Factory Url");
+		}
+		
+		EndpointReferenceType sfEpr= WSUtilities.makeServiceEPR(factoryUrl, StorageFactory.SMF_PORT);
+		
+		String dn = findServerName(factoryUrl, sfEpr);
+		
+		WSUtilities.addServerIdentity(sfEpr, dn);
+		
+		secProps.getETDSettings().setReceiver(new X500Principal(dn));
+		secProps.getETDSettings().setIssuerCertificateChain(secProps.getCredential().getCertificateChain());
+		
+		// TODO: remove it afterwards
+		if(userName != null) {
+			secProps.getETDSettings().getRequestedUserAttributes2().put("xlogin", new String[]{userName});
+		}
+		
+		StorageFactoryClient sfc = new StorageFactoryClient(sfEpr, secProps);
+		
+		if (log.isDebugEnabled()){
+			log.debug("Using storage factory at <"+sfc.getUrl()+">");
+		}
+		
+		StorageClient sc = null;
+		try{
+			sc=sfc.createSMS(getCreateSMSDocument());
+			
+			String addr=sc.getEPR().getAddress().getStringValue();
+			log.info(addr);
+			
+		}catch(Exception ex){
+			log.error("Could not create storage",ex);
+			throw new Exception(ex);
+		}
+
+		return sc;
+	}
+	
+	protected String findServerName(String besUrl, EndpointReferenceType smsEpr)throws Exception{
+		
+		int besIndex = besUrl.indexOf("StorageFactory?res");
+		String ss = besUrl.substring(0, besIndex);
+		ss = ss + "Registry";
+		
+		EndpointReferenceType eprt = WSUtilities.makeServiceEPR(ss, "default_registry", Registry.REGISTRY_PORT);
+		
+		RegistryClient registry = new RegistryClient(eprt, secProps);
+		
+		//first, check if server name is already in the EPR...
+		String dn=WSUtilities.extractServerIDFromEPR(smsEpr);
+		if(dn!=null){
+			return dn;
+		}
+		//otherwise find a matching service in the registry
+		String url=smsEpr.getAddress().getStringValue();
+		if(url.contains("/services/"))url=url.substring(0,url.indexOf("/services"));
+		if(log.isDebugEnabled()) log.debug("Checking for services at "+url);
+		for(EntryType entry:registry.listEntries()){
+			if(entry.getMemberServiceEPR().getAddress().getStringValue().startsWith(url)){
+				dn=WSUtilities.extractServerIDFromEPR(entry.getMemberServiceEPR());
+				if(dn!=null){
+					return dn;
+				}
+			}
+		}
+		return null;
+	}
+
+	
+	public static String getStorageFactoryUrl(String besUrl){
+		int besIndex = besUrl.indexOf("BESFactory?res");
+		String ss = besUrl.substring(0, besIndex);
+		ss = ss + "StorageFactory?res=default_storage_factory";
+		return ss;
+	}
+	
+	/**
+	 * prepare request
+	 * */
+	protected CreateSMSDocument getCreateSMSDocument(String ...keyValueParams){
+		CreateSMSDocument in=CreateSMSDocument.Factory.newInstance();
+		in.addNewCreateSMS();
+		if(initialLifeTime>0){
+			in.getCreateSMS().addNewTerminationTime().setCalendarValue(getTermTime());
+		}
+		if(storageType!=null){
+			if(log.isDebugEnabled()) {
+				log.debug("Will create storage of type : "+storageType);
+			}
+			StorageDescriptionType desc=in.getCreateSMS().addNewStorageDescription();
+			desc.setStorageBackendType(storageType);
+			if(keyValueParams.length>1){
+				//other parameters from the cmdline as key=value
+				StorageBackendParameters params=desc.addNewStorageBackendParameters();
+				for(int i=1;i<keyValueParams.length;i++){
+					String arg=keyValueParams[i];
+					String[]sp=arg.split("=",2);
+					PropertyType prop=params.addNewProperty();
+					prop.setName(sp[0]);
+					prop.setValue(sp[1]);
+					if(log.isDebugEnabled()) {
+						log.debug("Have parameter : "+arg);
+					}
+				}
+			}
+		}
+		return in;
+	}
+
+	protected Calendar getTermTime(){
+		Calendar c = Calendar.getInstance();
+		c.add(Calendar.DATE, initialLifeTime);
+		return c;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java
new file mode 100644
index 0000000..e550a3d
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Adds data staging elements and application arguments to a JSDL job definition,
+ * based on the inputs and outputs of a process.
+ */
+public class UASDataStagingProcessor {
+
+	protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	/**
+	 * Populates {@code value} with stage-in elements/arguments for the process inputs and
+	 * stage-out elements for the process outputs.
+	 *
+	 * @param value   job definition to extend
+	 * @param context process whose inputs and outputs are described
+	 * @param smsUrl  storage URL; it is prefixed with {@code BFT:} before use
+	 * @throws Exception if one of the JSDL helpers fails
+	 */
+	public static void generateDataStagingElements(JobDefinitionType value, ProcessContext context, String smsUrl) throws Exception{
+		smsUrl = "BFT:"+smsUrl;
+
+		// Inputs are gated on the input list (the output list was checked here before,
+		// so inputs were skipped whenever a process declared no outputs).
+		List<InputDataObjectType> inputs = context.getProcessModel().getProcessInputs();
+		if (inputs != null && !inputs.isEmpty()) {
+			buildDataStagingFromInputContext(context, value, smsUrl);
+		}
+
+		List<OutputDataObjectType> outputs = context.getProcessModel().getProcessOutputs();
+		if (outputs != null && !outputs.isEmpty()) {
+			buildFromOutputContext(context, value, smsUrl);
+		}
+	}
+
+	/**
+	 * Adds a stage-in element for {@code uri}. When {@code useSMS} is set and the URI starts
+	 * with {@code file:}, the source is rewritten to the file of the same name on the storage.
+	 */
+	private static void createInURISMSElement(JobDefinitionType value, String smsUrl, String uri, boolean useSMS)
+			throws Exception {
+		String fileName = new File(uri).getName();
+
+		if (useSMS && uri.startsWith("file:")) {
+			uri = smsUrl+"#/"+fileName;
+		}
+		// no need to stage-in those files to the input
+		// directory because unicore site will fetch them for the user
+		// supported third party transfers include
+		// gsiftp, http, rns, ftp
+		JSDLUtils.addDataStagingSourceElement(value, uri, null, fileName);
+	}
+
+	/**
+	 * Adds stage-out elements for the exit code file (UNICORE) or for stdout/stderr (otherwise).
+	 */
+	//TODO: will be deprecated
+	private static void createStdOutURIs(JobDefinitionType value, ProcessContext context, String smsUrl, boolean isUnicore) throws Exception {
+
+		// no need to use smsUrl for output location, because output location is activity's working directory
+
+		if (isUnicore) {
+			JSDLUtils.addDataStagingTargetElement(value, null,
+					"UNICORE_SCRIPT_EXIT_CODE", null);
+			return;
+		}
+
+		String stdout = ApplicationProcessor.getApplicationStdOut(value, context);
+		String stderr = ApplicationProcessor.getApplicationStdErr(value, context);
+
+		String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+				: stdout;
+		JSDLUtils.addDataStagingTargetElement(value, null, stdoutFileName,
+				null);
+
+		// null check is on stderr itself (stdout was tested here before, so a missing
+		// stderr name ended in a NullPointerException)
+		String stderrFileName = (stderr == null || stderr.equals("")) ? "stderr"
+				: stderr;
+		JSDLUtils.addDataStagingTargetElement(value, null, stderrFileName,
+				null);
+	}
+
+	/**
+	 * Adds a stage-out element named {@code prmValue}; does nothing for a null or empty name.
+	 */
+	// TODO: this should be deprecated, because the outputs are fetched using activity working dir from data transferrer
+	private static void createOutStringElements(JobDefinitionType value, String smsUrl, String prmValue) throws Exception {
+		if (prmValue == null || "".equals(prmValue)) {
+			return;
+		}
+		JSDLUtils.addDataStagingTargetElement(value, null, prmValue, null);
+	}
+
+	/**
+	 * Adds a stage-out element that delivers the file named by the last path segment of
+	 * {@code prmValue} to the target {@code prmValue}.
+	 */
+	private static void createOutURIElement(JobDefinitionType value,
+			String prmValue) throws Exception {
+		String fileName = new File(prmValue).getName();
+		JSDLUtils.addDataStagingTargetElement(value, null, fileName, prmValue);
+	}
+
+	/**
+	 * Adds a stage-out element for every URI typed process output that has a value.
+	 *
+	 * @return the job definition passed in
+	 */
+	private static JobDefinitionType buildFromOutputContext(ProcessContext context,
+			JobDefinitionType value, String smsUrl) throws Exception {
+		List<OutputDataObjectType> applicationOutputs = context.getProcessModel().getProcessOutputs();
+		if (applicationOutputs == null || applicationOutputs.isEmpty()) {
+			return value;
+		}
+		for (OutputDataObjectType output : applicationOutputs) {
+			if (output.getValue() == null || "".equals(output.getValue())) {
+				continue;
+			}
+			if (output.getType() == DataType.URI) {
+				createOutURIElement(value, output.getValue());
+			}
+		}
+		return value;
+	}
+
+	/**
+	 * Walks the process inputs in ascending input order and, depending on the input type,
+	 * adds a stage-in element or an application argument. Inputs without a value are skipped.
+	 */
+	private static void buildDataStagingFromInputContext(ProcessContext context, JobDefinitionType value, String smsUrl)
+			throws Exception {
+		List<InputDataObjectType> processInputs = context.getProcessModel().getProcessInputs();
+		if (processInputs == null || processInputs.isEmpty()) {
+			return;
+		}
+
+		// Sort a copy of the list. A TreeSet keyed on the input order treats inputs with the
+		// same order as duplicates and silently drops them; a stable list sort keeps them all.
+		Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+			@Override
+			public int compare(InputDataObjectType first, InputDataObjectType second) {
+				// Integer.compare instead of subtraction, which can overflow
+				return Integer.compare(first.getInputOrder(), second.getInputOrder());
+			}
+		};
+		List<InputDataObjectType> sortedInputs = new java.util.ArrayList<InputDataObjectType>(processInputs);
+		java.util.Collections.sort(sortedInputs, inputOrderComparator);
+
+		for (InputDataObjectType input : sortedInputs) {
+			if (input.getValue() == null || "".equals(input.getValue())) {
+				continue;
+			}
+			DataType type = input.getType();
+			if (type == DataType.URI) {
+				createInURISMSElement(value, smsUrl, input.getValue(), true);
+			}
+			else if (type == DataType.STRING && input.isDataStaged()) {
+				createInURISMSElement(value, smsUrl, input.getValue(), false);
+			}
+			else if (type == DataType.STRING) {
+				ApplicationProcessor.addApplicationArgument(value, context, input.getValue());
+			}
+			else if (type == DataType.FLOAT || type == DataType.INTEGER) {
+				if (! (input.getName().equals(BESConstants.NUMBER_OF_PROCESSES) || input.getName().equals(BESConstants.PROCESSES_PER_HOST))) {
+					// temp avoid environ going to app args
+					ApplicationProcessor.addApplicationArgument(value, context, String.valueOf(input.getValue()));
+				}
+			}
+		}
+	}
+
+	/**
+	 * @return {@code true} if the process uses the UNICORE job submission protocol
+	 */
+	public static boolean isUnicoreEndpoint(ProcessContext context) {
+		return context.getJobSubmissionProtocol().equals(JobSubmissionProtocol.UNICORE);
+	}
+
+}


[3/3] airavata git commit: Add implementation for BESJobSubmissionTask

Posted by go...@apache.org.
Add implementation for BESJobSubmissionTask


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d231956e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d231956e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d231956e

Branch: refs/heads/feature-workload-mgmt
Commit: d231956e853aca385f6187f5ded5978a21d6548f
Parents: 9f0e45b
Author: Gourav Shenoy <go...@apache.org>
Authored: Tue May 2 13:50:17 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Tue May 2 13:50:17 2017 -0400

----------------------------------------------------------------------
 modules/worker/task-jobsubmission/pom.xml       |  33 ++
 .../impl/BESJobSubmissionTask.java              |  83 +--
 .../jobsubmission/utils/bes/ActivityInfo.java   |  50 ++
 .../utils/bes/ApplicationProcessor.java         | 221 ++++++++
 .../jobsubmission/utils/bes/BESConstants.java   |  45 ++
 .../utils/bes/DataTransferrer.java              | 328 ++++++++++++
 .../jobsubmission/utils/bes/FileDownloader.java | 255 +++++++++
 .../utils/bes/FileTransferBase.java             | 223 ++++++++
 .../jobsubmission/utils/bes/FileUploader.java   | 242 +++++++++
 .../jobsubmission/utils/bes/JSDLGenerator.java  | 115 ++++
 .../task/jobsubmission/utils/bes/JSDLUtils.java | 517 ++++++++++++++++++
 .../task/jobsubmission/utils/bes/Mode.java      |  45 ++
 .../jobsubmission/utils/bes/MyProxyLogon.java   | 465 ++++++++++++++++
 .../task/jobsubmission/utils/bes/OSType.java    | 124 +++++
 .../utils/bes/ProcessorRequirement.java         |  61 +++
 .../jobsubmission/utils/bes/RangeValueType.java | 271 ++++++++++
 .../utils/bes/ResourceProcessor.java            |  97 ++++
 .../utils/bes/ResourceRequirement.java          |  34 ++
 .../jobsubmission/utils/bes/SPMDVariations.java |  52 ++
 .../jobsubmission/utils/bes/SecurityUtils.java  | 160 ++++++
 .../jobsubmission/utils/bes/StorageCreator.java | 207 ++++++++
 .../utils/bes/UASDataStagingProcessor.java      | 182 +++++++
 .../utils/bes/UNICORESecurityContext.java       | 195 +++++++
 .../task/jobsubmission/utils/bes/URIUtils.java  | 121 +++++
 .../utils/bes/X509SecurityContext.java          | 340 ++++++++++++
 modules/worker/worker-core/pom.xml              |   6 +
 .../airavata/worker/core/RequestData.java       | 149 ++++++
 .../airavata/worker/core/SecurityContext.java   |  24 +
 .../core/context/AbstractSecurityContext.java   |  57 ++
 .../airavata/worker/core/utils/SSHUtils.java    | 524 +++++++++++++++++++
 .../worker/core/utils/WorkerFactory.java        |  12 +
 .../airavata/worker/core/utils/WorkerUtils.java |  64 +++
 32 files changed, 5262 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/pom.xml b/modules/worker/task-jobsubmission/pom.xml
index 7d9506e..45d720e 100644
--- a/modules/worker/task-jobsubmission/pom.xml
+++ b/modules/worker/task-jobsubmission/pom.xml
@@ -41,6 +41,39 @@
             <artifactId>aurora-client</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>eu.unicore</groupId>
+            <artifactId>unicore-client-wrapper</artifactId>
+            <version>1.7.2_1</version>
+            <exclusions>
+                <!-- <exclusion>
+                    <groupId>org.apache.santuario</groupId>
+                    <artifactId>xmlsec</artifactId>
+                </exclusion> -->
+                <exclusion>
+                    <groupId>net.sf.saxon</groupId>
+                    <artifactId>saxon</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.saxon</groupId>
+                    <artifactId>saxon-dom</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.saxon</groupId>
+                    <artifactId>saxon-xpath</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.saxon</groupId>
+            <artifactId>Saxon-HE</artifactId>
+            <version>9.6.0-1</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+            <version>3.1</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
index 2c6b984..65668c0 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
@@ -31,17 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
 import eu.unicore.util.httpclient.DefaultClientConfiguration;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
@@ -59,7 +48,21 @@ import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.SSHApiException;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.SSHUtils;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
+import org.apache.airavata.worker.task.jobsubmission.utils.bes.*;
 import org.apache.xmlbeans.XmlCursor;
+import org.ggf.schemas.bes.x2006.x08.besFactory.*;
 import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,7 +108,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             // con't reuse if UserDN has been changed.
             secProperties = getSecurityConfig(processContext);
             // try secProperties = secProperties.clone() if we can't use already initialized ClientConfigurations.
-        } catch (GFacException e) {
+        } catch (WorkerException e) {
             String msg = "Unicorn security context initialization error";
             log.error(msg, e);
             taskStatus.setState(TaskState.FAILED);
@@ -115,10 +118,10 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
 
         try {
             JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
-            JobSubmissionInterface jobSubmissionInterface = GFacUtils.getPreferredJobSubmissionInterface(processContext);
+            JobSubmissionInterface jobSubmissionInterface = JobSubmissionUtils.getPreferredJobSubmissionInterface(processContext);
             String factoryUrl = null;
             if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
-                UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(
+                UnicoreJobSubmission unicoreJobSubmission = JobSubmissionUtils.getUnicoreJobSubmission(
                         jobSubmissionInterface.getJobSubmissionInterfaceId());
                 factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
             }
@@ -167,8 +170,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             jobDetails.setJobDescription(activityEpr.toString());
             jobDetails.setJobStatuses(Arrays.asList(new JobStatus(JobState.SUBMITTED)));
             processContext.setJobModel(jobDetails);
-            GFacUtils.saveJobModel(processContext, jobDetails);
-            GFacUtils.saveJobStatus(processContext, jobDetails);
+            JobSubmissionUtils.saveJobModel(processContext, jobDetails);
+            WorkerUtils.saveJobStatus(processContext, jobDetails);
             log.info(formatStatusMessage(activityEpr.getAddress()
                     .getStringValue(), factory.getActivityStatus(activityEpr)
                     .toString()));
@@ -205,8 +208,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
                 JobState applicationJobStatus = JobState.CANCELED;
                 jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
-                GFacUtils.saveJobStatus(processContext, jobDetails);
-                throw new GFacException(
+                WorkerUtils.saveJobStatus(processContext, jobDetails);
+                throw new WorkerException(
                         processContext.getExperimentId() + "Job Canceled");
             } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
                 try {
@@ -215,7 +218,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                 }
                 JobState applicationJobStatus = JobState.COMPLETE;
                 jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
-                GFacUtils.saveJobStatus(processContext, jobDetails);
+                WorkerUtils.saveJobStatus(processContext, jobDetails);
                 log.info("Job Id: {}, exit code: {}, exit status: {}", jobDetails.getJobId(),
                         activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString());
 
@@ -228,7 +231,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             if (copyOutput != null) {
                 copyOutputFilesToStorage(taskContext, copyOutput);
                 for (OutputDataObjectType outputDataObjectType : copyOutput) {
-                    GFacUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
+                    WorkerUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
                 }
             }
 //            dt.publishFinalOutputs();
@@ -244,13 +247,13 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
         return taskStatus;
     }
 
-    private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws GFacException {
+    private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws WorkerException {
         ProcessContext pc = taskContext.getParentProcessContext();
         String remoteFilePath = null, fileName = null, localFilePath = null;
         try {
-            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            authenticationInfo = WorkerFactory.getStorageSSHKeyAuthentication(pc);
             ServerInfo serverInfo = pc.getComputeResourceServerInfo();
-            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+            Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
             for (OutputDataObjectType output : copyOutput) {
                 switch (output.getType()) {
                     case STDERR: case STDOUT: case STRING: case URI:
@@ -259,7 +262,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                             localFilePath = localFilePath.substring(localFilePath.indexOf("://") + 2, localFilePath.length());
                         }
                         fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
-                        URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+                        URI destinationURI = WorkerUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
                         remoteFilePath = destinationURI.getPath();
                         log.info("SCP local file :{} -> from remote :{}", localFilePath, remoteFilePath);
                         SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession);
@@ -271,18 +274,18 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             }
         } catch (IOException | JSchException | SSHApiException | URISyntaxException | CredentialStoreException e) {
             log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e);
-            throw new GFacException("Error while scp output files to remote storage file location", e);
+            throw new WorkerException("Error while scp output files to remote storage file location", e);
         }
     }
 
-    private void copyInputFilesToLocal(TaskContext taskContext) throws GFacException {
+    private void copyInputFilesToLocal(TaskContext taskContext) throws WorkerException {
         ProcessContext pc = taskContext.getParentProcessContext();
         StorageResourceDescription storageResource = pc.getStorageResource();
 
         if (storageResource != null) {
             hostName = storageResource.getHostName();
         } else {
-            throw new GFacException("Storage Resource is null");
+            throw new WorkerException("Storage Resource is null");
         }
         inputPath = pc.getStorageFileSystemRootLocation();
         inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
@@ -290,9 +293,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
         String remoteFilePath = null, fileName = null, localFilePath = null;
         URI remoteFileURI = null;
         try {
-            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            authenticationInfo = WorkerFactory.getStorageSSHKeyAuthentication(pc);
             ServerInfo serverInfo = pc.getStorageResourceServerInfo();
-            Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+            Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
 
             List<InputDataObjectType> processInputs = pc.getProcessModel().getProcessInputs();
             for (InputDataObjectType input : processInputs) {
@@ -308,11 +311,11 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             }
         } catch (IOException | JSchException | SSHApiException | URISyntaxException e) {
             log.error("Error while coping remote file " + remoteFilePath + " to local " + localFilePath, e);
-            throw new GFacException("Error while scp input files to local file location", e);
+            throw new WorkerException("Error while scp input files to local file location", e);
         } catch (CredentialStoreException e) {
             String msg = "Authentication issue, make sure you are passing valid credential token";
             log.error(msg, e);
-            throw new GFacException(msg, e);
+            throw new WorkerException(msg, e);
         }
     }
 
@@ -324,7 +327,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
         processContext.setOutputDir(localPath);
     }
 
-    private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws GFacException {
+    private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws WorkerException {
         DefaultClientConfiguration clientConfig = null;
         try {
             UNICORESecurityContext unicoreSecurityContext = SecurityUtils.getSecurityContext(pc);
@@ -339,9 +342,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
                 clientConfig = unicoreSecurityContext.getDefaultConfiguration(false);
             }
         } catch (RegistryException e) {
-            throw new GFacException("Error! reading user configuration data from registry", e);
+            throw new WorkerException("Error! reading user configuration data from registry", e);
         } catch (ApplicationSettingsException e) {
-            throw new GFacException("Error! retrieving default client configurations", e);
+            throw new WorkerException("Error! retrieving default client configurations", e);
         }
 
         return clientConfig;
@@ -380,8 +383,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
         }
     }
 
-    private void sendNotification(ProcessContext processContext,  JobModel jobModel) throws GFacException {
-        GFacUtils.saveJobStatus(processContext, jobModel);
+    private void sendNotification(ProcessContext processContext,  JobModel jobModel) throws WorkerException {
+        WorkerUtils.saveJobStatus(processContext, jobModel);
     }
 
     @Override
@@ -467,9 +470,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
      * EndpointReference need to be saved to make cancel work.
      *
      * @param processContext
-     * @throws GFacException
+     * @throws WorkerException
      */
-    public boolean cancelJob(ProcessContext processContext) throws GFacException {
+    public boolean cancelJob(ProcessContext processContext) throws WorkerException {
         try {
             String activityEpr = processContext.getJobModel().getJobDescription();
             // initSecurityProperties(processContext);
@@ -479,7 +482,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             String interfaceId = processContext.getApplicationInterfaceDescription().getApplicationInterfaceId();
             String factoryUrl = null;
             if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
-                UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+                UnicoreJobSubmission unicoreJobSubmission = JobSubmissionUtils.getUnicoreJobSubmission(interfaceId);
                 factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
             }
             EndpointReferenceType epr = EndpointReferenceType.Factory
@@ -490,7 +493,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
             factory.terminateActivity(eprt);
             return true;
         } catch (Exception e) {
-            throw new GFacException(e.getLocalizedMessage(), e);
+            throw new WorkerException(e.getLocalizedMessage(), e);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
new file mode 100644
index 0000000..22cf4db
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
@@ -0,0 +1,50 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder that pairs the endpoint reference of a BES activity
+ * with the status document recorded for it.
+ */
+public class ActivityInfo implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Endpoint reference of the activity; {@code null} until set. */
+	private EndpointReferenceType activityEPR;
+
+	/** Status document of the activity; {@code null} until set. */
+	private ActivityStatusType activityStatusDoc;
+
+	/**
+	 * @return the stored endpoint reference, or {@code null} if none has been set
+	 */
+	public EndpointReferenceType getActivityEPR() {
+		return this.activityEPR;
+	}
+
+	/**
+	 * @param activityEPR endpoint reference to store
+	 */
+	public void setActivityEPR(EndpointReferenceType activityEPR) {
+		this.activityEPR = activityEPR;
+	}
+
+	/**
+	 * Returns the status document stored through
+	 * {@link #setActivityStatusDoc(ActivityStatusType)}.
+	 *
+	 * @return the stored status document, or {@code null} if none has been set
+	 */
+	public ActivityStatusType getActivityStatus() {
+		return this.activityStatusDoc;
+	}
+
+	/**
+	 * @param activityStatusDoc status document to store
+	 */
+	public void setActivityStatusDoc(ActivityStatusType activityStatusDoc) {
+		this.activityStatusDoc = activityStatusDoc;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
new file mode 100644
index 0000000..7fb442f
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
@@ -0,0 +1,221 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.parallelism.ApplicationParallelismType;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.FileNameType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.UserNameType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.NumberOfProcessesType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ProcessesPerHostType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ThreadsPerProcessType;
+
+import java.util.Iterator;
+import java.util.List;
+
+
+public class ApplicationProcessor {
+	
+	/**
+	 * Fills the JSDL application section of {@code value} from the process context.
+	 * <p>
+	 * The application name is always set. When the deployment defines an executable path,
+	 * the executable, user name and stdout/stderr file names are written to the SPMD
+	 * application element for parallel jobs (together with the SPMD variation and the
+	 * process/thread counts), or to the POSIX application element otherwise. Without an
+	 * executable path nothing beyond the application name is written.
+	 *
+	 * @param value   job definition to populate
+	 * @param context process context supplying the deployment, interface and process model
+	 */
+	public static void generateJobSpecificAppElements(JobDefinitionType value, ProcessContext context){
+		String userName = getUserNameFromContext(context);
+
+		ApplicationDeploymentDescription appDep = context.getApplicationDeploymentDescription();
+		String appname = context.getApplicationInterfaceDescription().getApplicationName();
+		ApplicationParallelismType parallelism = appDep.getParallelism();
+		ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+		appType.setApplicationName(appname);
+
+		// NOTE: environment variables declared on the deployment (appDep.getSetEnvironment())
+		// are not mapped into the JSDL here.
+
+		// Only the file-name part of the configured locations is used. A missing location,
+		// or one ending in '/', falls back to "stdout" / "stderr". Both values are therefore
+		// never null below.
+		String stdout = fileNameOrDefault(context.getStdoutLocation(), "stdout");
+		String stderr = fileNameOrDefault(context.getStderrLocation(), "stderr");
+
+		if (appDep.getExecutablePath() == null) {
+			return;
+		}
+
+		FileNameType fNameType = FileNameType.Factory.newInstance();
+		fNameType.setStringValue(appDep.getExecutablePath());
+
+		if (isParallelJob(context)) {
+			JSDLUtils.getOrCreateSPMDApplication(value).setExecutable(fNameType);
+			if (parallelism.equals(ApplicationParallelismType.OPENMP_MPI)) {
+				JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.OpenMPI.value());
+			} else if (parallelism.equals(ApplicationParallelismType.MPI)) {
+				JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.MPI.value());
+			}
+
+			// Number of processes comes from an optional process input.
+			try {
+				String np = getInputAsString(context, BESConstants.NUMBER_OF_PROCESSES);
+				if ((np != null) && (Integer.parseInt(np) > 0)) {
+					NumberOfProcessesType num = NumberOfProcessesType.Factory.newInstance();
+					num.setStringValue(np);
+					JSDLUtils.getSPMDApplication(value).setNumberOfProcesses(num);
+				}
+			} catch (RuntimeException ignored) {
+				// Best effort: an unreadable or non-numeric input leaves the element unset.
+			}
+
+			// Processes per host comes from an optional process input.
+			try {
+				String pphost = getInputAsString(context, BESConstants.PROCESSES_PER_HOST);
+				if ((pphost != null) && (Integer.parseInt(pphost) > 0)) {
+					ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance();
+					pph.setStringValue(pphost);
+					JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph);
+				}
+			} catch (RuntimeException ignored) {
+				// Best effort: an unreadable or non-numeric input leaves the element unset.
+			}
+
+			// The scheduled thread count is interpreted as threads per process.
+			int totalThreadCount = context.getProcessModel().getProcessResourceSchedule().getNumberOfThreads();
+			if (totalThreadCount > 0) {
+				ThreadsPerProcessType tpp = ThreadsPerProcessType.Factory.newInstance();
+				tpp.setStringValue(String.valueOf(totalThreadCount));
+				JSDLUtils.getSPMDApplication(value).setThreadsPerProcess(tpp);
+			}
+
+			if (userName != null) {
+				UserNameType userNameType = UserNameType.Factory.newInstance();
+				userNameType.setStringValue(userName);
+				JSDLUtils.getSPMDApplication(value).setUserName(userNameType);
+			}
+
+			FileNameType outName = FileNameType.Factory.newInstance();
+			outName.setStringValue(stdout);
+			JSDLUtils.getOrCreateSPMDApplication(value).setOutput(outName);
+
+			FileNameType errName = FileNameType.Factory.newInstance();
+			errName.setStringValue(stderr);
+			JSDLUtils.getOrCreateSPMDApplication(value).setError(errName);
+		} else {
+			JSDLUtils.getOrCreatePOSIXApplication(value).setExecutable(fNameType);
+
+			if (userName != null) {
+				UserNameType userNameType = UserNameType.Factory.newInstance();
+				userNameType.setStringValue(userName);
+				JSDLUtils.getOrCreatePOSIXApplication(value).setUserName(userNameType);
+			}
+
+			FileNameType outName = FileNameType.Factory.newInstance();
+			outName.setStringValue(stdout);
+			JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(outName);
+
+			FileNameType errName = FileNameType.Factory.newInstance();
+			errName.setStringValue(stderr);
+			JSDLUtils.getOrCreatePOSIXApplication(value).setError(errName);
+		}
+	}
+
+	/**
+	 * Returns the part of {@code location} after its last '/', or {@code fallback} when
+	 * {@code location} is {@code null} or that part is empty.
+	 */
+	private static String fileNameOrDefault(String location, String fallback) {
+		if (location == null) {
+			return fallback;
+		}
+		String fileName = location.substring(location.lastIndexOf('/') + 1);
+		return fileName.isEmpty() ? fallback : fileName;
+	}
+	
+	/**
+	 * Resolves the user name to put into the job description.
+	 *
+	 * @param jobContext process context to inspect
+	 * @return {@code null} when the context has no process model, otherwise the fixed
+	 *         value {@code "admin"}
+	 */
+	public static String getUserNameFromContext(ProcessContext jobContext) {
+		//TODO: Extend unicore model to specify optional unix user id (allocation account)
+		return (jobContext.getProcessModel() != null) ? "admin" : null;
+	}
+
+	/**
+	 * Appends one argument to the application element of {@code value}: the SPMD
+	 * application for parallel jobs, the POSIX application otherwise. The element is
+	 * created if it does not exist yet.
+	 *
+	 * @param value     job definition to extend
+	 * @param context   process context used to decide between SPMD and POSIX
+	 * @param stringPrm argument text to append
+	 */
+	public static void addApplicationArgument(JobDefinitionType value, ProcessContext context, String stringPrm) {
+		if (!isParallelJob(context)) {
+			JSDLUtils.getOrCreatePOSIXApplication(value).addNewArgument().setStringValue(stringPrm);
+			return;
+		}
+		JSDLUtils.getOrCreateSPMDApplication(value).addNewArgument().setStringValue(stringPrm);
+	}
+	
+	/**
+	 * Reads the stdout file name from the application element of {@code value}: the SPMD
+	 * application for parallel jobs, the POSIX application otherwise.
+	 *
+	 * @param value   job definition to read from
+	 * @param context process context used to decide between SPMD and POSIX
+	 * @return the string value of the output element
+	 * @throws RuntimeException if the lookup fails, e.g. no output element has been set
+	 */
+	public static String getApplicationStdOut(JobDefinitionType value, ProcessContext context) throws RuntimeException {
+		return isParallelJob(context)
+				? JSDLUtils.getOrCreateSPMDApplication(value).getOutput().getStringValue()
+				: JSDLUtils.getOrCreatePOSIXApplication(value).getOutput().getStringValue();
+	}
+	
+	/**
+	 * Reads the stderr file name from the application element of {@code value}: the SPMD
+	 * application for parallel jobs, the POSIX application otherwise.
+	 *
+	 * @param value   job definition to read from
+	 * @param context process context used to decide between SPMD and POSIX
+	 * @return the string value of the error element
+	 * @throws RuntimeException if the lookup fails, e.g. no error element has been set
+	 */
+	public static String getApplicationStdErr(JobDefinitionType value, ProcessContext context) throws RuntimeException {
+		return isParallelJob(context)
+				? JSDLUtils.getOrCreateSPMDApplication(value).getError().getStringValue()
+				: JSDLUtils.getOrCreatePOSIXApplication(value).getError().getStringValue();
+	}
+	
+	/**
+	 * Sets the application name on the application element of {@code value}, creating
+	 * the element if it does not exist yet.
+	 *
+	 * @param value   job definition to update
+	 * @param appName application name to set
+	 */
+	public static void createGenericApplication(JobDefinitionType value, String appName) {
+		JSDLUtils.getOrCreateApplication(value).setApplicationName(appName);
+	}
+	
+	/**
+	 * Tells whether the process runs a parallel application, i.e. whether the parallelism
+	 * type of its application deployment is MPI, OPENMP_MPI or OPENMP.
+	 * <p>
+	 * A {@code null} deployment or parallelism type is not handled here and results in a
+	 * {@link NullPointerException}.
+	 *
+	 * @param context process context whose application deployment is inspected
+	 * @return {@code true} for MPI, OPENMP_MPI and OPENMP; {@code false} for any other type
+	 */
+	public static boolean isParallelJob(ProcessContext context) {
+		ApplicationParallelismType type = context.getApplicationDeploymentDescription().getParallelism();
+		return type.equals(ApplicationParallelismType.MPI)
+				|| type.equals(ApplicationParallelismType.OPENMP_MPI)
+				|| type.equals(ApplicationParallelismType.OPENMP);
+	}
+	
+	/**
+	 * Looks up a process input by name.
+	 *
+	 * @param context process context whose process model supplies the inputs
+	 * @param name    input name to match exactly
+	 * @return the value of the first input whose name equals {@code name}, or
+	 *         {@code null} when no input matches
+	 */
+	private static String getInputAsString(ProcessContext context, String name) {
+		List<InputDataObjectType> inputs = context.getProcessModel().getProcessInputs();
+		for (InputDataObjectType candidate : inputs) {
+			if (candidate.getName().equals(name)) {
+				return candidate.getValue();
+			}
+		}
+		return null;
+	}
+	
+	
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
new file mode 100644
index 0000000..5f3991e
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
@@ -0,0 +1,45 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+/**
+ * String constants shared by the BES job submission utilities.
+ * <p>
+ * The {@code PROP_*} values are dotted key names (presumably property keys; confirm
+ * against their readers). The remaining values are names of process inputs.
+ * Interface fields are implicitly {@code public static final}.
+ */
+public interface BESConstants {
+
+	String PROP_SMS_EPR = "unicore.sms.epr";
+
+	String PROP_BES_URL = "bes.factory.url";
+
+	String PROP_ACTIVITY_INFO = "bes.activity.info";
+
+	String PROP_CLIENT_CONF = "bes.client.config";
+
+	String PROP_CA_CERT_PATH = "bes.ca.cert.path";
+
+	String PROP_CA_KEY_PATH = "bes.ca.key.path";
+
+	String PROP_CA_KEY_PASS = "bes.ca.key.pass";
+
+	/** Name of the process input holding the number of processes. */
+	String NUMBER_OF_PROCESSES = "NumberOfProcesses";
+
+	/** Name of the process input holding the processes-per-host count. */
+	String PROCESSES_PER_HOST = "ProcessesPerHost";
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
new file mode 100644
index 0000000..736d982
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
@@ -0,0 +1,328 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Data movement utility class for transferring files before and after the job execution phase.   
+ * 
+ * */
+public class DataTransferrer {
+   
+	protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	protected ProcessContext processContext;
+	
+	protected StorageClient storageClient;
+	
+	protected List<OutputDataObjectType> resultantOutputsLst;
+	
+	protected String gatewayDownloadLocation, stdoutLocation, stderrLocation;
+	
+	/**
+	 * Creates a transferrer for the given process and storage and immediately resolves
+	 * the local download, stdout and stderr locations.
+	 *
+	 * @param processContext context of the process whose files are transferred
+	 * @param storageClient client of the storage used for uploads and downloads
+	 */
+	public DataTransferrer(ProcessContext processContext, StorageClient storageClient) {
+		this.processContext = processContext;
+		this.storageClient = storageClient;
+		this.resultantOutputsLst = new ArrayList<>();
+		initStdoutsLocation();
+	}
+	
+	/**
+	 * Resolves the local download directory and the local paths of the stdout and stderr files,
+	 * then stores those paths as the values of the STDOUT and STDERR process outputs.
+	 * The file names default to "stdout" and "stderr" when the process context does not name them.
+	 */
+	private void initStdoutsLocation() {
+
+		gatewayDownloadLocation = getDownloadLocation();
+
+		String stdout = processContext.getStdoutLocation();
+		String stderr = processContext.getStderrLocation();
+
+		// keep only the part after the last '/', i.e. the bare file name
+		if(stdout != null) {
+			stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+		}
+
+		if(stderr != null) {
+			stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+		}
+
+		String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+				: stdout;
+		// the stderr default must be decided by stderr itself; testing stdout here
+		// caused an NPE (stderr null) or discarded a configured stderr name (stdout null)
+		String stderrFileName = (stderr == null || stderr.equals("")) ? "stderr"
+				: stderr;
+
+		stdoutLocation = gatewayDownloadLocation+File.separator+stdoutFileName;
+
+		stderrLocation = gatewayDownloadLocation+File.separator+stderrFileName;
+
+        List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+        if (processOutputs != null && !processOutputs.isEmpty()){
+            for (OutputDataObjectType processOutput : processOutputs){
+                if (processOutput.getType().equals(DataType.STDOUT)){
+                    processOutput.setValue(stdoutLocation);
+                }
+                if (processOutput.getType().equals(DataType.STDERR)){
+                    processOutput.setValue(stderrLocation);
+                }
+
+            }
+        }
+	}
+
+    /**
+     * Uploads every URI-typed process input whose value starts with "file" to the storage,
+     * overwriting the remote file of the same name. Inputs with other prefixes are skipped.
+     *
+     * @throws WorkerException if a local file cannot be found or an upload fails
+     */
+    public void uploadLocalFiles() throws WorkerException {
+        for (String inputUri : extractInFileParams()) {
+            String fileName = new File(inputUri).getName();
+            if (!inputUri.startsWith("file")) {
+                continue;
+            }
+            try {
+                // strip the scheme: the local path starts at the second '/' of the last "://"
+                String localPath = inputUri.substring(inputUri.lastIndexOf("://") + 2);
+                FileUploader uploader = new FileUploader(localPath, fileName, Mode.overwrite, false);
+                log.info("Uploading file {}", fileName);
+                uploader.perform(storageClient);
+            } catch (FileNotFoundException e3) {
+                throw new WorkerException(
+                        "Error while staging-in, local file "+fileName+" not found", e3);
+            } catch (Exception e) {
+                throw new WorkerException("Cannot upload files", e);
+            }
+        }
+    }
+
+    /**
+     * Collects the values of all process inputs whose type is {@code DataType.URI}.
+     *
+     * @return the URI input values in list order; empty when the process has no inputs
+     */
+    public List<String> extractInFileParams() {
+        List<String> uriValues = new ArrayList<String>();
+        List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs();
+        if (processInputs == null) {
+            return uriValues;
+        }
+        for (InputDataObjectType input : processInputs) {
+            if (input.getType().equals(DataType.URI)) {
+                uriValues.add(input.getValue());
+            }
+        }
+        return uriValues;
+    }
+
+    /**
+     * Replaces the storage client used by the upload and download methods.
+     *
+     * @param sc the storage client to use from now on
+     */
+    public void setStorageClient(StorageClient sc){
+        this.storageClient = sc;
+    }
+
+    /**
+     * Downloads the stdout and stderr files from the storage to their local locations.
+     * For UNICORE endpoints the script exit code file is downloaded as well and added to the
+     * process outputs as a required URI output. A failed download is logged and does not stop
+     * the remaining downloads.
+     *
+     * @throws WorkerException declared by the signature; download failures themselves are only logged
+     */
+    public void downloadStdOuts()  throws WorkerException{
+
+        String stdoutFileName = new File(stdoutLocation).getName();
+        String stderrFileName = new File(stderrLocation).getName();
+
+        log.info("Downloading stdout and stderr..");
+        log.info(stdoutFileName + " -> " + stdoutLocation);
+
+        // one downloader is reused; only its source and target change between transfers
+        FileDownloader f1 = new FileDownloader(stdoutFileName, stdoutLocation, Mode.overwrite);
+        try {
+            f1.perform(storageClient);
+        } catch (Exception e) {
+            log.error("Error while downloading " + stdoutFileName + " to location " + stdoutLocation, e);
+        }
+
+        log.info(stderrFileName + " -> " + stderrLocation);
+        f1.setFrom(stderrFileName);
+        f1.setTo(stderrLocation);
+        try {
+            f1.perform(storageClient);
+        } catch (Exception e) {
+            // pass the exception along so the cause of the failure shows up in the log
+            log.error("Error while downloading " + stderrFileName + " to location " + stderrLocation, e);
+        }
+        String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
+        String scriptCodeLocation = gatewayDownloadLocation + File.separator + scriptExitCodeFName;
+        if (UASDataStagingProcessor.isUnicoreEndpoint(processContext)) {
+            f1.setFrom(scriptExitCodeFName);
+            f1.setTo(scriptCodeLocation);
+            try {
+                f1.perform(storageClient);
+                OutputDataObjectType output = new OutputDataObjectType();
+                output.setName(scriptExitCodeFName);
+                output.setValue(scriptCodeLocation);
+                output.setType(DataType.URI);
+                output.setIsRequired(true);
+                processContext.getProcessModel().getProcessOutputs().add(output);
+                log.info("UNICORE_SCRIPT_EXIT_CODE -> " + scriptCodeLocation);
+                log.info("EXIT CODE: " + readFile(scriptCodeLocation));
+            } catch (Exception e) {
+                log.error("Error downloading file " + scriptExitCodeFName + " to location " + scriptCodeLocation, e);
+            }
+        }
+    }
+
+    /**
+     * Reads a local text file line by line and returns its content, with
+     * {@code Constants.NEWLINE} appended after every line.
+     *
+     * @param localFile path of the file to read
+     * @return the file content
+     * @throws IOException if the file cannot be opened or read
+     */
+    private String readFile(String localFile) throws IOException {
+        StringBuilder buff = new StringBuilder();
+        // try-with-resources closes the reader even when reading fails
+        try (BufferedReader instream = new BufferedReader(new FileReader(localFile))) {
+            String temp;
+            while ((temp = instream.readLine()) != null) {
+                buff.append(temp);
+                buff.append(Constants.NEWLINE);
+            }
+        }
+
+        log.info("finish read file:" + localFile);
+
+        return buff.toString();
+    }
+
+	/**
+	 * Determines the local directory that downloaded files are written to and creates it if needed.
+	 * The process temp path is used when no output directory is configured, when it points to a
+	 * remote location (scp:, ftp:, gsiftp:, or file: containing "@"), when it is not a valid URI,
+	 * or when the URI carries no path. Otherwise the path component of the output directory is used.
+	 *
+	 * @return the local download directory
+	 */
+	private String getDownloadLocation() {
+		String outputDataDir = processContext.getOutputDir();
+
+		if (outputDataDir == null || "".equals(outputDataDir)) {
+			// without this fallback the download directory would be "" and files would land in "/"
+			outputDataDir = getTempPath();
+		} else if (outputDataDir.startsWith("scp:") ||
+				outputDataDir.startsWith("ftp:") ||
+				outputDataDir.startsWith("gsiftp:")) {
+			// in case of remote locations use the tmp location
+			outputDataDir = getTempPath();
+		} else if (outputDataDir.startsWith("file:") &&
+				outputDataDir.contains("@")) {
+			outputDataDir = getTempPath();
+		} else {
+			try {
+				String path = new URI(outputDataDir).getPath();
+				// URI.getPath() is null for opaque URIs and may be empty; neither is a usable directory
+				outputDataDir = (path == null || path.isEmpty()) ? getTempPath() : path;
+			} catch (URISyntaxException e) {
+				outputDataDir = getTempPath();
+			}
+		}
+
+		File file = new File(outputDataDir);
+		if(!file.exists()){
+			file.mkdirs();
+		}
+
+		return outputDataDir;
+	}
+
+	/**
+	 * Builds the per-process temp directory path ("tmp" under the file system root, followed by
+	 * the process id), creates the directory and returns the path.
+	 */
+	private String getTempPath() {
+		String processTmpDir = File.separator + "tmp" + File.separator
+				+ processContext.getProcessId();
+		File dir = new File(processTmpDir);
+		dir.mkdirs();
+		return processTmpDir;
+	}
+
+    /**
+     * Processes the process outputs that have a non-empty value: STDOUT and STDERR outputs are
+     * pointed at their local locations, URI outputs are downloaded from the storage into the
+     * download location, and outputs of any other type are ignored. Finally stdout and stderr
+     * are downloaded. A URI output whose download fails is logged and left out of the result.
+     *
+     * @return the outputs collected so far in this transferrer, including those added by this call
+     * @throws WorkerException declared by the signature; per-file download failures are only logged
+     */
+    public List<OutputDataObjectType> downloadRemoteFiles() throws WorkerException {
+
+        if(log.isDebugEnabled()) {
+            log.debug("Download location is:" + gatewayDownloadLocation);
+        }
+
+        List<OutputDataObjectType> applicationOutputs = processContext.getProcessModel().getProcessOutputs();
+        if (applicationOutputs != null && !applicationOutputs.isEmpty()){
+            for (OutputDataObjectType output : applicationOutputs){
+                if(output.getValue() == null || "".equals(output.getValue())) {
+                    continue;
+                }
+                if(output.getType().equals(DataType.STDOUT)) {
+                    output.setValue(stdoutLocation);
+                    resultantOutputsLst.add(output);
+                } else if(output.getType().equals(DataType.STDERR)) {
+                    output.setValue(stderrLocation);
+                    resultantOutputsLst.add(output);
+                } else if (output.getType().equals(DataType.URI)) {
+                    // an unset location is treated like an empty one instead of failing with an NPE
+                    String location = output.getLocation();
+                    String value;
+                    if (location != null && !location.isEmpty()) {
+                        value = location + File.separator + output.getValue();
+                    } else {
+                        value = output.getValue();
+                    }
+                    String outputPath = gatewayDownloadLocation + File.separator + output.getValue();
+                    File f = new File(gatewayDownloadLocation);
+                    if (!f.exists())
+                        f.mkdirs();
+
+                    FileDownloader fileDownloader = new FileDownloader(value, outputPath, Mode.overwrite);
+                    try {
+                        log.info("Downloading file {}", value);
+                        fileDownloader.perform(storageClient);
+                        output.setType(DataType.URI);
+                        output.setValue(outputPath);
+                        resultantOutputsLst.add(output);
+                    } catch (Exception e) {
+                        // best effort: keep going with the other outputs, but record why this one failed
+                        log.error("Error downloading " + value + " from job working directory. ", e);
+                    }
+                } else {
+                    log.info("Ignore output file {}, type {}", output.getValue(), output.getType().toString());
+                }
+
+            }
+
+        }
+
+        downloadStdOuts();
+        return resultantOutputsLst;
+
+    }
+
+    /**
+     * Adds the collected outputs to the experiment catalog as experiment outputs of the
+     * current experiment. Does nothing when no outputs have been collected.
+     *
+     * @throws WorkerException if the registry rejects the outputs; the registry error is kept as cause
+     */
+    public void publishFinalOutputs() throws WorkerException {
+        if (resultantOutputsLst.isEmpty()) {
+            return;
+        }
+        try {
+            log.debug("Publishing the list of outputs to the registry instance..");
+            ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+            experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, processContext.getExperimentId());
+        } catch (RegistryException e) {
+            // chain the registry exception so the root cause is not lost
+            throw new WorkerException("Cannot publish outputs to the registry.", e);
+        }
+    }
+	
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
new file mode 100644
index 0000000..937b1c1
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import de.fzj.unicore.uas.fts.FiletransferOptions.SupportsPartialRead;
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * helper that exports remote files from a UNICORE Storage 
+ * to the local client machine.<br/>
+ * Simple wildcards ("*" and "?") and download of 
+ * directories are supported.
+ * 
+ * TODO this should be refactored so the single-file download logic 
+ * is separated from the wildcard/directory/provided outputStream logic
+ * 
+ * @author schuller
+ */
+public class FileDownloader extends FileTransferBase{
+
+	private boolean showProgress=true;
+
+	private boolean forceFileOnly=false;
+	
+	private OutputStream targetStream=null;
+	
+	/**
+	 * Creates a downloader with failOnError set to <code>true</code>.
+	 *
+	 * @param from - remote source path (may contain wildcards)
+	 * @param to - local target path
+	 * @param mode - the creation mode for local files
+	 */
+	public FileDownloader(String from, String to, Mode mode){
+		this(from,to,mode,true);
+	}
+	
+	/**
+	 * Creates a downloader.
+	 *
+	 * @param from - remote source path (may contain wildcards)
+	 * @param to - local target path
+	 * @param mode - the creation mode for local files
+	 * @param failOnError - stored in the failOnError flag of the base class
+	 */
+	public FileDownloader(String from, String to, Mode mode, boolean failOnError){
+		this.to=to;
+		this.from=from;
+		this.mode=mode;
+		this.failOnError=failOnError;
+	}
+	
+	public void perform(StorageClient sms)throws Exception{
+		boolean isWildcard=hasWildCards(from);
+		boolean isDirectory=false;
+		GridFileType gridSource=null;
+		if(isWildcard){
+			performWildCardExport(sms);
+		}
+		else {
+			//check if source is a directory
+			gridSource=sms.listProperties(from);
+			isDirectory=gridSource.getIsDirectory();
+			if(isDirectory){
+				if(forceFileOnly){
+					throw new IOException("Source is a directory");
+				}
+				performDirectoryExport(gridSource, new File(to), sms);
+			}
+			else{
+				download(gridSource,new File(to),sms);
+			}
+		}	
+	}
+	
+	/**
+	 * Downloads all files of a remote directory into an existing, writable local directory.
+	 * Sub-directories are skipped unless recursion is enabled; with recursion the matching local
+	 * sub-directory is created if it does not exist yet.
+	 *
+	 * @param directory - the remote directory
+	 * @param targetDirectory - the local directory to download into
+	 * @param sms - the storage to download from
+	 * @throws Exception if the target is unusable, a sub-directory cannot be created or a download fails
+	 */
+	protected void performDirectoryExport(GridFileType directory, File targetDirectory, StorageClient sms)throws Exception{
+		if(!targetDirectory.exists()|| !targetDirectory.canWrite()){
+			throw new IOException("Target directory <"+to+"> does not exist or is not writable!");
+		}
+		if(!targetDirectory.isDirectory()){
+			throw new IOException("Target <"+to+"> is not a directory!");
+		}
+		GridFileType[]gridFiles=sms.listDirectory(directory.getPath());
+		for(GridFileType file: gridFiles){
+			if(file.getIsDirectory()){
+				if(!recurse) {
+					System.out.println("Skipping directory "+file.getPath());
+					continue;
+				}
+				else{
+					File newTargetDirectory=new File(targetDirectory,getName(file.getPath()));
+					// mkdirs() returns false for an already existing directory, so only
+					// treat it as a failure when the directory is really missing
+					if(!newTargetDirectory.isDirectory() && !newTargetDirectory.mkdirs()){
+						throw new IOException("Cannot create directory: "+newTargetDirectory.getAbsolutePath());
+					}
+					performDirectoryExport(file, newTargetDirectory, sms);
+					continue;
+				}
+			}
+			download(file, new File(targetDirectory,getName(file.getPath())), sms);
+		}
+	}
+	
+	/**
+	 * Downloads all remote files found for the wildcard source expression. Unless a target stream
+	 * is set, the local target must be an existing directory.
+	 *
+	 * @param sms - the storage to download from
+	 * @throws Exception if the lookup or a download fails, or the target is not a directory
+	 */
+	protected void performWildCardExport(StorageClient sms)throws Exception{
+		String searchBase=getDir(from);
+		if(searchBase==null){
+			searchBase="/";
+		}
+		GridFileType[] matches=sms.find(searchBase, false, from, false, null, null);
+		File targetDir=null;
+		if(targetStream==null){
+			targetDir=new File(to);
+			if(!targetDir.isDirectory()){
+				throw new IOException("Target is not a directory.");
+			}
+		}
+		for(GridFileType match: matches){
+			download(match, targetDir, sms);
+		}
+	}
+	
+	/**
+	 * @return the parent part of the given path, or <code>null</code> if the path has no parent
+	 */
+	private String getDir(String path){
+		return new File(path).getParent();
+	}
+	
+	/**
+	 * @return the last name element of the given path
+	 */
+	private String getName(String path){
+		return new File(path).getName();
+	}
+	
+	/**
+	 * download a single regular file
+	 * 
+	 * @param source - grid file descriptor
+	 * @param localFile - local file or directory to write to; not written to when a target stream is set
+	 * @param sms - the storage to download from
+	 * @throws Exception if the source is missing or a directory (IllegalStateException), if a byte
+	 *         range is requested but the protocol cannot read partially, or if the transfer fails
+	 */
+	private void download(GridFileType source, File localFile, StorageClient sms)throws Exception{
+		if(source==null || source.getIsDirectory()){
+			throw new IllegalStateException("Source="+source); 
+		}
+		
+		// a target stream set on this downloader is written to directly and never closed here
+		OutputStream os=targetStream;
+		FileTransferClient ftc=null;
+		try{
+			String path=source.getPath();
+			if(targetStream==null){
+				if(localFile.isDirectory()){
+					localFile=new File(localFile,getName(source.getPath()));
+				}
+				if(mode.equals(Mode.nooverwrite) && localFile.exists()){
+					System.out.println("File exists and creation mode was set to 'nooverwrite'.");
+					return; 
+				}
+				System.out.println("Downloading remote file '"+sms.getUrl()+"#/"+path+"' -> "+localFile.getAbsolutePath());
+				os=new FileOutputStream(localFile.getAbsolutePath(), mode.equals(Mode.append));
+			}
+			
+			chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
+			Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+			ftc=sms.getExport(path,extraParameters,chosenProtocol);
+			configure(ftc, extraParameters);
+			System.out.println("DEB:File transfer URL : "+ftc.getUrl());
+//			ProgressBar p=null;
+			if(ftc instanceof IMonitorable  && showProgress){
+				// size is only needed by the progress bar, which is currently commented out
+				long size=ftc.getSourceFileSize();
+				if(isRange()){
+					size=getRangeSize();
+				}
+//				p=new ProgressBar(localFile.getName(),size,msg);
+//				((IMonitorable) ftc).setProgressListener(p);
+			}
+			long startTime=System.currentTimeMillis();
+			if(isRange()){
+				if(!(ftc instanceof SupportsPartialRead)){
+					throw new Exception("Byte range is defined but protocol does not allow " +
+							"partial read! Please choose a different protocol!");
+				}
+				System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
+				SupportsPartialRead pReader=(SupportsPartialRead)ftc;
+				pReader.readPartial(startByte, endByte-startByte+1, os);
+			}
+			else{
+				ftc.readAllData(os);
+			}
+//			if(p!=null){
+//				p.finish();
+//			}
+			// localFile is null when a wildcard export writes to a target stream; skip the
+			// file based rate computation then instead of failing with an NPE
+			if(timing && localFile!=null){
+				long duration=System.currentTimeMillis()-startTime;
+				double rate=(double)localFile.length()/(double)duration;
+				System.out.println("Rate: " +rate+ " kB/sec.");
+			}
+			if(targetStream==null)copyProperties(source, localFile);
+		}
+		finally{
+			try{ 
+				// only close streams opened by this method
+				if(targetStream==null && os!=null){
+					os.close();
+				}
+			}catch(Exception ignored){}
+			if(ftc!=null){
+				try{
+					ftc.destroy();
+				}catch(Exception e1){
+//					System.out.println("Could not destroy the filetransfer client",e1);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Best-effort copy of the remote executable flag to the local file; any failure is ignored.
+	 *
+	 * @param source - grid file descriptor providing the permissions
+	 * @param localFile - the local file to update
+	 * @throws Exception declared by the signature; failures are swallowed inside
+	 */
+	private void copyProperties(GridFileType source, File localFile)throws Exception{
+		try{
+			boolean remoteExecutable=source.getPermissions().getExecutable();
+			localFile.setExecutable(remoteExecutable);
+		}
+		catch(Exception ignored){
+			//TODO: logging
+//			("Can't set 'executable' flag for "+localFile.getName(), ex);
+		}
+	}
+	
+	/**
+	 * For UFTP transfer clients, sets the secret taken from the extra parameters.
+	 * Other client types are left untouched.
+	 *
+	 * @param ftc - the file transfer client to configure
+	 * @param params - extra parameters, read under the key {@code UFTPConstants.PARAM_SECRET}
+	 */
+	private void configure(FileTransferClient ftc, Map<String,String>params){
+		if(!(ftc instanceof UFTPFileTransferClient)){
+			return;
+		}
+		UFTPFileTransferClient uftpClient=(UFTPFileTransferClient)ftc;
+		uftpClient.setSecret(params.get(UFTPConstants.PARAM_SECRET));
+	}
+
+	/**
+	 * @param showProgress - when <code>true</code> (the default) and the transfer client is
+	 *        monitorable, the source size is queried before the transfer
+	 */
+	public void setShowProgress(boolean showProgress) {
+		this.showProgress = showProgress;
+	}
+
+	/**
+	 * @param forceFileOnly - when <code>true</code>, a directory source makes the download fail
+	 */
+	public void setForceFileOnly(boolean forceFileOnly) {
+		this.forceFileOnly = forceFileOnly;
+	}
+
+	/**
+	 * @param targetStream - stream to write downloaded data to instead of a local file;
+	 *        it is not closed by the downloader
+	 */
+	public void setTargetStream(OutputStream targetStream) {
+		this.targetStream = targetStream;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
new file mode 100644
index 0000000..c76ab74
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
@@ -0,0 +1,223 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.util.PropertyHelper;
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.*;
+import java.util.regex.Pattern;
+
+public class FileTransferBase {
+
+	protected Properties extraParameterSource;
+
+	protected boolean timing=false;
+
+	protected boolean recurse=false;
+
+	protected String from;
+
+	protected String to;
+
+	//index of first byte to download
+	protected Long startByte;
+	
+	//index of last byte to download
+	protected Long endByte;
+	
+	/**
+	 * the creation mode
+	 */
+	protected Mode mode;
+
+	/**
+	 * whether the job processing should fail if an error occurs
+	 */
+	protected boolean failOnError;
+
+	protected List<ProtocolType.Enum> preferredProtocols=new ArrayList<ProtocolType.Enum>();
+
+	/**
+	 * Creates the base with BFT as the only initially preferred protocol.
+	 */
+	public FileTransferBase(){
+		preferredProtocols.add(ProtocolType.BFT);
+	}
+
+	/**
+	 * Builds the extra parameters for the given protocol. Without an extra parameter source
+	 * the result is an empty map; otherwise it is the filtered map of a PropertyHelper created
+	 * from the source and the protocol name in original and lower case
+	 * (presumably used as key prefixes - confirm against PropertyHelper).
+	 *
+	 * @param protocol - the protocol to build parameters for
+	 * @return the extra parameters, never <code>null</code> when no source is set
+	 */
+	protected Map<String,String>makeExtraParameters(ProtocolType.Enum protocol){
+		Map<String, String> params;
+		if(extraParameterSource!=null){
+			String protocolName=String.valueOf(protocol);
+			String[] names=new String[]{protocolName,protocolName.toLowerCase()};
+			PropertyHelper helper=new PropertyHelper(extraParameterSource, names);
+			params=helper.getFilteredMap();
+		}
+		else{
+			params=new HashMap<String, String>();
+		}
+		if(params.size()>0){
+			// TODO: change it to logger 
+			System.out.println("Have "+params.size()+" extra parameters for protocol "+protocol);
+		}
+		return params;
+	}
+	
+	
+	/** @return the transfer target */
+	public String getTo() {
+		return to;
+	}
+
+	/** @return the transfer source */
+	public String getFrom() {
+		return from;
+	}
+
+	/** @param to - the transfer target */
+	public void setTo(String to) {
+		this.to = to;
+	}
+
+	/** @param from - the transfer source */
+	public void setFrom(String from) {
+		this.from = from;
+	}
+
+	/** @return the creation mode */
+	public Mode getMode() {
+		return mode;
+	}
+
+	/** @return the value of the failOnError flag */
+	public boolean isFailOnError() {
+		return failOnError;
+	}
+
+	/** @return the value of the timing flag */
+	public boolean isTiming() {
+		return timing;
+	}
+
+	/** @param timing - the new value of the timing flag */
+	public void setTiming(boolean timing) {
+		this.timing = timing;
+	}
+
+	/** @param failOnError - the new value of the failOnError flag */
+	public void setFailOnError(boolean failOnError) {
+		this.failOnError = failOnError;
+	}
+
+	/** @return the internal (not copied) list of preferred protocols */
+	public List<ProtocolType.Enum> getPreferredProtocols() {
+		return preferredProtocols;
+	}
+
+	/** @param preferredProtocols - list that replaces the current preferred protocols */
+	public void setPreferredProtocols(List<ProtocolType.Enum> preferredProtocols) {
+		this.preferredProtocols = preferredProtocols;
+	}
+
+	/** @param properties - source that the protocol specific extra parameters are read from */
+	public void setExtraParameterSource(Properties properties){
+		this.extraParameterSource=properties;
+	}
+
+	/** @param recurse - the new value of the recurse flag */
+	public void setRecurse(boolean recurse) {
+		this.recurse = recurse;
+	}
+	/**
+	 * check if the given path denotes a valid remote directory; "/" and "." are accepted
+	 * without asking the storage, and any lookup error counts as "not a directory"
+	 * @param remotePath - the path
+	 * @param sms - the storage
+	 * @return <code>true</code> if the remote directory exists and is a directory
+	 */
+	protected boolean isValidDirectory(String remotePath, StorageClient sms){
+		if("/".equals(remotePath) || ".".equals(remotePath)){
+			return true;
+		}
+		try{
+			GridFileType properties=sms.listProperties(remotePath);
+			return properties.getIsDirectory();
+		}catch(Exception ex){
+			return false;
+		}
+	}
+	
+	/**
+	 * Expands a local file whose name may contain wildcards. Without wildcards the file itself
+	 * is returned; otherwise the entries of its parent directory (or "." if it has none)
+	 * whose names match the wildcard name.
+	 *
+	 * @param original - the file to resolve
+	 * @return the matching files, or whatever {@link File#listFiles(FilenameFilter)} returns
+	 *         for the parent (<code>null</code> if it cannot be listed)
+	 */
+	public File[] resolveWildCards(File original){
+		if(!hasWildCards(original)){
+			return new File[]{original};
+		}
+		File parent=original.getParentFile();
+		if(parent==null){
+			parent=new File(".");
+		}
+		final Pattern namePattern=createPattern(original.getName());
+		return parent.listFiles(new FilenameFilter(){
+			public boolean accept(File dir, String candidate){
+				return namePattern.matcher(candidate).matches();
+			}
+		});
+	}
+
+	/**
+	 * @param file - the file to check
+	 * @return <code>true</code> if the file name (last path element) contains "*" or "?"
+	 */
+	protected boolean hasWildCards(File file){
+		String fileName=file.getName();
+		return hasWildCards(fileName);
+	}
+
+	/**
+	 * @param name - the name to check
+	 * @return <code>true</code> if the name contains "*" or "?"
+	 */
+	public boolean hasWildCards(String name){
+		return name.indexOf('*')>=0 || name.indexOf('?')>=0;
+	}
+
+	/**
+	 * Translates a name with the simple wildcards "*" (any character sequence) and "?" (any
+	 * single character) into a regular expression. All other characters are quoted, so regex
+	 * metacharacters in the name, such as "." or "+", only match themselves.
+	 *
+	 * @param nameWithWildcards - the wildcard name
+	 * @return the compiled pattern
+	 */
+	private Pattern createPattern(String nameWithWildcards){
+		StringBuilder regex=new StringBuilder();
+		StringBuilder literal=new StringBuilder();
+		for(int i=0;i<nameWithWildcards.length();i++){
+			char c=nameWithWildcards.charAt(i);
+			if(c=='*' || c=='?'){
+				// flush the literal text collected so far before adding the wildcard
+				if(literal.length()>0){
+					regex.append(Pattern.quote(literal.toString()));
+					literal.setLength(0);
+				}
+				regex.append(c=='*' ? ".*" : ".");
+			}
+			else{
+				literal.append(c);
+			}
+		}
+		if(literal.length()>0){
+			regex.append(Pattern.quote(literal.toString()));
+		}
+		return Pattern.compile(regex.toString());
+	}
+	
+	// protocol selected for the transfer; null until one has been chosen
+	protected ProtocolType.Enum chosenProtocol=null;
+	
+	/** @return the chosen protocol, or <code>null</code> if none has been chosen yet */
+	public ProtocolType.Enum getChosenProtocol(){
+		return chosenProtocol;
+	}
+
+	/** @return index of the first byte of the range, or <code>null</code> if not set */
+	public Long getStartByte() {
+		return startByte;
+	}
+
+	/** @param startByte - index of the first byte of the range */
+	public void setStartByte(Long startByte) {
+		this.startByte = startByte;
+	}
+
+	/** @return index of the last byte of the range, or <code>null</code> if not set */
+	public Long getEndByte() {
+		return endByte;
+	}
+
+	/** @param endByte - index of the last byte of the range */
+	public void setEndByte(Long endByte) {
+		this.endByte = endByte;
+	}
+	
+	/**
+	 * tells whether a byte range has been configured
+	 * @return <code>true</code> iff neither startByte nor endByte is <code>null</code>
+	 */
+	protected boolean isRange(){
+		return !(startByte==null || endByte==null);
+	}
+	
+	/**
+	 * get the size of the byte range, computed as <code>endByte-startByte</code>, or "-1" if the
+	 * range is open-ended (endByte is Long.MAX_VALUE). Must only be called when endByte is set.
+	 * NOTE(review): startByte/endByte are described as the indexes of the first and last byte,
+	 * so the inclusive byte count would be one larger than this value - confirm which is intended.
+	 * @return the range size, or -1 for an open-ended range
+	 */
+	protected long getRangeSize(){
+		boolean openEnded=(endByte==Long.MAX_VALUE);
+		return openEnded ? -1 : endByte-startByte;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
new file mode 100644
index 0000000..d899b37
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
@@ -0,0 +1,242 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * upload local file(s) to a remote location
+ *
+ * @author schuller
+ */
+public class FileUploader extends FileTransferBase{
+
+	public FileUploader(String from, String to, Mode mode)throws FileNotFoundException{
+		this(from,to,mode,true);
+	}
+
+	public FileUploader(String from, String to, Mode mode, boolean failOnError)throws FileNotFoundException{
+		this.to=to;
+		this.from=from;
+		this.mode=mode;
+		this.failOnError=failOnError;
+		checkOK();
+	}
+
+	public String getFrom() {
+		return from;
+	}
+
+	public String getTo() {
+		return to;
+	}
+
+
+	public void perform(StorageClient sms)throws Exception{
+		File fileSpec=new File(from);
+		boolean hasWildCards=false;
+		boolean isDirectory=fileSpec.isDirectory();
+		File[] fileset=null;
+		
+		if(!isDirectory){
+			hasWildCards=hasWildCards(fileSpec);
+		}
+		
+		chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
+		Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+
+		if(!hasWildCards && !isDirectory){
+			//single regular file
+			uploadFile(fileSpec,to,sms,chosenProtocol,extraParameters);
+			return;
+		}
+		
+		//handle wildcards or directory
+		if(hasWildCards){
+			fileset=resolveWildCards(fileSpec);
+		}
+		else{
+			fileset=fileSpec.listFiles();
+		}
+		
+		if(!isValidDirectory(to, sms)){
+			throw new IOException("The specified remote target '"+to+"' is not a directory");
+		}
+		if(to==null)to="/";
+		String target=isDirectory?to+"/"+fileSpec.getName():to;
+		sms.createDirectory(target);
+		uploadFiles(fileset,target,sms,chosenProtocol,extraParameters);
+	}
+
+	/**
+	 * upload a set of files to a remote directory (which must exist)
+	 * 
+	 * @param files
+	 * @param remoteDirectory
+	 * @param sms
+	 * @param protocol
+	 * @param extraParameters
+	 * @throws Exception
+	 */
+	private void uploadFiles(File[]files, String remoteDirectory, StorageClient sms, ProtocolType.Enum protocol, 
+			Map<String,String>extraParameters)throws Exception{
+		for(File localFile: files){
+			String target=remoteDirectory+"/"+localFile.getName();
+			if(localFile.isDirectory()){
+				if(!recurse){
+					System.out.println("Skipping directory "+localFile.getAbsolutePath());
+				}else{
+					File[] fileset=localFile.listFiles();
+					sms.createDirectory(target);
+					uploadFiles(fileset,target,sms,protocol,extraParameters);
+				}
+			}else{
+				uploadFile(localFile,target,sms,protocol,extraParameters);
+			}
+		}
+	}
+
+	/**
+	 * uploads a single regular file
+	 * 
+	 * @param localFile
+	 * @param remotePath
+	 * @param sms
+	 * @param protocol
+	 * @param extraParameters
+	 * @throws Exception
+	 */
+	private void uploadFile(File localFile, String remotePath, StorageClient sms, ProtocolType.Enum protocol, 
+			Map<String,String>extraParameters) throws Exception{
+		long startTime=System.currentTimeMillis();
+		FileInputStream is=null;
+		FileTransferClient ftc=null;
+		try{
+			if(remotePath==null){
+				remotePath="/"+localFile.getName();
+			}
+			else if(remotePath.endsWith("/")){
+				remotePath+=localFile.getName();
+			}
+			System.out.println("Uploading local file '"+localFile.getAbsolutePath()+"' -> '"+sms.getUrl()+"#"+remotePath+"'");
+			is=new FileInputStream(localFile.getAbsolutePath());
+			boolean append=Mode.append.equals(mode);
+			ftc=sms.getImport(remotePath, append, extraParameters, protocol);
+			configure(ftc, extraParameters);
+			if(append)ftc.setAppend(true);
+			String url=ftc.getUrl();
+			System.out.println("File transfer URL : "+url);
+//			ProgressBar p=null;
+			if(ftc instanceof IMonitorable){
+				long size=localFile.length();
+				if(isRange()){
+					size=getRangeSize();
+				}
+//				p=new ProgressBar(localFile.getName(),size,msg);
+//				((IMonitorable) ftc).setProgressListener(p);
+			}
+			if(isRange()){
+				System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
+				long skipped=0;
+				while(skipped<startByte){
+					skipped+=is.skip(startByte);
+				}
+				ftc.writeAllData(is, endByte-startByte+1);
+				
+			}else{
+				ftc.writeAllData(is);
+			}
+			copyProperties(localFile, sms, remotePath);
+			
+//			if(ftc instanceof IMonitorable){
+//				p.finish();
+//			}
+			
+		}finally{
+			if(ftc!=null){
+				try{
+					ftc.destroy();
+				}catch(Exception e1){
+//					msg.error("Could not clean-up the filetransfer at <"+ftc.getUrl()+">",e1);
+				}
+			}
+			try{ if(is!=null)is.close(); }catch(Exception ignored){}
+		}
+		if(timing){
+			long duration=System.currentTimeMillis()-startTime;
+			double rate=(double)localFile.length()/(double)duration;
+			System.out.println("Rate: "+rate+ " kB/sec.");
+		}
+	}
+
+	/**
+	 * if possible, copy the local executable flag to the remote file
+	 * @param sourceFile - local file
+	 * @throws Exception
+	 */
+	private void copyProperties(File sourceFile, StorageClient sms, String target)throws Exception{
+		boolean x=sourceFile.canExecute();
+		try{
+			if(x){
+				sms.changePermissions(target, true, true, x);
+			}
+		}catch(Exception ex){
+//			System.out.println("Can't set exectuable flag on remote file.",ex);
+		}
+	}
+
+	private void checkOK()throws FileNotFoundException{
+		if(!failOnError){
+			return;
+		}
+		File orig=new File(from);
+		if(!orig.isAbsolute()){
+			orig=new File(System.getProperty("user.dir"),from);
+		}
+		File[] files=resolveWildCards(orig);
+		if(files==null){
+			throw new FileNotFoundException("Local import '"+from+"' does not exist.");
+		}
+		for(File f: files){
+			if(!f.exists())throw new FileNotFoundException("Local import '"+from+"' does not exist.");
+		}
+	}
+	
+	private void configure(FileTransferClient ftc, Map<String,String>params){
+		if(ftc instanceof UFTPFileTransferClient){
+			UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+			String secret=params.get(UFTPConstants.PARAM_SECRET);
+			u.setSecret(secret);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
new file mode 100644
index 0000000..de96104
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * 
+ * Utility class that generates a JSDL job definition document from a
+ * {@link ProcessContext} instance
+ * 
+ * @author shahbaz memon
+ * 
+ * */
+
+public class JSDLGenerator implements BESConstants {
+
+	protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	/**
+	 * Builds a job definition holding the job identification, the resource
+	 * elements and the application elements derived from the context.
+	 * 
+	 * @param context the process context the job definition is derived from
+	 * @return the generated job definition document
+	 * @throws Exception if one of the processors fails
+	 */
+	public synchronized static JobDefinitionDocument buildJSDLInstance(ProcessContext context) throws Exception {
+		JobDefinitionDocument document = JobDefinitionDocument.Factory.newInstance();
+		populateCommonElements(document.addNewJobDefinition(), context);
+		return document;
+	}
+
+	/**
+	 * Builds a job definition like {@link #buildJSDLInstance(ProcessContext)}
+	 * and additionally generates the data staging elements.
+	 * 
+	 * @param context the process context the job definition is derived from
+	 * @param smsUrl handed to the data staging processor together with the context
+	 * @return the generated job definition document
+	 * @throws Exception if one of the processors fails
+	 */
+	public synchronized static JobDefinitionDocument buildJSDLInstance(ProcessContext context, String smsUrl) throws Exception {
+		JobDefinitionDocument document = JobDefinitionDocument.Factory.newInstance();
+		JobDefinitionType definition = document.addNewJobDefinition();
+		populateCommonElements(definition, context);
+		UASDataStagingProcessor.generateDataStagingElements(definition, context, smsUrl);
+		return document;
+	}
+
+	/**
+	 * Builds the same job definition as
+	 * {@link #buildJSDLInstance(ProcessContext, String)}.
+	 * 
+	 * @param context the process context the job definition is derived from
+	 * @param smsUrl handed to the data staging processor together with the context
+	 * @param jobDirectoryMode not evaluated by this method
+	 * @return the generated job definition document
+	 * @throws Exception if one of the processors fails
+	 */
+	public synchronized static JobDefinitionDocument buildJSDLInstance(
+            ProcessContext context, String smsUrl, Object jobDirectoryMode)
+			throws Exception {
+		return buildJSDLInstance(context, smsUrl);
+	}
+
+	/**
+	 * Fills in the parts shared by all job definitions: job identification,
+	 * resource elements and application elements, in this order.
+	 */
+	private static void populateCommonElements(JobDefinitionType definition, ProcessContext context) throws Exception {
+		// build Identification
+		createJobIdentification(definition, context);
+		ResourceProcessor.generateResourceElements(definition, context);
+		ApplicationProcessor.generateJobSpecificAppElements(definition, context);
+	}
+
+	/**
+	 * Copies the allocation project number, the application description and
+	 * the application name from the context into the job identification.
+	 * Values that are <code>null</code> are left out; a <code>null</code>
+	 * context leaves the job definition untouched.
+	 */
+	private static void createJobIdentification(JobDefinitionType value, ProcessContext context) {
+		if (context == null) {
+			return;
+		}
+
+		if (context.getAllocationProjectNumber() != null) {
+			JSDLUtils.addProjectName(value, context.getAllocationProjectNumber());
+		}
+
+		if (context.getApplicationInterfaceDescription() == null) {
+			return;
+		}
+
+		if (context.getApplicationInterfaceDescription().getApplicationDescription() != null) {
+			JSDLUtils.getOrCreateJobIdentification(value).setDescription(context.getApplicationInterfaceDescription().getApplicationDescription());
+		}
+
+		if (context.getApplicationInterfaceDescription().getApplicationName() != null) {
+			JSDLUtils.getOrCreateJobIdentification(value).setJobName(context.getApplicationInterfaceDescription().getApplicationName());
+		}
+	}
+
+
+}
\ No newline at end of file