You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by go...@apache.org on 2017/05/02 17:50:21 UTC
[1/3] airavata git commit: Add implementation for BESJobSubmissionTask
Repository: airavata
Updated Branches:
refs/heads/feature-workload-mgmt 9f0e45b25 -> d231956e8
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
new file mode 100644
index 0000000..adacb21
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UNICORESecurityContext.java
@@ -0,0 +1,195 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.security.ProxyCertOutHandler;
+import eu.emi.security.authn.x509.X509Credential;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.model.experiment.UserConfigurationDataModel;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+public class UNICORESecurityContext extends X509SecurityContext {
+
+ private static final long serialVersionUID = 1L;
+ protected static final Logger log = LoggerFactory.getLogger(UNICORESecurityContext.class);
+ private DefaultClientConfiguration secProperties;
+
+
+ public UNICORESecurityContext(CredentialReader credentialReader, RequestData requestData) {
+ super(credentialReader, requestData);
+ }
+
+
+
+ public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging) throws WorkerException, ApplicationSettingsException {
+ try{
+ X509Credential cred = getX509Credentials();
+ secProperties = new DefaultClientConfiguration(dcValidator, cred);
+ setExtraSettings();
+ }
+ catch (Exception e) {
+ throw new WorkerException(e.getMessage(), e);
+ }
+ if(enableMessageLogging) secProperties.setMessageLogging(true);
+
+ return secProperties;
+ }
+
+ public DefaultClientConfiguration getDefaultConfiguration(Boolean enableMessageLogging,
+ UserConfigurationDataModel userDataModel)
+ throws WorkerException, ApplicationSettingsException {
+
+ X509Credential cred = null;
+ try{
+ boolean genCert = userDataModel.isGenerateCert();
+ if(genCert) {
+ String userDN = userDataModel.getUserDN();
+ if (userDN == null || "".equals(userDN)){
+ log.warn("Cannot generate cert, falling back to GFAC configured MyProxy credentials");
+ return getDefaultConfiguration(enableMessageLogging);
+ }
+ else {
+ log.info("Generating X.509 certificate for: "+userDN);
+ try {
+ String caCertPath = ServerSettings.getSetting(BESConstants.PROP_CA_CERT_PATH, "");
+ String caKeyPath = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PATH, "");
+ String caKeyPass = ServerSettings.getSetting(BESConstants.PROP_CA_KEY_PASS, "");
+
+ if(caCertPath.equals("") || caKeyPath.equals("")) {
+ throw new Exception("CA certificate or key file path missing in the properties file. "
+ + "Please make sure " + BESConstants.PROP_CA_CERT_PATH + " or "
+ + BESConstants.PROP_CA_KEY_PATH + " are not empty.");
+ }
+
+ if("".equals(caKeyPass)) {
+ log.warn("Caution: CA key has no password. For security reasons it is highly recommended to set a CA key password");
+ }
+ cred = generateShortLivedCredential(userDN, caCertPath, caKeyPath, caKeyPass);
+ }catch (Exception e){
+ throw new WorkerException("Error occured while generating a short lived credential for user:"+userDN, e);
+ }
+
+ }
+ }else {
+ return getDefaultConfiguration(enableMessageLogging);
+ }
+
+ secProperties = new DefaultClientConfiguration(dcValidator, cred);
+ setExtraSettings();
+ }
+ catch (Exception e) {
+ throw new WorkerException(e.getMessage(), e);
+ }
+ secProperties.getETDSettings().setExtendTrustDelegation(true);
+ if(enableMessageLogging) secProperties.setMessageLogging(true);
+// secProperties.setDoSignMessage(true);
+ secProperties.getETDSettings()
+ .setIssuerCertificateChain(secProperties.getCredential().getCertificateChain());
+
+ return secProperties;
+ }
+
+
+ /**
+ * Get server signed credentials. Each time it is invoked new certificate
+ * is returned.
+ *
+ * @param userID
+ * @param userDN
+ * @param caCertPath
+ * @param caKeyPath
+ * @param caKeyPwd
+ * @return
+ * @throws WorkerException
+ */
+ public DefaultClientConfiguration getServerSignedConfiguration(String userID, String userDN, String caCertPath, String caKeyPath, String caKeyPwd) throws WorkerException {
+ try {
+ KeyAndCertCredential cred = SecurityUtils.generateShortLivedCertificate(userDN,caCertPath,caKeyPath,caKeyPwd);
+ secProperties = new DefaultClientConfiguration(dcValidator, cred);
+ setExtraSettings();
+ } catch (Exception e) {
+ throw new WorkerException(e.getMessage(), e);
+ }
+
+ return secProperties;
+ }
+
+
+
+
+
+ private void setExtraSettings(){
+ secProperties.getETDSettings().setExtendTrustDelegation(true);
+
+ secProperties.setDoSignMessage(true);
+
+ String[] outHandlers = secProperties.getOutHandlerClassNames();
+
+ Set<String> outHandlerLst = null;
+
+ // timeout in milliseconds
+ Properties p = secProperties.getExtraSettings();
+
+ if(p == null) {
+ p = new Properties();
+ }
+
+ p.setProperty("http.connection.timeout", "5000");
+ p.setProperty("http.socket.timeout", "5000");
+
+ if (outHandlers == null) {
+ outHandlerLst = new HashSet<String>();
+ } else {
+ outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
+ }
+
+ outHandlerLst.add(ProxyCertOutHandler.class.getName());
+
+ secProperties.setOutHandlerClassNames(outHandlerLst
+ .toArray(new String[outHandlerLst.size()]));
+
+ secProperties.getETDSettings().setExtendTrustDelegation(true);
+
+ }
+
+
+ private String getCNFromUserDN(String userDN) {
+ return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
new file mode 100644
index 0000000..1f83370
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/URIUtils.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
+
+import java.net.URISyntaxException;
+
+public class URIUtils {
+
+ public static String encodeAll(String uri) throws URIException
+ {
+ String result = encodeAuthority(uri);
+ result = encodePath(uri);
+ result = encodeQuery(result );
+ result = encodeFragment(result );
+ return result;
+ }
+
+ public static String encodeAuthority(String uri) throws URIException
+ {
+ int start = uri.indexOf("//");
+ if(start == -1) return uri;
+ start++;
+ int end = uri.indexOf("/",start+1);
+ if(end == -1) end = uri.indexOf("?",start+1);
+ if(end == -1) end = uri.indexOf("#",start+1);
+ if(end == -1) end = uri.length();
+ String before = uri.substring(0, start+1);
+ String authority= uri.substring(start+1,end);
+ String after = uri.substring(end);
+ authority = URIUtil.encode(authority, URI.allowed_authority);
+
+ return before+authority+after;
+ }
+
+ public static String encodePath(String uri) throws URIException
+ {
+ int doubleSlashIndex = uri.indexOf("//");
+ boolean hasAuthority = doubleSlashIndex >= 0;
+ int start = -1;
+ if(hasAuthority)
+ {
+ start = uri.indexOf("/",doubleSlashIndex+2);
+ }
+ else
+ {
+ start = uri.indexOf(":");
+ }
+ if(start == -1) return uri;
+
+ int end = uri.indexOf("?",start+1);
+ if(end == -1) end = uri.indexOf("#",start+1);
+ if(end == -1) end = uri.length();
+ String before = uri.substring(0, start+1);
+ String path= uri.substring(start+1,end);
+ String after = uri.substring(end);
+ path = URIUtil.encode(path, URI.allowed_abs_path);
+ return before+path+after;
+ }
+
+
+ public static String encodeQuery(String uri) throws URIException
+ {
+ int queryStart = uri.indexOf("?");
+ if(queryStart == -1) return uri;
+ int queryEnd = uri.indexOf("#");
+ if(queryEnd == -1) queryEnd = uri.length();
+
+ String beforeQuery = uri.substring(0, queryStart+1);
+ String query = uri.substring(queryStart+1,queryEnd);
+ String afterQuery = uri.substring(queryEnd);
+ query = URIUtil.encode(query, URI.allowed_query);
+ return beforeQuery+query+afterQuery;
+ }
+
+
+ public static String encodeFragment(String uri) throws URIException
+ {
+ int fragmentStart = uri.indexOf("#");
+ if(fragmentStart == -1) return uri;
+
+ String beforeFragment = uri.substring(0, fragmentStart+1);
+ String fragment = uri.substring(fragmentStart+1);
+ fragment = URIUtil.encode(fragment, URI.allowed_fragment);
+ return beforeFragment+fragment;
+ }
+
+ public static java.net.URI createGsiftpURI(String host, String localPath) throws URISyntaxException {
+ StringBuffer buf = new StringBuffer();
+ if (!host.startsWith("gsiftp://"))
+ buf.append("gsiftp://");
+ buf.append(host);
+ if (!host.endsWith("/"))
+ buf.append("/");
+ buf.append(localPath);
+ return new java.net.URI(buf.toString());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
new file mode 100644
index 0000000..5be5bc1
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/X509SecurityContext.java
@@ -0,0 +1,340 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import eu.emi.security.authn.x509.X509Credential;
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.context.AbstractSecurityContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.cxf.interceptor.security.AbstractSecurityContextInInterceptor;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.x500.X500Principal;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Handles X509 Certificate based security.
+ */
+public class X509SecurityContext extends AbstractSecurityContext {
+
+ private static final long serialVersionUID = 1L;
+
+ protected static final Logger log = LoggerFactory.getLogger(X509SecurityContext.class);
+
+ /*
+ * context name
+ */
+ public static final String X509_SECURITY_CONTEXT = "x509.security.context";
+
+ public static final int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90;
+
+ protected static DirectoryCertChainValidator dcValidator;
+
+ private X509Credential x509Credentials= null;
+
+ static {
+ try {
+ setUpTrustedCertificatePath();
+ // set up directory based trust validator
+ dcValidator = getTrustedCerts();
+ } catch (Exception e) {
+ log.error(e.getLocalizedMessage(), e);
+ }
+ }
+
+
+ public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
+
+ File file = new File(trustedCertificatePath);
+
+ if (!file.exists() || !file.canRead()) {
+ File f = new File(".");
+ log.info("Current directory " + f.getAbsolutePath());
+ throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
+ } else {
+ System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
+ }
+ }
+
+ private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
+
+ String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION);
+
+ setUpTrustedCertificatePath(trustedCertificatePath);
+ }
+
+ /**
+ * Gets the trusted certificate path. Trusted certificate path is stored in "X509_CERT_DIR"
+ * system property.
+ * @return The trusted certificate path as a string.
+ */
+ public static String getTrustedCertificatePath() {
+ return System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY);
+ }
+
+
+ public X509SecurityContext(CredentialReader credentialReader, RequestData requestData) {
+ super(credentialReader, requestData);
+ }
+
+
+ /**
+ * Gets X509Credentials. The process is as follows;
+ * If credentials were queried for the first time create credentials.
+ * 1. Try creating credentials using certificates stored in the credential store
+ * 2. If 1 fails use user name and password to create credentials
+ * @return x509credentials (from CANL security API)
+ * @throws ApplicationSettingsException
+ */
+ public X509Credential getX509Credentials() throws WorkerException, ApplicationSettingsException {
+
+ if(getCredentialReader() == null) {
+ return getDefaultCredentials();
+ }
+
+ if (x509Credentials == null) {
+
+ try {
+ x509Credentials = getCredentialsFromStore();
+ } catch (Exception e) {
+ log.error("An exception occurred while retrieving credentials from the credential store. " +
+ "Will continue with my proxy user name and password.", e);
+ }
+
+ // If store does not have credentials try to get from user name and password
+ if (x509Credentials == null) {
+ x509Credentials = getDefaultCredentials();
+ }
+
+ // if still null, throw an exception
+ if (x509Credentials == null) {
+ throw new WorkerException("Unable to retrieve my proxy credentials to continue operation.");
+ }
+ } else {
+ try {
+
+ final long remainingTime = x509Credentials.getCertificate().getNotAfter().getTime() - new Date().getTime();
+
+ if (remainingTime < CREDENTIAL_RENEWING_THRESH_HOLD) {
+ // return renewCredentials();
+ log.warn("Do not support credentials renewal");
+ }
+
+ log.info("Fall back to get new default credentials");
+
+ try {
+ x509Credentials.getCertificate().checkValidity();
+ }catch(Exception e){
+ x509Credentials = getDefaultCredentials();
+ }
+
+ } catch (Exception e) {
+ throw new WorkerException("Unable to retrieve remaining life time from credentials.", e);
+ }
+ }
+
+ return x509Credentials;
+ }
+
+ /**
+ * Reads the credentials from credential store.
+ * @return If token is found in the credential store, will return a valid credential. Else returns null.
+ * @throws Exception If an error occurred while retrieving credentials.
+ */
+ public X509Credential getCredentialsFromStore() throws Exception {
+
+ if (getCredentialReader() == null) {
+ return null;
+ }
+
+ Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
+ getRequestData().getTokenId());
+
+ if (credential != null) {
+ if (credential instanceof CertificateCredential) {
+
+ log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() +
+ " gateway id - " + getRequestData().getGatewayId());
+
+ CertificateCredential certificateCredential = (CertificateCredential) credential;
+
+ X509Certificate[] certificates = certificateCredential.getCertificates();
+
+ KeyAndCertCredential keyAndCert = new KeyAndCertCredential(certificateCredential.getPrivateKey(), certificates);
+
+ return keyAndCert;
+ //return new GlobusGSSCredentialImpl(newCredential,
+ // GSSCredential.INITIATE_AND_ACCEPT);
+ } else {
+ log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " +
+ "Credential type - " + credential.getClass().getName());
+ }
+ } else {
+ log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and "
+ + "gateway id - " + getRequestData().getGatewayId());
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets the default proxy certificate.
+ * @return Default my proxy credentials.
+ * @throws ApplicationSettingsException
+ */
+ public X509Credential getDefaultCredentials() throws WorkerException, ApplicationSettingsException {
+ MyProxyLogon logon = new MyProxyLogon();
+ logon.setValidator(dcValidator);
+ logon.setHost(getRequestData().getMyProxyServerUrl());
+ logon.setPort(getRequestData().getMyProxyPort());
+ logon.setUsername(getRequestData().getMyProxyUserName());
+ logon.setPassphrase(getRequestData().getMyProxyPassword().toCharArray());
+ logon.setLifetime(getRequestData().getMyProxyLifeTime());
+
+ try {
+ logon.connect();
+ logon.logon();
+ logon.getCredentials();
+ logon.disconnect();
+ PrivateKey pk=logon.getPrivateKey();
+ return new KeyAndCertCredential(pk, new X509Certificate[]{logon.getCertificate()});
+ } catch (Exception e) {
+ throw new WorkerException("An error occurred while retrieving default security credentials.", e);
+ }
+ }
+
+ private static DirectoryCertChainValidator getTrustedCerts() throws Exception{
+ String certLocation = getTrustedCertificatePath();
+ List<String> trustedCert = new ArrayList<String>();
+ trustedCert.add(certLocation + "/*.0");
+ trustedCert.add(certLocation + "/*.pem");
+ DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1, 60000, null);
+ return dcValidator;
+ }
+
+ private String getCNFromUserDN(String userDN) {
+ return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+ }
+
+ public KeyAndCertCredential generateShortLivedCredential(String userDN, String caCertPath, String caKeyPath,
+ String caPwd) throws Exception {
+ final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes
+ // ago
+
+ final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset;
+ final long endTime = startTime + 30 * 3600 * 1000;
+
+ String keyLengthProp = "1024";
+ int keyLength = Integer.parseInt(keyLengthProp);
+ String signatureAlgorithm = "SHA1withRSA";
+
+ KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
+
+ KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
+ kpg.initialize(keyLength);
+ KeyPair pair = kpg.generateKeyPair();
+
+ X500Principal subjectDN = new X500Principal(userDN);
+ Random rand = new Random();
+
+ SubjectPublicKeyInfo publicKeyInfo;
+ try {
+ publicKeyInfo = SubjectPublicKeyInfo.getInstance(new ASN1InputStream(pair.getPublic().getEncoded())
+ .readObject());
+ } catch (IOException e) {
+ throw new InvalidKeyException("Can not parse the public key"
+ + "being included in the short lived certificate", e);
+ }
+
+ X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
+
+ X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+ X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, new BigInteger(20, rand),
+ new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo);
+
+ AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
+
+ X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null);
+
+ certificate.checkValidity(new Date());
+ certificate.verify(caCred.getCertificate().getPublicKey());
+ KeyAndCertCredential result = new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
+ caCred.getCertificate() });
+
+ return result;
+ }
+
+ private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception {
+ InputStream isKey, isCert;
+ isKey = isCert = null;
+ try {
+ isKey = new FileInputStream(caKeyPath);
+ PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, password.toCharArray());
+
+ isCert = new FileInputStream(caCertPath);
+ X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
+ return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+ } finally {
+ if (isKey != null){
+ isKey.close();
+ }
+ if (isCert != null) {
+ isCert.close();
+ }
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/pom.xml b/modules/worker/worker-core/pom.xml
index 8f00821..32d37d1 100644
--- a/modules/worker/worker-core/pom.xml
+++ b/modules/worker/worker-core/pom.xml
@@ -63,6 +63,12 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <!-- zookeeper dependencies -->
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
new file mode 100644
index 0000000..8a961c5
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/RequestData.java
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 6/28/13
+ * Time: 3:28 PM
+ */
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.worker.core.utils.WorkerConstants;
+
+/**
+ * Encapsulates GFac specific data that are coming in the request.
+ */
+public class RequestData {
+
+ private static final int DEFAULT_LIFE_TIME = 3600;
+ private static final int DEFAULT_MY_PROXY_PORT = 7512;
+
+ private String tokenId;
+ private String requestUser;
+ private String gatewayId;
+
+ private String myProxyServerUrl = null;
+ private int myProxyPort = 0;
+ private String myProxyUserName = null;
+ private String myProxyPassword = null;
+ private int myProxyLifeTime = DEFAULT_LIFE_TIME;
+
+
+
+
+ public RequestData() {
+ }
+
+ public RequestData(String gatewayId) {
+ this.gatewayId = gatewayId;
+ }
+
+ public RequestData(String tokenId, String requestUser, String gatewayId) {
+ this.tokenId = tokenId;
+ this.requestUser = requestUser;
+ this.gatewayId = gatewayId;
+ }
+
+ public String getTokenId() {
+ return tokenId;
+ }
+
+ public void setTokenId(String tokenId) {
+ this.tokenId = tokenId;
+ }
+
+ public String getRequestUser() {
+ return requestUser;
+ }
+
+ public void setRequestUser(String requestUser) {
+ this.requestUser = requestUser;
+ }
+
+ public String getGatewayId() {
+ return gatewayId;
+ }
+
+ public void setGatewayId(String gatewayId) {
+ this.gatewayId = gatewayId;
+ }
+
+ public String getMyProxyServerUrl() throws ApplicationSettingsException {
+ if (myProxyServerUrl == null) {
+ myProxyServerUrl = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER);
+ }
+ return myProxyServerUrl;
+ }
+
+ public void setMyProxyServerUrl(String myProxyServerUrl) {
+ this.myProxyServerUrl = myProxyServerUrl;
+ }
+
+ public int getMyProxyPort() {
+
+ if (myProxyPort == 0) {
+ String sPort = ServerSettings.getSetting(WorkerConstants.MYPROXY_SERVER_PORT, Integer.toString(DEFAULT_MY_PROXY_PORT));
+ myProxyPort = Integer.parseInt(sPort);
+ }
+
+ return myProxyPort;
+ }
+
+ public void setMyProxyPort(int myProxyPort) {
+ this.myProxyPort = myProxyPort;
+ }
+
+ public String getMyProxyUserName() throws ApplicationSettingsException {
+ if (myProxyUserName == null) {
+ myProxyUserName = ServerSettings.getSetting(WorkerConstants.MYPROXY_USER);
+ }
+
+ return myProxyUserName;
+ }
+
+ public void setMyProxyUserName(String myProxyUserName) {
+ this.myProxyUserName = myProxyUserName;
+ }
+
+ public String getMyProxyPassword() throws ApplicationSettingsException {
+
+ if (myProxyPassword == null) {
+ myProxyPassword = ServerSettings.getSetting(WorkerConstants.MYPROXY_PASS);
+ }
+
+ return myProxyPassword;
+ }
+
+ public int getMyProxyLifeTime() {
+ String life = ServerSettings.getSetting(WorkerConstants.MYPROXY_LIFE,Integer.toString(myProxyLifeTime));
+ myProxyLifeTime = Integer.parseInt(life);
+ return myProxyLifeTime;
+ }
+
+ public void setMyProxyLifeTime(int myProxyLifeTime) {
+ this.myProxyLifeTime = myProxyLifeTime;
+ }
+
+ public void setMyProxyPassword(String myProxyPassword) {
+ this.myProxyPassword = myProxyPassword;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
new file mode 100644
index 0000000..aec1d83
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/SecurityContext.java
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core;
+
+public interface SecurityContext {
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
new file mode 100644
index 0000000..c4798e2
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/context/AbstractSecurityContext.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core.context;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 6/26/13
+ * Time: 4:33 PM
+ */
+
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.SecurityContext;
+
+import java.io.Serializable;
+
+/**
+ * Abstract implementation of the security context.
+ */
+public abstract class AbstractSecurityContext implements SecurityContext, Serializable {
+
+ private CredentialReader credentialReader;
+ private RequestData requestData;
+
+ public AbstractSecurityContext(CredentialReader credentialReader, RequestData requestData) {
+ this.credentialReader = credentialReader;
+ this.requestData = requestData;
+ }
+ public AbstractSecurityContext() {
+
+ }
+
+ public CredentialReader getCredentialReader() {
+ return credentialReader;
+ }
+
+ public RequestData getRequestData() {
+ return requestData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
new file mode 100644
index 0000000..705b379
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/SSHUtils.java
@@ -0,0 +1,524 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.core.utils;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.worker.core.exceptions.SSHApiException;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class to do all ssh and scp related things.
+ */
+public class SSHUtils {
+ private static final Logger log = LoggerFactory.getLogger(SSHUtils.class);
+
+
+ /**
+ * This will copy a local file to a remote location
+ *
+ * @param remoteFile remote location you want to transfer the file, this cannot be a directory, if user pass
+ * a dirctory we do copy it to that directory but we simply return the directory name
+ * todo handle the directory name as input and return the proper final output file name
+ * @param localFile Local file to transfer, this can be a directory
+ * @return returns the final remote file path, so that users can use the new file location
+ */
+ public static String scpTo(String localFile, String remoteFile, Session session) throws IOException,
+ JSchException, SSHApiException {
+ FileInputStream fis = null;
+ String prefix = null;
+ if (new File(localFile).isDirectory()) {
+ prefix = localFile + File.separator;
+ }
+ boolean ptimestamp = true;
+
+ // exec 'scp -t rfile' remotely
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
+ Channel channel = session.openChannel("exec");
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ ((ChannelExec) channel).setCommand(command);
+
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ channel.connect();
+
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ File _lfile = new File(localFile);
+
+ if (ptimestamp) {
+ command = "T" + (_lfile.lastModified() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible with JavaAPI ;-<
+ command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+ }
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ long filesize = _lfile.length();
+ command = "C0644 " + filesize + " ";
+ if (localFile.lastIndexOf('/') > 0) {
+ command += localFile.substring(localFile.lastIndexOf('/') + 1);
+ } else {
+ command += localFile;
+ }
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ // send a content of localFile
+ fis = new FileInputStream(localFile);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int len = fis.read(buf, 0, buf.length);
+ if (len <= 0) break;
+ out.write(buf, 0, len); //out.flush();
+ }
+ fis.close();
+ fis = null;
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+ out.close();
+ stdOutReader.onOutput(channel);
+
+
+ channel.disconnect();
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+ //since remote file is always a file we just return the file
+ return remoteFile;
+ }
+
+ /**
+ * This method will copy a remote file to a local directory
+ *
+ * @param remoteFile remote file path, this has to be a full qualified path
+ * @param localFile This is the local file to copy, this can be a directory too
+ * @return returns the final local file path of the new file came from the remote resource
+ */
+ public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException,
+ JSchException, SSHApiException {
+ FileOutputStream fos = null;
+ try {
+ String prefix = null;
+ if (new File(localFile).isDirectory()) {
+ prefix = localFile + File.separator;
+ }
+
+ // exec 'scp -f remotefile' remotely
+ String command = "scp -f " + remoteFile;
+ Channel channel = session.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
+
+ if (!channel.isClosed()){
+ channel.connect();
+ }
+
+ byte[] buf = new byte[1024];
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ while (true) {
+ int c = checkAck(in);
+ if (c != 'C') {
+ break;
+ }
+
+ // read '0644 '
+ in.read(buf, 0, 5);
+
+ long filesize = 0L;
+ while (true) {
+ if (in.read(buf, 0, 1) < 0) {
+ // error
+ break;
+ }
+ if (buf[0] == ' ') break;
+ filesize = filesize * 10L + (long) (buf[0] - '0');
+ }
+
+ String file = null;
+ for (int i = 0; ; i++) {
+ in.read(buf, i, 1);
+ if (buf[i] == (byte) 0x0a) {
+ file = new String(buf, 0, i);
+ break;
+ }
+ }
+
+ //System.out.println("filesize="+filesize+", file="+file);
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+
+ // read a content of lfile
+ fos = new FileOutputStream(prefix == null ? localFile : prefix + file);
+ int foo;
+ while (true) {
+ if (buf.length < filesize) foo = buf.length;
+ else foo = (int) filesize;
+ foo = in.read(buf, 0, foo);
+ if (foo < 0) {
+ // error
+ break;
+ }
+ fos.write(buf, 0, foo);
+ filesize -= foo;
+ if (filesize == 0L) break;
+ }
+ fos.close();
+ fos = null;
+
+ if (checkAck(in) != 0) {
+ String error = "Error transfering the file content";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ } finally {
+ try {
+ if (fos != null) fos.close();
+ } catch (Exception ee) {
+ }
+ }
+ }
+
+ /**
+ * This method will copy a remote file to a local directory
+ *
+ * @param sourceFile remote file path, this has to be a full qualified path
+ * @param sourceSession JSch session for source
+ * @param destinationFile This is the local file to copy, this can be a directory too
+ * @param destinationSession JSch Session for target
+ * @return returns the final local file path of the new file came from the remote resource
+ */
+ public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile, Session destinationSession, boolean ignoreEmptyFile) throws
+ IOException, JSchException {
+ OutputStream sout = null;
+ InputStream sin = null;
+ OutputStream dout = null;
+ InputStream din = null;
+ try {
+ String prefix = null;
+
+ // exec 'scp -f sourceFile'
+ String sourceCommand = "scp -f " + sourceFile;
+ Channel sourceChannel = sourceSession.openChannel("exec");
+ ((ChannelExec) sourceChannel).setCommand(sourceCommand);
+ StandardOutReader sourceStdOutReader = new StandardOutReader();
+ ((ChannelExec) sourceChannel).setErrStream(sourceStdOutReader.getStandardError());
+ // get I/O streams for remote scp
+ sout = sourceChannel.getOutputStream();
+ sin = sourceChannel.getInputStream();
+ sourceChannel.connect();
+
+ boolean ptimestamp = true;
+ // exec 'scp -t destinationFile'
+ String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + destinationFile;
+ Channel targetChannel = destinationSession.openChannel("exec");
+ StandardOutReader targetStdOutReader = new StandardOutReader();
+ ((ChannelExec) targetChannel).setErrStream(targetStdOutReader.getStandardError());
+ ((ChannelExec) targetChannel).setCommand(command);
+ // get I/O streams for remote scp
+ dout = targetChannel.getOutputStream();
+ din = targetChannel.getInputStream();
+ targetChannel.connect();
+
+ if (checkAck(din) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new Exception(error);
+ }
+
+
+ byte[] buf = new byte[1024];
+
+ // send '\0'
+ buf[0] = 0;
+ sout.write(buf, 0, 1);
+ sout.flush();
+
+ log.info("Initiating transfer from:" + sourceFile + " To: " + destinationFile + ", Ignore Empty file : " + ignoreEmptyFile);
+
+ while (true) {
+ int c = checkAck(sin);
+ if (c != 'C') {
+ break;
+ }
+
+ // read '0644 '
+ sin.read(buf, 0, 5);
+
+ long fileSize = 0L;
+ while (true) {
+ if (sin.read(buf, 0, 1) < 0) {
+ // error
+ break;
+ }
+ if (buf[0] == ' ') break;
+ fileSize = fileSize * 10L + (long) (buf[0] - '0');
+ }
+
+ String fileName = null;
+ for (int i = 0; ; i++) {
+ sin.read(buf, i, 1);
+ if (buf[i] == (byte) 0x0a) {
+ fileName = new String(buf, 0, i);
+ break;
+ }
+ }
+
+ //FIXME: Remove me after fixing file transfer issue
+ if(fileSize == 0L){
+ log.warn("*****Zero byte file*****. Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile);
+ }else{
+ log.info("Transferring from:" + sourceFile + " To: " + destinationFile + ", File Size : " + fileSize + ", Ignore Empty file : " + ignoreEmptyFile);
+ }
+
+ if (fileSize == 0L && !ignoreEmptyFile){
+ String error = "Input file is empty...";
+ log.error(error);
+ throw new JSchException(error);
+ }
+ String initData = "C0644 " + fileSize + " " + fileName + "\n";
+ assert dout != null;
+ dout.write(initData.getBytes());
+ dout.flush();
+
+ // send '\0' to source
+ buf[0] = 0;
+ sout.write(buf, 0, 1);
+ sout.flush();
+
+ int rLength;
+ while (true) {
+ if (buf.length < fileSize) rLength = buf.length;
+ else rLength = (int) fileSize;
+ rLength = sin.read(buf, 0, rLength); // read content of the source File
+ if (rLength < 0) {
+ // error
+ break;
+ }
+ dout.write(buf, 0, rLength); // write to destination file
+ fileSize -= rLength;
+ if (fileSize == 0L) break;
+ }
+
+ // send '\0' to target
+ buf[0] = 0;
+ dout.write(buf, 0, 1);
+ dout.flush();
+ if (checkAck(din) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new Exception(error);
+ }
+ dout.close();
+ dout = null;
+
+ if (checkAck(sin) != 0) {
+ String error = "Error transfering the file content";
+ log.error(error);
+ throw new Exception(error);
+ }
+
+ // send '\0'
+ buf[0] = 0;
+ sout.write(buf, 0, 1);
+ sout.flush();
+ }
+
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ throw new JSchException(e.getMessage());
+ } finally {
+ try {
+ if (dout != null) dout.close();
+ } catch (Exception ee) {
+ log.error("", ee);
+ }
+ try {
+ if (din != null) din.close();
+ } catch (Exception ee) {
+ log.error("", ee);
+ }
+ try {
+ if (sout != null) sout.close();
+ } catch (Exception ee) {
+ log.error("", ee);
+ }
+ try {
+ if (din != null) din.close();
+ } catch (Exception ee) {
+ log.error("", ee);
+ }
+ }
+ }
+
+ public static void makeDirectory(String path, Session session) throws IOException, JSchException, WorkerException {
+
+ // exec 'scp -t rfile' remotely
+ String command = "mkdir -p " + path;
+ Channel channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+
+ ((ChannelExec) channel).setCommand(command);
+
+
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+// session.disconnect();
+ log.error("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName());
+ throw e;
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+ throw new WorkerException(stdOutReader.getStdErrorString());
+ }
+
+ channel.disconnect();
+ }
+
+ public static List<String> listDirectory(String path, Session session) throws IOException, JSchException,
+ WorkerException {
+
+ // exec 'scp -t rfile' remotely
+ String command = "ls " + path;
+ Channel channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+
+ ((ChannelExec) channel).setCommand(command);
+
+
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ try {
+ channel.connect();
+ } catch (JSchException e) {
+
+ channel.disconnect();
+// session.disconnect();
+
+ throw new WorkerException("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName(), e);
+ }
+ stdOutReader.onOutput(channel);
+ stdOutReader.getStdOutputString();
+ if (stdOutReader.getStdErrorString().contains("ls:")) {
+ throw new WorkerException(stdOutReader.getStdErrorString());
+ }
+ channel.disconnect();
+ return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+ }
+
+
+ static int checkAck(InputStream in) throws IOException {
+ int b = in.read();
+ if (b == 0) return b;
+ if (b == -1) return b;
+
+ if (b == 1 || b == 2) {
+ StringBuffer sb = new StringBuffer();
+ int c;
+ do {
+ c = in.read();
+ sb.append((char) c);
+ }
+ while (c != '\n');
+ //FIXME: Redundant
+ if (b == 1) { // error
+ System.out.print(sb.toString());
+ }
+ if (b == 2) { // fatal error
+ System.out.print(sb.toString());
+ }
+ log.warn(sb.toString());
+ }
+ return b;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
index 75a8062..b5ce833 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerFactory.java
@@ -36,6 +36,7 @@ import org.apache.airavata.worker.core.authentication.SSHKeyAuthentication;
import org.apache.airavata.worker.core.cluster.ServerInfo;
import org.apache.airavata.worker.core.config.ResourceConfig;
import org.apache.airavata.worker.core.config.WorkerYamlConfigruation;
+import org.apache.airavata.worker.core.context.ProcessContext;
import org.apache.airavata.worker.core.exceptions.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +90,17 @@ public class WorkerFactory {
return resources.get(resourceJobManagerType);
}
+ public static SSHKeyAuthentication getStorageSSHKeyAuthentication(ProcessContext pc)
+ throws WorkerException, CredentialStoreException {
+ try {
+ return getSshKeyAuthentication(pc.getGatewayId(),
+ pc.getStorageResourceLoginUserName(),
+ pc.getStorageResourceCredentialToken());
+ } catch (ApplicationSettingsException | IllegalAccessException | InstantiationException e) {
+ throw new WorkerException("Couldn't build ssh authentication object", e);
+ }
+ }
+
public static SSHKeyAuthentication getSshKeyAuthentication(String gatewayId,
String loginUserName,
String credentialStoreToken)
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
index a53d736..663428a 100644
--- a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/utils/WorkerUtils.java
@@ -8,10 +8,14 @@ import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
+import org.apache.airavata.model.data.replica.*;
+import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.status.*;
+import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.*;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.airavata.worker.core.context.ProcessContext;
@@ -20,6 +24,9 @@ import org.apache.airavata.worker.core.exceptions.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -232,4 +239,61 @@ public class WorkerUtils {
return null;
}
}
+
+ public static void saveExperimentOutput(ProcessContext processContext, String outputName, String outputVal) throws WorkerException {
+ try {
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ String experimentId = processContext.getExperimentId();
+ ExperimentModel experiment = (ExperimentModel)experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ List<OutputDataObjectType> experimentOutputs = experiment.getExperimentOutputs();
+ if (experimentOutputs != null && !experimentOutputs.isEmpty()){
+ for (OutputDataObjectType expOutput : experimentOutputs){
+ if (expOutput.getName().equals(outputName)){
+ DataProductModel dataProductModel = new DataProductModel();
+ dataProductModel.setGatewayId(processContext.getGatewayId());
+ dataProductModel.setOwnerName(processContext.getProcessModel().getUserName());
+ dataProductModel.setProductName(outputName);
+ dataProductModel.setDataProductType(DataProductType.FILE);
+
+ DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
+ replicaLocationModel.setStorageResourceId(processContext.getStorageResource().getStorageResourceId());
+ replicaLocationModel.setReplicaName(outputName + " gateway data store copy");
+ replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
+ replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
+ replicaLocationModel.setFilePath(outputVal);
+ dataProductModel.addToReplicaLocations(replicaLocationModel);
+
+ ReplicaCatalog replicaCatalog = RegistryFactory.getReplicaCatalog();
+ String productUri = replicaCatalog.registerDataProduct(dataProductModel);
+ expOutput.setValue(productUri);
+ }
+ }
+ }
+ experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, experimentId);
+ } catch (RegistryException e) {
+ String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId()
+ + " : - Error while updating experiment outputs";
+ throw new WorkerException(msg, e);
+ }
+ }
+
+ public static URI getDestinationURI(TaskContext taskContext, String hostName, String inputPath, String fileName) throws URISyntaxException {
+ String experimentDataDir = taskContext.getParentProcessContext().getProcessModel().getExperimentDataDir();
+ String filePath;
+ if(experimentDataDir != null && !experimentDataDir.isEmpty()) {
+ if(!experimentDataDir.endsWith(File.separator)){
+ experimentDataDir += File.separator;
+ }
+ if (experimentDataDir.startsWith(File.separator)) {
+ filePath = experimentDataDir + fileName;
+ } else {
+ filePath = inputPath + experimentDataDir + fileName;
+ }
+ } else {
+ filePath = inputPath + taskContext.getParentProcessContext().getProcessId() + File.separator + fileName;
+ }
+ //FIXME
+ return new URI("file", taskContext.getParentProcessContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null);
+
+ }
}
[2/3] airavata git commit: Add implementation for BESJobSubmissionTask
Posted by go...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java
new file mode 100644
index 0000000..48e6986
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLUtils.java
@@ -0,0 +1,517 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+import org.apache.commons.httpclient.URIException;
+import org.apache.xmlbeans.XmlCursor;
+import org.apache.xmlbeans.XmlObject;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.*;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.EnvironmentType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.POSIXApplicationDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.POSIXApplicationType;
+import org.ggf.schemas.jsdl.x2006.x07.jsdlHpcpa.HPCProfileApplicationDocument;
+import org.ggf.schemas.jsdl.x2006.x07.jsdlHpcpa.HPCProfileApplicationType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.SPMDApplicationDocument;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.SPMDApplicationType;
+
+import javax.xml.namespace.QName;
+
+
+/**
+ *
+ * @author shahbaz memon, bastian demuth
+ *
+ */
+public class JSDLUtils
+{
+
+ public static final int FLAG_OVERWRITE = 1;
+ public static final int FLAG_APPEND = 2;
+ public static final int FLAG_DELETE_ON_TERMINATE = 32;
+
+ public static final QName POSIX_APPLICATION=POSIXApplicationDocument.type.getDocumentElementName();
+
+ public static final QName HPC_PROFILE_APPLICATION=HPCProfileApplicationDocument.type.getDocumentElementName();
+
+ public static final QName SPMD_APPLICATION=SPMDApplicationDocument.type.getDocumentElementName();
+
+ public static final String PROCESSESPERHOST = "ProcessesPerHost";
+ public static final String NUMBEROFPROCESSES = "NumberOfProcesses";
+ public static final String THREADSPERHOST = "ThreadsPerHost";
+
+
+
+ public static EnvironmentType addEnvVariable(JobDefinitionType def,String name, String value) {
+ POSIXApplicationType posixApp = getOrCreatePOSIXApplication(def);
+ EnvironmentType newEnv = posixApp.addNewEnvironment();
+ newEnv.setName(name);
+ newEnv.setStringValue(value);
+ return newEnv;
+ }
+
+ public static void setApplicationName(JobDefinitionType value, String applicationName) {
+ getOrCreateApplication(value).setApplicationName(applicationName);
+ }
+
+ public static void setApplicationVersion(JobDefinitionType value, String applicationVersion) {
+ getOrCreateApplication(value).setApplicationVersion(applicationVersion);
+ }
+
+ public static void addProjectName(JobDefinitionType value, String projectName) {
+ getOrCreateJobIdentification(value).addNewJobProject().setStringValue(projectName);
+ }
+
+ public static void addMultipleProjectNames(JobDefinitionType value, String[] projectNames) {
+ for (String name : projectNames) {
+ getOrCreateJobIdentification(value).addNewJobProject().setStringValue(name);
+ }
+ }
+
+ public static void addCandidateHost(JobDefinitionType value, String host) {
+ getOrCreateCandidateHosts(value).addHostName(host);
+
+ }
+ public static void addDataStagingTargetElement(JobDefinitionType value, String fileSystem, String file, String uri) {
+ addDataStagingTargetElement(value,fileSystem, file, uri, 1);
+ }
+
+ public static void addDataStagingTargetElement(JobDefinitionType value, String fileSystem, String file, String uri, int flags) {
+ JobDescriptionType jobDescr = getOrCreateJobDescription(value);
+ DataStagingType newDS = jobDescr.addNewDataStaging();
+ CreationFlagEnumeration.Enum creationFlag = CreationFlagEnumeration.DONT_OVERWRITE;
+ if((flags & FLAG_OVERWRITE) != 0) creationFlag = CreationFlagEnumeration.OVERWRITE;
+ if((flags & FLAG_APPEND) != 0) creationFlag = CreationFlagEnumeration.APPEND;
+ boolean deleteOnTerminate = (flags & FLAG_DELETE_ON_TERMINATE) != 0;
+ newDS.setCreationFlag(creationFlag);
+ newDS.setDeleteOnTermination(deleteOnTerminate);
+ SourceTargetType target = newDS.addNewTarget();
+
+ try {
+ if (uri != null) {
+ URIUtils.encodeAll(uri);
+ target.setURI(uri);
+ }
+ } catch (URIException e) {
+ }
+ newDS.setFileName(file);
+ if (fileSystem != null && !fileSystem.equals("Work")) { //$NON-NLS-1$
+ newDS.setFilesystemName(fileSystem);
+ }
+ }
+
+ public static void addDataStagingSourceElement(JobDefinitionType value, String uri, String fileSystem, String file) {
+ addDataStagingSourceElement(value, uri, fileSystem, file, 1);
+ }
+
+ public static void addDataStagingSourceElement(JobDefinitionType value, String uri, String fileSystem, String file, int flags) {
+ JobDescriptionType jobDescr = getOrCreateJobDescription(value);
+
+ try {
+ uri = (uri == null) ? null : URIUtils.encodeAll(uri);
+ } catch (URIException e) {
+ }
+ DataStagingType newDS = jobDescr.addNewDataStaging();
+ CreationFlagEnumeration.Enum creationFlag = CreationFlagEnumeration.DONT_OVERWRITE;
+ if((flags & FLAG_OVERWRITE) != 0) creationFlag = CreationFlagEnumeration.OVERWRITE;
+ if((flags & FLAG_APPEND) != 0) creationFlag = CreationFlagEnumeration.APPEND;
+ boolean deleteOnTerminate = (flags & FLAG_DELETE_ON_TERMINATE) != 0;
+ newDS.setCreationFlag(creationFlag);
+ newDS.setDeleteOnTermination(deleteOnTerminate);
+ SourceTargetType source = newDS.addNewSource();
+ source.setURI(uri);
+ newDS.setFileName(file);
+ if (fileSystem != null && !fileSystem.equals("Work")) { //$NON-NLS-1$
+ newDS.setFilesystemName(fileSystem);
+ }
+ }
+
+
+ public static ApplicationType getOrCreateApplication(JobDefinitionType value) {
+ JobDescriptionType jobDescr = getOrCreateJobDescription(value);
+ if (!jobDescr.isSetApplication()) {
+ jobDescr.addNewApplication();
+ }
+ return jobDescr.getApplication();
+ }
+
+ public static CandidateHostsType getOrCreateCandidateHosts(JobDefinitionType value) {
+ ResourcesType resources = getOrCreateResources(value);
+ if (!resources.isSetCandidateHosts()) {
+ resources.addNewCandidateHosts();
+ }
+ return resources.getCandidateHosts();
+ }
+
+ public static CPUArchitectureType getOrCreateCPUArchitecture(JobDefinitionType value) {
+
+ ResourcesType jobResources = getOrCreateResources(value);
+ if (!jobResources.isSetCPUArchitecture()) {
+ jobResources.addNewCPUArchitecture();
+ }
+ return jobResources.getCPUArchitecture();
+ }
+
+ public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualCPUCount(JobDefinitionType value) {
+ ResourcesType jobResources = getOrCreateResources(value);
+ if (!jobResources.isSetIndividualCPUCount()) {
+ jobResources.addNewIndividualCPUCount();
+ }
+ return jobResources.getIndividualCPUCount();
+ }
+
+
+ public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualCPUSpeed(JobDefinitionType value) {
+
+ ResourcesType jobResources = getOrCreateResources(value);
+ if (!jobResources.isSetIndividualCPUSpeed()) {
+ jobResources.addNewIndividualCPUSpeed();
+ }
+ return jobResources.getIndividualCPUSpeed();
+ }
+
+ public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualCPUTime(JobDefinitionType value) {
+
+ ResourcesType jobResources = getOrCreateResources(value);
+ if ( !jobResources.isSetIndividualCPUTime() ) {
+ jobResources.addNewIndividualCPUTime();
+ }
+ return jobResources.getIndividualCPUTime();
+ }
+
+ public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualDiskSpace(JobDefinitionType value) {
+
+ ResourcesType jobResources = getOrCreateResources(value);
+ if (!jobResources.isSetIndividualDiskSpace()) {
+ jobResources.addNewIndividualDiskSpace();
+ }
+ return jobResources.getIndividualDiskSpace();
+ }
+
+ public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateIndividualPhysicalMemory(JobDefinitionType value) {
+
+ ResourcesType jobResources = getOrCreateResources(value);
+ if (!jobResources.isSetIndividualPhysicalMemory()) {
+ jobResources.addNewIndividualPhysicalMemory();
+ }
+ return jobResources.getIndividualPhysicalMemory();
+ }
+
+ public static JobDescriptionType getOrCreateJobDescription(JobDefinitionType value) {
+ if (value.getJobDescription() == null) {
+ return value.addNewJobDescription();
+ }
+ return value.getJobDescription();
+ }
+
+ public static JobIdentificationType getOrCreateJobIdentification(JobDefinitionType value) {
+ JobDescriptionType descr = getOrCreateJobDescription(value);
+ if (descr.getJobIdentification() == null) {
+ return descr.addNewJobIdentification();
+ }
+ return descr.getJobIdentification();
+ }
+
+ public static OperatingSystemType getOrCreateOperatingSystem(JobDefinitionType value)
+ {
+ ResourcesType jobResources = getOrCreateResources(value);
+ if(!jobResources.isSetOperatingSystem()) {
+ jobResources.addNewOperatingSystem();
+ }
+ return jobResources.getOperatingSystem();
+ }
+
+ public static ResourcesType getOrCreateResources(JobDefinitionType value) {
+ JobDescriptionType jobDescr = getOrCreateJobDescription(value);
+ if (!jobDescr.isSetResources()) {
+ jobDescr.addNewResources();
+ }
+ return jobDescr.getResources();
+ }
+
+
+ public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateTotalCPUCount(JobDefinitionType value) {
+
+ ResourcesType jobResources = getOrCreateResources(value);
+ if ( !jobResources.isSetTotalCPUCount() ) {
+ jobResources.addNewTotalCPUCount();
+ }
+ return jobResources.getTotalCPUCount();
+ }
+
+
+ public static org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType getOrCreateTotalResourceCount(JobDefinitionType value) {
+
+ ResourcesType jobResources = getOrCreateResources(value);
+ if ( !jobResources.isSetTotalResourceCount())
+ {
+ jobResources.addNewTotalResourceCount();
+ }
+ return jobResources.getTotalResourceCount();
+ }
+
+ public static POSIXApplicationType getOrCreatePOSIXApplication(JobDefinitionType value) {
+
+ ApplicationType application = getOrCreateApplication(value);
+
+ if(getHPCProfileApplication(value) != null){
+ //TODO handle: not creating POSIX element if HPCProfile already exists
+ return getPOSIXApplication(value);
+ }
+
+ if (getPOSIXApplication(value) == null) {
+ XmlCursor acursor = application.newCursor();
+ acursor.toEndToken();
+ acursor.insertElement(POSIX_APPLICATION);
+ acursor.dispose();
+ }
+ return getPOSIXApplication(value);
+ }
+
+
+ public static SPMDApplicationType getOrCreateSPMDApplication(JobDefinitionType value) {
+
+ ApplicationType application = getOrCreateApplication(value);
+
+ if (getSPMDApplication(value) == null) {
+ XmlCursor acursor = application.newCursor();
+ acursor.toEndToken();
+ acursor.insertElement(SPMD_APPLICATION);
+ acursor.dispose();
+ }
+ return getSPMDApplication(value);
+ }
+
+ public static SPMDApplicationType getSPMDApplication(JobDefinitionType value) {
+ if (value != null &&
+ value.getJobDescription() != null &&
+ value.getJobDescription().isSetApplication() ) {
+ XmlCursor acursor = value.getJobDescription().getApplication().newCursor();
+ if (acursor.toFirstChild()) {
+ do {
+ if(acursor.getName().equals(SPMD_APPLICATION)) {
+ XmlObject result = acursor.getObject();
+ acursor.dispose();
+ return (SPMDApplicationType) result;
+ }
+ } while (acursor.toNextSibling());
+ acursor.dispose();
+ return null;
+ } else {
+ acursor.dispose();
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+
+
+
+ public static POSIXApplicationType getPOSIXApplication(JobDefinitionType value) {
+ if (value != null &&
+ value.getJobDescription() != null &&
+ value.getJobDescription().isSetApplication() ) {
+ XmlCursor acursor = value.getJobDescription().getApplication().newCursor();
+ if (acursor.toFirstChild()) {
+ do {
+ if(acursor.getName().equals(POSIX_APPLICATION)) {
+ XmlObject result = acursor.getObject();
+ acursor.dispose();
+ return (POSIXApplicationType) result;
+ }
+ } while (acursor.toNextSibling());
+ acursor.dispose();
+ return null;
+ } else {
+ acursor.dispose();
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+
+
+
+ public static HPCProfileApplicationType getOrCreateHPCProfileApplication(JobDefinitionType value) {
+
+ ApplicationType application = getOrCreateApplication(value);
+
+ if(getPOSIXApplication(value) != null){
+ //TODO handle: creating HPC element if POSIX already exists
+ return getHPCProfileApplication(value);
+ }
+
+ if (getHPCProfileApplication(value) == null) {
+ XmlCursor acursor = application.newCursor();
+ acursor.toEndToken();
+ acursor.insertElement(HPC_PROFILE_APPLICATION);
+ acursor.dispose();
+ }
+ return getHPCProfileApplication(value);
+ }
+
+
+ public static HPCProfileApplicationType getHPCProfileApplication(JobDefinitionType value) {
+ if (value != null &&
+ value.getJobDescription() != null &&
+ value.getJobDescription().isSetApplication() ) {
+ XmlCursor acursor = value.getJobDescription().getApplication().newCursor();
+ if (acursor.toFirstChild()) {
+ do {
+ if(acursor.getName().equals(HPC_PROFILE_APPLICATION)) {
+ XmlObject result = acursor.getObject();
+ acursor.dispose();
+ return (HPCProfileApplicationType) result;
+ }
+ } while (acursor.toNextSibling());
+ acursor.dispose();
+ return null;
+ } else {
+ acursor.dispose();
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+
+
+
+
+ public static RangeValueType getTotalCPUCountRequirements(JobDefinitionType value) {
+ if(value != null && value.getJobDescription() != null && value.getJobDescription().isSetResources() &&
+ value.getJobDescription().getResources().isSetTotalCPUCount()){
+ return toU6RangeValue(value.getJobDescription().getResources().getTotalCPUCount());
+ }
+ else
+ return null;
+ }
+
+ public static RangeValueType getTotalResourceCountRequirements(JobDefinitionType value) {
+ if(value != null && value.getJobDescription() != null && value.getJobDescription().isSetResources() &&
+ value.getJobDescription().getResources().isSetTotalResourceCount()){
+ return toU6RangeValue(value.getJobDescription().getResources().getTotalResourceCount());
+ }
+ else
+ return null;
+ }
+
+
+ public static RangeValueType toU6RangeValue(org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType jsdlType) {
+ RangeValueType result = new RangeValueType();
+ if(jsdlType.getExactArray().length > 0){
+ result.setExact(jsdlType.getExactArray(0).getDoubleValue());
+ }
+ if(jsdlType.isSetLowerBoundedRange()){
+ result.setLowerBound(jsdlType.getLowerBoundedRange().getDoubleValue());
+ }
+ if(jsdlType.isSetUpperBoundedRange()){
+ result.setUpperBound(jsdlType.getUpperBoundedRange().getDoubleValue());
+ }
+ return result;
+ }
+
+
+
+ public static void setCPUArchitectureRequirements(JobDefinitionType value, ProcessorRequirement cpuArchitecture) {
+ if(cpuArchitecture == null || cpuArchitecture.getValue() == null) return;
+ CPUArchitectureType cpuArch = getOrCreateCPUArchitecture(value);
+ cpuArch.setCPUArchitectureName(ProcessorArchitectureEnumeration.Enum.forString(cpuArchitecture.getValue()));
+ }
+
+ public static void setIndividualCPUCountRequirements(JobDefinitionType value, RangeValueType cpuCount) {
+ org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType individualCPUCount = getOrCreateIndividualCPUCount(value);
+ setRangeValue(cpuCount, individualCPUCount);
+ }
+
+ public static void setIndividualCPUSpeedRequirements(JobDefinitionType value, RangeValueType cpuSpeed) {
+ org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType individualCPUSpeed = getOrCreateIndividualCPUSpeed(value);
+ setRangeValue(cpuSpeed, individualCPUSpeed);
+ }
+
+ public static void setIndividualCPUTimeRequirements(JobDefinitionType value, RangeValueType cpuTime) {
+ org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType cpuIndividualTime = getOrCreateIndividualCPUTime(value);
+ setRangeValue(cpuTime, cpuIndividualTime);
+ }
+
+ public static void setIndividualDiskSpaceRequirements(JobDefinitionType value, RangeValueType diskSpace) {
+ org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType individualDiskSpace = getOrCreateIndividualDiskSpace(value);
+ setRangeValue(diskSpace, individualDiskSpace);
+ }
+
+ public static void setIndividualPhysicalMemoryRequirements(JobDefinitionType value, RangeValueType physicalMemory) {
+ org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType individualPhysicalMemory = getOrCreateIndividualPhysicalMemory(value);
+ setRangeValue(physicalMemory, individualPhysicalMemory);
+ }
+
+
+ public static void setName(JobDefinitionType value, String name) {
+ getOrCreateJobIdentification(value).setJobName(name);
+ }
+
+
+ public static void setRangeValue(RangeValueType u6Type, org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType jsdlType) {
+ Double exact = u6Type.getExact();
+ Double epsilon = u6Type.getEpsilon();
+ Double lower = u6Type.getLowerBound();
+ Double upper = u6Type.getUpperBound();
+
+
+ if(lower.isNaN() && upper.isNaN())
+ {
+ ExactType exactType = jsdlType.getExactArray().length > 0 ? jsdlType.getExactArray(0) : jsdlType.addNewExact();
+ exactType.setDoubleValue(exact);
+ if(!epsilon.isNaN() && epsilon != 0)
+ {
+ exactType.setEpsilon(epsilon);
+ }
+ }
+ else
+ {
+ if(!lower.isNaN())
+ {
+ BoundaryType lowerBound = jsdlType.isSetLowerBoundedRange() ? jsdlType.getLowerBoundedRange() : jsdlType.addNewLowerBoundedRange();
+ lowerBound.setDoubleValue(lower);
+ lowerBound.setExclusiveBound(!u6Type.isIncludeLowerBound());
+ }
+
+ if(!upper.isNaN())
+ {
+ BoundaryType upperBound = jsdlType.isSetUpperBoundedRange() ? jsdlType.getUpperBoundedRange() : jsdlType.addNewUpperBoundedRange();
+ upperBound.setDoubleValue(upper);
+ upperBound.setExclusiveBound(!u6Type.isIncludeUpperBound());
+ }
+ }
+ }
+
+ public static void setTotalCPUCountRequirements(JobDefinitionType value, RangeValueType cpuCount) {
+ org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType cpuTotalCount = getOrCreateTotalCPUCount(value);
+ setRangeValue(cpuCount, cpuTotalCount);
+ }
+
+ public static void setTotalResourceCountRequirements(JobDefinitionType value, RangeValueType resourceCount) {
+ org.ggf.schemas.jsdl.x2005.x11.jsdl.RangeValueType totalCount = getOrCreateTotalResourceCount(value);
+ setRangeValue(resourceCount, totalCount);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java
new file mode 100644
index 0000000..3694eea
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/Mode.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+/**
+ * file creation modes
+ */
+public enum Mode {
+
+ /**
+ * overwrite any existing file
+ */
+ overwrite,
+
+ /**
+ * append to an existing file
+ */
+ append,
+
+ /**
+ * do NOT overwrite and fail if the file exists
+ */
+ nooverwrite
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java
new file mode 100644
index 0000000..0794caf
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/MyProxyLogon.java
@@ -0,0 +1,465 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import eu.emi.security.authn.x509.CommonX509TrustManager;
+import eu.emi.security.authn.x509.X509CertChainValidator;
+import org.bouncycastle.asn1.oiw.OIWObjectIdentifiers;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
+import org.bouncycastle.crypto.util.PrivateKeyFactory;
+import org.bouncycastle.crypto.util.PublicKeyFactory;
+import org.bouncycastle.crypto.util.SubjectPublicKeyInfoFactory;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
+import org.bouncycastle.pkcs.PKCS10CertificationRequestBuilder;
+import org.bouncycastle.util.encoders.Base64;
+
+import javax.net.ssl.*;
+import javax.security.auth.login.FailedLoginException;
+import java.io.*;
+import java.net.ProtocolException;
+import java.security.*;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * The MyProxyLogon class provides an interface for retrieving credentials from
+ * a MyProxy server.
+ * <p/>
+ * First, use <code>setHost</code>, <code>setPort</code>,
+ * <code>setUsername</code>, <code>setPassphrase</code>,
+ * <code>setCredentialName</code>, <code>setLifetime</code> and
+ * <code>requestTrustRoots</code> to configure. Then call <code>connect</code>,
+ * <code>logon</code>, <code>getCredentials</code>, then
+ * <code>disconnect</code>. Use <code>getCertificates</code> and
+ * <code>getPrivateKey</code> to access the retrieved credentials, or
+ * <code>writeProxyFile</code> or <code>saveCredentialsToFile</code> to
+ * write them to a file. Use <code>writeTrustRoots</code>,
+ * <code>getTrustedCAs</code>, <code>getCRLs</code>,
+ * <code>getTrustRootData</code>, and <code>getTrustRootFilenames</code>
+ * for trust root information.
+ *
+ * (modified for use with UNICORE)
+ *
+ * @version 1.1
+ * @see <a href="http://myproxy.ncsa.uiuc.edu/">MyProxy Project Home Page</a>
+ *
+ */
+public class MyProxyLogon {
+
+ public final static String version = "1.1";
+
+ private enum State {
+ READY, CONNECTED, LOGGEDON, DONE
+ }
+
+ public final static String VERSION = "VERSION=MYPROXYv2";
+ private final static String GETCOMMAND = "COMMAND=0";
+ private final static String TRUSTROOTS = "TRUSTED_CERTS=";
+ private final static String USERNAME = "USERNAME=";
+ private final static String PASSPHRASE = "PASSPHRASE=";
+ private final static String LIFETIME = "LIFETIME=";
+ private final static String CREDNAME = "CRED_NAME=";
+ public final static String RESPONSE = "RESPONSE=";
+ private final static String ERROR = "ERROR=";
+ private final static String DN = "CN=ignore";
+
+ public final int DEFAULT_KEY_SIZE = 2048;
+ private int keySize = DEFAULT_KEY_SIZE;
+ private final static String keyAlg = "RSA";
+ private State state = State.READY;
+ private String host = "localhost";
+ private String username;
+ private String credname;
+ private char[] passphrase;
+ private int port = 7512;
+ private int lifetime = 43200;
+ private SSLSocket socket;
+ private BufferedInputStream socketIn;
+ private BufferedOutputStream socketOut;
+ private KeyPair keypair;
+ private Collection<X509Certificate> certificateChain;
+ private String[] trustrootFilenames;
+ private String[] trustrootData;
+ private KeyManagerFactory keyManagerFactory;
+ private TrustManager trustManager;
+
+ /**
+ * Constructs a MyProxyLogon object.
+ */
+ public MyProxyLogon() {
+ super();
+ host = System.getenv("MYPROXY_SERVER");
+ if (host == null) {
+ host = "myproxy.teragrid.org";
+ }
+ String portString = System.getenv("MYPROXY_SERVER_PORT");
+ if (portString != null) {
+ port = Integer.parseInt(portString);
+ }
+ username = System.getProperty("user.name");
+ }
+
+
+ /**
+ * sets the internal trust manager using the supplied validator
+ */
+ public void setValidator(X509CertChainValidator validator){
+ CommonX509TrustManager mtm = new CommonX509TrustManager(validator);
+ setTrustManager(mtm);
+ }
+
+ /**
+ * Sets the hostname of the MyProxy server. Defaults to localhost.
+ *
+ * @param host MyProxy server hostname
+ */
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ /**
+ * Sets the port of the MyProxy server. Defaults to 7512.
+ *
+ * @param port MyProxy server port
+ */
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ /**
+ * Sets the key size.
+ *
+ * @param keySize
+ */
+ public void setKeySize(int keySize) {
+ this.keySize = keySize;
+ }
+
+ /**
+ * Gets the MyProxy username.
+ *
+ * @return MyProxy server port
+ */
+ public String getUsername() {
+ return username;
+ }
+
+ /**
+ * Sets the MyProxy username. Defaults to user.name.
+ *
+ * @param username MyProxy username
+ */
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ /**
+ * Sets the optional MyProxy credential name.
+ *
+ * @param credname credential name
+ */
+ public void setCredentialName(String credname) {
+ this.credname = credname;
+ }
+
+ /**
+ * Sets the MyProxy passphrase.
+ *
+ * @param passphrase MyProxy passphrase
+ */
+ public void setPassphrase(char[] passphrase) {
+ this.passphrase = passphrase;
+ }
+
+ /**
+ * Sets the requested credential lifetime. Defaults to 43200 seconds (12
+ * hours).
+ *
+ * @param seconds Credential lifetime
+ */
+ public void setLifetime(int seconds) {
+ lifetime = seconds;
+ }
+
+ /**
+ * Gets the certificates returned from the MyProxy server by
+ * getCredentials().
+ *
+ * @return Collection of java.security.cert.Certificate objects
+ */
+ public Collection<X509Certificate> getCertificates() {
+ return certificateChain;
+ }
+
+
+ // for unit testing
+ static PrivateKey testingPrivateKey;
+
+ /**
+ * Gets the private key generated by getCredentials().
+ *
+ * @return PrivateKey
+ */
+ public PrivateKey getPrivateKey() {
+ if(testingPrivateKey!=null){
+ //for unit testing
+ return testingPrivateKey;
+ }
+ return keypair.getPrivate();
+ }
+
+ /**
+ * Connects to the MyProxy server at the desired host and port. Requires
+ * host authentication via SSL. The host's certificate subject must
+ * match the requested hostname. If CA certificates are found in the
+ * standard GSI locations, they will be used to verify the server's
+ * certificate. If trust roots are requested and no CA certificates are
+ * found, the server's certificate will still be accepted.
+ */
+ public void connect() throws IOException, GeneralSecurityException {
+ SSLContext sc = SSLContext.getInstance("SSL");
+ if(trustManager==null){
+ throw new IllegalStateException("No trust manager has been set!");
+ }
+ TrustManager[] trustAllCerts = new TrustManager[]{trustManager};
+ sc.init(getKeyManagers(), trustAllCerts, new SecureRandom());
+ SSLSocketFactory sf = sc.getSocketFactory();
+ socket = (SSLSocket) sf.createSocket(host, port);
+ socket.startHandshake();
+ socketIn = new BufferedInputStream(socket.getInputStream());
+ socketOut = new BufferedOutputStream(socket.getOutputStream());
+ state = State.CONNECTED;
+ }
+
+ /**
+ * Set the key manager factory for use in client-side SSLSocket
+ * certificate-based authentication to the MyProxy server.
+ * Call this before connect().
+ *
+ * @param keyManagerFactory Key manager factory to use
+ */
+ public void setKeyManagerFactory(KeyManagerFactory keyManagerFactory) {
+ this.keyManagerFactory = keyManagerFactory;
+ }
+
+
+ public void setTrustManager(TrustManager trustManager) {
+ this.trustManager = trustManager;
+ }
+
+ /**
+ * Disconnects from the MyProxy server.
+ */
+ public void disconnect() throws IOException {
+ socket.close();
+ socket = null;
+ socketIn = null;
+ socketOut = null;
+ state = State.READY;
+ }
+
+ /**
+ * Logs on to the MyProxy server by issuing the MyProxy GET command.
+ */
+ public void logon() throws IOException, GeneralSecurityException {
+ String line;
+ char response;
+
+ if (state != State.CONNECTED) {
+ connect();
+ }
+
+ socketOut.write('0');
+ socketOut.flush();
+ socketOut.write(VERSION.getBytes());
+ socketOut.write('\n');
+ socketOut.write(GETCOMMAND.getBytes());
+ socketOut.write('\n');
+ socketOut.write(USERNAME.getBytes());
+ socketOut.write(username.getBytes());
+ socketOut.write('\n');
+ socketOut.write(PASSPHRASE.getBytes());
+ socketOut.write(new String(passphrase).getBytes());
+ socketOut.write('\n');
+ socketOut.write(LIFETIME.getBytes());
+ socketOut.write(Integer.toString(lifetime).getBytes());
+ socketOut.write('\n');
+ if (credname != null) {
+ socketOut.write(CREDNAME.getBytes());
+ socketOut.write(credname.getBytes());
+ socketOut.write('\n');
+ }
+ socketOut.flush();
+
+ line = readLine(socketIn);
+ if (line == null) {
+ throw new EOFException();
+ }
+ if (!line.equals(VERSION)) {
+ throw new ProtocolException("bad MyProxy protocol VERSION string: "
+ + line);
+ }
+ line = readLine(socketIn);
+ if (line == null) {
+ throw new EOFException();
+ }
+ if (!line.startsWith(RESPONSE)
+ || line.length() != RESPONSE.length() + 1) {
+ throw new ProtocolException(
+ "bad MyProxy protocol RESPONSE string: " + line);
+ }
+ response = line.charAt(RESPONSE.length());
+ if (response == '1') {
+ StringBuffer errString;
+
+ errString = new StringBuffer("MyProxy logon failed");
+ while ((line = readLine(socketIn)) != null) {
+ if (line.startsWith(ERROR)) {
+ errString.append('\n');
+ errString.append(line.substring(ERROR.length()));
+ }
+ }
+ throw new FailedLoginException(errString.toString());
+ } else if (response == '2') {
+ throw new ProtocolException(
+ "MyProxy authorization RESPONSE not implemented");
+ } else if (response != '0') {
+ throw new ProtocolException(
+ "unknown MyProxy protocol RESPONSE string: " + line);
+ }
+ while ((line = readLine(socketIn)) != null) {
+ if (line.startsWith(TRUSTROOTS)) {
+ String filenameList = line.substring(TRUSTROOTS.length());
+ trustrootFilenames = filenameList.split(",");
+ trustrootData = new String[trustrootFilenames.length];
+ for (int i = 0; i < trustrootFilenames.length; i++) {
+ String lineStart = "FILEDATA_" + trustrootFilenames[i]
+ + "=";
+ line = readLine(socketIn);
+ if (line == null) {
+ throw new EOFException();
+ }
+ if (!line.startsWith(lineStart)) {
+ throw new ProtocolException(
+ "bad MyProxy protocol RESPONSE: expecting "
+ + lineStart + " but received " + line);
+ }
+ trustrootData[i] = new String(Base64.decode(line
+ .substring(lineStart.length())));
+ }
+ }
+ }
+ state = State.LOGGEDON;
+ }
+
+
+ /**
+ * Retrieves credentials from the MyProxy server.
+ */
+ public void getCredentials() throws IOException, GeneralSecurityException {
+
+ KeyPairGenerator keyGenerator = KeyPairGenerator.getInstance(keyAlg);
+ keyGenerator.initialize(keySize);
+ keypair = keyGenerator.genKeyPair();
+ Security.addProvider(new BouncyCastleProvider());
+
+ org.bouncycastle.pkcs.PKCS10CertificationRequest pkcs10 = null;
+ try{
+ pkcs10 = generateCertificationRequest(DN, keypair);
+ }
+ catch(Exception ex){
+ throw new GeneralSecurityException(ex);
+ }
+ getCredentials(pkcs10.getEncoded());
+ }
+
+
+ public X509Certificate getCertificate() {
+ if (certificateChain == null) {
+ return null;
+ }
+ Iterator<X509Certificate> iter = this.certificateChain.iterator();
+ return iter.next();
+ }
+
+
+ private KeyManager[] getKeyManagers() {
+ return keyManagerFactory != null? keyManagerFactory.getKeyManagers() : null ;
+ }
+
+ private void getCredentials(byte[] derEncodedCertRequest) throws IOException, GeneralSecurityException {
+ if (state != State.LOGGEDON) {
+ logon();
+ }
+ socketOut.write(derEncodedCertRequest);
+ socketOut.flush();
+ int numCertificates = socketIn.read();
+ if (numCertificates == -1) {
+ throw new IOException("Error: connection aborted");
+ } else if (numCertificates == 0 || numCertificates < 0) {
+ throw new GeneralSecurityException("Error: bad number of certificates sent by server");
+ }
+ CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
+ certificateChain = new ArrayList<X509Certificate>();
+ for(int i = 0; i<numCertificates; i++){
+ X509Certificate c = (X509Certificate)certFactory.generateCertificate(socketIn);
+ certificateChain.add(c);
+ }
+ state = State.DONE;
+ }
+
+ private String readLine(InputStream is) throws IOException {
+ StringBuffer sb = new StringBuffer();
+ for (int c = is.read(); c > 0 && c != '\n'; c = is.read()) {
+ sb.append((char) c);
+ }
+ if (sb.length() > 0) {
+ return new String(sb);
+ }
+ return null;
+ }
+
+ private org.bouncycastle.pkcs.PKCS10CertificationRequest generateCertificationRequest(String dn, KeyPair kp)
+ throws Exception{
+ X500Name subject=new X500Name(dn);
+ PublicKey pubKey=kp.getPublic();
+ PrivateKey privKey=kp.getPrivate();
+ AsymmetricKeyParameter pubkeyParam = PublicKeyFactory.createKey(pubKey.getEncoded());
+ SubjectPublicKeyInfo publicKeyInfo=SubjectPublicKeyInfoFactory.createSubjectPublicKeyInfo(pubkeyParam);
+ PKCS10CertificationRequestBuilder builder=new PKCS10CertificationRequestBuilder(subject, publicKeyInfo);
+ AlgorithmIdentifier signatureAi = new AlgorithmIdentifier(OIWObjectIdentifiers.sha1WithRSA);
+ BcRSAContentSignerBuilder signerBuilder=new BcRSAContentSignerBuilder(
+ signatureAi, AlgorithmIdentifier.getInstance(OIWObjectIdentifiers.idSHA1));
+ AsymmetricKeyParameter pkParam = PrivateKeyFactory.createKey(privKey.getEncoded());
+ ContentSigner signer=signerBuilder.build(pkParam);
+ return builder.build(signer);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java
new file mode 100644
index 0000000..54481df
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/OSType.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+public enum OSType {
+
+ unknown("Unknown"), //$NON-NLS-1$
+ linux("LINUX"), //$NON-NLS-1$
+ mac_os("MACOS"), //$NON-NLS-1$
+ win95("WIN95"), //$NON-NLS-1$
+ win98("WIN98"), //$NON-NLS-1$
+ windows_R_Me("Windows_R_Me"), //$NON-NLS-1$
+ winNT("WINNT"), //$NON-NLS-1$
+ windows_2000("Windows_2000"), //$NON-NLS-1$
+ windows_XP("Windows_XP"), //$NON-NLS-1$
+ msdos("MSDOS"), //$NON-NLS-1$
+ solaris("Solaris"), //$NON-NLS-1$
+ sunOS("SunOS"), //$NON-NLS-1$
+ freeBSD("FreeBSD"), //$NON-NLS-1$
+ netBSD("NetBSD"), //$NON-NLS-1$
+ openBSD("OpenBSD"), //$NON-NLS-1$
+ bsdunix("BSDUNIX"), //$NON-NLS-1$
+ aix("AIX"), //$NON-NLS-1$
+ z_OS("z_OS"), //$NON-NLS-1$
+ os_2("OS_2"), //$NON-NLS-1$
+ os9("OS9"), //$NON-NLS-1$
+ netWare("NetWare"), //$NON-NLS-1$
+ tru64_unix("Tru64_UNIX"), //$NON-NLS-1$
+ irix("IRIX"), //$NON-NLS-1$
+ osf("OSF"), //$NON-NLS-1$
+
+ mvs("MVS"), //$NON-NLS-1$
+ os400("OS400"), //$NON-NLS-1$
+ javaVM("JavaVM"), //$NON-NLS-1$
+ win3x("WIN3x"), //$NON-NLS-1$
+ winCE("WINCE"), //$NON-NLS-1$
+ NCR3000("NCR3000"), //$NON-NLS-1$
+ dc_os("DC_OS"), //$NON-NLS-1$
+ reliant_unix("Reliant_UNIX"), //$NON-NLS-1$
+ sco_unixWare("SCO_UnixWare"), //$NON-NLS-1$
+ sco_openServer("SCO_OpenServer"), //$NON-NLS-1$
+ sequent("Sequent"), //$NON-NLS-1$
+ u6000("U6000"), //$NON-NLS-1$
+ aseries("ASERIES"), //$NON-NLS-1$
+ tandemNSK("TandemNSK"), //$NON-NLS-1$
+ tandemNT("TandemNT"), //$NON-NLS-1$
+ bs2000("BS2000"), //$NON-NLS-1$
+ lynx("Lynx"), //$NON-NLS-1$
+ xenix("XENIX"), //$NON-NLS-1$
+ vm("VM"), //$NON-NLS-1$
+ interactive_unix("Interactive_UNIX"), //$NON-NLS-1$
+ gnu_hurd("GNU_Hurd"), //$NON-NLS-1$
+ mach_kernel("MACH_Kernel"), //$NON-NLS-1$
+ inferno("Inferno"), //$NON-NLS-1$
+ qnx("QNX"), //$NON-NLS-1$
+ epoc("EPOC"), //$NON-NLS-1$
+ ixWorks("IxWorks"), //$NON-NLS-1$
+ vxWorks("VxWorks"), //$NON-NLS-1$
+ mint("MiNT"), //$NON-NLS-1$
+ beOS("BeOS"), //$NON-NLS-1$
+ hp_mpe("HP_MPE"), //$NON-NLS-1$
+ nextStep("NextStep"), //$NON-NLS-1$
+ palmPilot("PalmPilot"), //$NON-NLS-1$
+ rhapsody("Rhapsody"), //$NON-NLS-1$
+ dedicated("Dedicated"), //$NON-NLS-1$
+ os_390("OS_390"), //$NON-NLS-1$
+ vse("VSE"), //$NON-NLS-1$
+ tpf("TPF"), //$NON-NLS-1$
+ caldera_open_unix("Caldera_Open_UNIX"), //$NON-NLS-1$
+ attunix("ATTUNIX"), //$NON-NLS-1$
+ dgux("DGUX"), //$NON-NLS-1$
+ decnt("DECNT"), //$NON-NLS-1$
+ openVMS("OpenVMS"), //$NON-NLS-1$
+ hpux("HPUX"), //$NON-NLS-1$
+ other("other"); //$NON-NLS-1$
+
+
+ private OSType(String value) {
+ this.value = value;
+ }
+
+ private final String value;
+
+ public String getValue() {
+ return value;
+ }
+
+ public static OSType fromString(String value)
+ {
+ for(OSType type : values())
+ {
+ if(type.value.equals(value))
+ {
+ return type;
+ }
+ }
+ return null;
+ }
+
+ public String toString()
+ {
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java
new file mode 100644
index 0000000..1a26c57
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ProcessorRequirement.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+public enum ProcessorRequirement{
+ sparc("sparc"), //$NON-NLS-1$
+ powerpc("powerpc"), //$NON-NLS-1$
+ x86("x86"), //$NON-NLS-1$
+ x86_32("x86_32"), //$NON-NLS-1$
+ x86_64("x86_64"), //$NON-NLS-1$
+ parisc("parisc"), //$NON-NLS-1$
+ mips("mips"), //$NON-NLS-1$
+ ia64("ia64"), //$NON-NLS-1$
+ arm("arm"), //$NON-NLS-1$
+ other("other"); //$NON-NLS-1$
+
+ ProcessorRequirement(String value) {
+ this.value = value;
+ }
+
+ private final String value;
+
+ public String getValue() {
+ return value;
+ }
+
+ public static ProcessorRequirement fromString(String value)
+ {
+ for (ProcessorRequirement type : values()) {
+ if (type.value.equals(value)) {
+ return type;
+ }
+ }
+ return other;
+ }
+
+ public String toString()
+ {
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java
new file mode 100644
index 0000000..a18b85a
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/RangeValueType.java
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+
+public class RangeValueType implements ResourceRequirement {
+
+
+ private double exact = Double.NaN;
+ private double lowerBound = Double.NEGATIVE_INFINITY;
+ private double upperBound = Double.POSITIVE_INFINITY;
+
+ private double epsilon = Double.NaN;
+ private boolean includeLowerBound = true;
+ private boolean includeUpperBound = true;
+
+ private boolean enabled = false;
+
+
+ public RangeValueType(double exact, double epsilon, double lowerBound, boolean includeLowerBound, double upperBound, boolean includeUpperBound, boolean enabled) {
+ this.exact = exact;
+ this.epsilon = epsilon;
+ this.lowerBound = lowerBound;
+ this.includeLowerBound = includeLowerBound;
+ this.upperBound = upperBound;
+ this.includeUpperBound = includeUpperBound;
+ this.enabled = enabled;
+ }
+
+
+
+ /**
+ * Create the range requirements
+ *
+ * @param exact -
+ * the exact value
+ * @param lowerBound -
+ * the lower bound
+ * @param upperBound -
+ * the upper bound
+ * @param includeUpperBound -
+ * true, if upperBound should be included in range
+ *
+ */
+ public RangeValueType(double exact, double epsilon, double lowerBound, boolean includeLowerBound, double upperBound, boolean includeUpperBound) {
+ this(exact,epsilon,lowerBound,includeLowerBound,upperBound,includeUpperBound,false);
+
+ }
+
+
+ /**
+ * Create the range requirements
+ *
+ * @param exact -
+ * the exact value
+ * @param lowerBound -
+ * the lower bound
+ * @param upperBound -
+ * the upper bound
+ */
+ public RangeValueType(double exact, double epsilon, double lowerBound, double upperBound) {
+ this(exact,epsilon,lowerBound,true,upperBound,true);
+ }
+
+
+ public RangeValueType(double exact, double lowerBound, double upperBound) {
+ this(exact,Double.NaN,lowerBound,true,upperBound,true);
+ }
+
+ /**
+ * Create the exact requirements
+ *
+ * @param exact -
+ * the exact value
+ * @param epsilon -
+ * the epsilon arround exact
+ *
+ */
+ public RangeValueType(double exact, double epsilon) {
+ this(exact,epsilon,Double.NaN,Double.NaN);
+ }
+
+
+ /**
+ * Create the exact requirements
+ *
+ * @param exact -
+ * the exact value
+ */
+ public RangeValueType(double exact) {
+ this(exact,Double.NaN);
+ }
+
+ public RangeValueType() {
+ }
+
+ /**
+ * Get exact requirements
+ *
+ * @return the exact requirements
+ */
+ public double getExact() {
+ return exact;
+ }
+
+ /**
+ * Set exact requirements
+ *
+ * @param exact -
+ * the exact requirements
+ */
+ public void setExact(double exact) {
+ this.exact = exact;
+ }
+
+ /**
+ * Get epsilon
+ *
+ * @return the epsilon
+ */
+ public double getEpsilon() {
+ return epsilon;
+ }
+
+ /**
+ * Set epsilon
+ *
+ * @param epsilon -
+ * epsilon belonging to to exact requirements
+ */
+ public void setEpsilon(double epsilon) {
+ this.epsilon = epsilon;
+ }
+
+ /**
+ * Get lower bound
+ *
+ * @return the lower bound
+ */
+ public double getLowerBound() {
+ return lowerBound;
+ }
+
+ /**
+ * Set lower bound
+ *
+ * @param lowerBound -
+ * the lower bound
+ */
+ public void setLowerBound(double lowerBound) {
+ this.lowerBound = lowerBound;
+ }
+
+ /**
+ * Get upper bound
+ *
+ * @return the upper bound
+ */
+ public double getUpperBound() {
+ return upperBound;
+ }
+
+ /**
+ * Set upper bound
+ *
+ * @param upperBound -
+ * the upper bound
+ */
+ public void setUpperBound(double upperBound) {
+ this.upperBound = upperBound;
+ }
+
+ /**
+ * Test if requirements are met
+ *
+ * @param value -
+ * the tested value
+ * @return <code>true</code> if value is in the range and not less than
+ * the exact value
+ */
+ public boolean lowerThanDouble(double value) {
+ return (value >= exact && value >= lowerBound && value <= upperBound) ? true : false;
+ }
+
+ public String toString() {
+ if (lowerBound == Double.NEGATIVE_INFINITY && upperBound == Double.POSITIVE_INFINITY) {
+ return Double.toString(exact);
+ }
+ else {
+ return "(e=" + Double.toString(exact) + ",l=" + Double.toString(lowerBound) + ",u=" //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ + Double.toString(upperBound) + ")"; //$NON-NLS-1$
+ }
+ }
+
+
+ public boolean isIncludeLowerBound() {
+ return includeLowerBound;
+ }
+
+
+ public void setIncludeLowerBound(boolean includeLowerBound) {
+ this.includeLowerBound = includeLowerBound;
+ }
+
+
+ public boolean isIncludeUpperBound() {
+ return includeUpperBound;
+ }
+
+
+ public void setIncludeUpperBound(boolean includeUpperBound) {
+ this.includeUpperBound = includeUpperBound;
+ }
+
+ public RangeValueType clone(){
+ return new RangeValueType(this.exact, this.epsilon, this.lowerBound, this.includeLowerBound, this.upperBound, this.includeUpperBound,this.enabled);
+ }
+
+
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+
+ public boolean equals(Object o)
+ {
+ if(! (o instanceof RangeValueType)) return false;
+ RangeValueType other = (RangeValueType) o;
+ return doublesEqual(getExact(),other.getExact())
+ && doublesEqual(getEpsilon(), other.getEpsilon())
+ && doublesEqual(getLowerBound(), other.getLowerBound())
+ && doublesEqual(getUpperBound(), other.getUpperBound())
+ && isIncludeLowerBound() == other.isIncludeLowerBound()
+ && isIncludeUpperBound() == other.isIncludeUpperBound()
+ && isEnabled() == other.isEnabled();
+ }
+
+
+ private boolean doublesEqual(double a, double b)
+ {
+ Double A = new Double(a);
+ Double B = new Double(b);
+ return A.equals(B);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java
new file mode 100644
index 0000000..8723d85
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceProcessor.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.unicore.jsdl.extensions.ResourceRequestDocument;
+import eu.unicore.jsdl.extensions.ResourceRequestType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.ResourcesType;
+
+public class ResourceProcessor {
+
+
+ public static void generateResourceElements(JobDefinitionType value, ProcessContext context) throws Exception {
+ ProcessModel processModel = context.getProcessModel();
+ if (processModel != null) {
+ try {
+ ComputationalResourceSchedulingModel crs = processModel.getProcessResourceSchedule();
+
+ if (crs.getTotalPhysicalMemory() > 0) {
+ RangeValueType rangeType = new RangeValueType();
+ rangeType.setLowerBound(Double.NaN);
+ rangeType.setUpperBound(Double.NaN);
+ rangeType.setExact(crs.getTotalPhysicalMemory());
+ JSDLUtils.setIndividualPhysicalMemoryRequirements(value, rangeType);
+ }
+
+ if (crs.getNodeCount() > 0) {
+ RangeValueType rangeType = new RangeValueType();
+ rangeType.setLowerBound(Double.NaN);
+ rangeType.setUpperBound(Double.NaN);
+ rangeType.setExact(crs.getNodeCount());
+ JSDLUtils.setTotalResourceCountRequirements(value, rangeType);
+ }
+
+ if (crs.getWallTimeLimit() > 0) {
+ RangeValueType cpuTime = new RangeValueType();
+ cpuTime.setLowerBound(Double.NaN);
+ cpuTime.setUpperBound(Double.NaN);
+ long wallTime = crs.getWallTimeLimit() * 60;
+ cpuTime.setExact(wallTime);
+ JSDLUtils.setIndividualCPUTimeRequirements(value, cpuTime);
+ }
+ // the total cpu count is total cpus per node
+ if (crs.getTotalCPUCount() > 0) {
+ RangeValueType rangeType = new RangeValueType();
+ rangeType.setLowerBound(Double.NaN);
+ rangeType.setUpperBound(Double.NaN);
+ int nodeCount = crs.getNodeCount();
+ if (nodeCount <= 0) {
+ nodeCount = 1;
+ }
+ rangeType.setExact(crs.getTotalCPUCount() / nodeCount);
+ JSDLUtils.setIndividualCPUCountRequirements(value, rangeType);
+ }
+
+ String qName = crs.getQueueName();
+ if (!(qName == null || "".equals(qName))) {
+ // ignore "default" queue names
+ if (!(crs.getQueueName().trim().equalsIgnoreCase("default"))) {
+ ResourceRequestDocument rqDoc = ResourceRequestDocument.Factory.newInstance();
+ ResourceRequestType rq = rqDoc.addNewResourceRequest();
+ rq.setName("Queue");
+ rq.setValue(qName);
+ ResourcesType res = JSDLUtils.getOrCreateResources(value);
+ WSUtilities.insertAny(rqDoc, res);
+ }
+ }
+
+ } catch (NullPointerException npe) {
+ throw new Exception("No value set for resource requirements.", npe);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java
new file mode 100644
index 0000000..d5708f3
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ResourceRequirement.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+public interface ResourceRequirement extends Cloneable {
+
+ /**
+ * States whether this resource requirement is active
+ * and should be written into the job description.
+ * @return
+ */
+ public boolean isEnabled();
+
+ public void setEnabled(boolean enabled);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java
new file mode 100644
index 0000000..46414eb
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SPMDVariations.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+public enum SPMDVariations {
+
+ MPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPI"),
+ GridMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/GridMPI"),
+ IntelMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/IntelMPI"),
+ LAMMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/LAM-MPI"),
+ MPICH1 ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH1"),
+ MPICH2 ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH2"),
+ MPICHGM ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH-GM"),
+ MPICHMX ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MPICH-MX"),
+ MVAPICH ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MVAPICH"),
+ MVAPICH2 ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/MVAPICH2"),
+ OpenMPI ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/OpenMPI"),
+ POE ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/POE"),
+ PVM ("http://www.ogf.org/jsdl/2007/02/jsdl-spmd/PVM");
+
+ private final String variation;
+
+ private SPMDVariations(String variation) {
+ this.variation = variation;
+ }
+
+ public String value(){
+ return variation;
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java
new file mode 100644
index 0000000..00fa472
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/SecurityUtils.java
@@ -0,0 +1,160 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.worker.core.RequestData;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.x500.X500Principal;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.Date;
+import java.util.Random;
+
+public class SecurityUtils {
+
+ private final static Logger logger = LoggerFactory.getLogger(SecurityUtils.class);
+
+
+ public static UNICORESecurityContext getSecurityContext(ProcessContext processContext) throws WorkerException {
+
+ if (processContext.getJobSubmissionProtocol().equals(JobSubmissionProtocol.UNICORE)) {
+ String credentialStoreToken = processContext.getTokenId(); // set by the framework
+ RequestData requestData;
+ requestData = new RequestData(processContext.getProcessModel().getUserDn());
+ requestData.setTokenId(credentialStoreToken);
+ CredentialReader credentialReader = null;
+ try {
+ credentialReader = WorkerUtils.getCredentialReader();
+ if (credentialReader == null) {
+ throw new WorkerException("Credential reader returns null");
+ }
+ } catch (Exception e) {
+ throw new WorkerException("Error while initializing credential reader");
+ }
+ return new UNICORESecurityContext(credentialReader, requestData);
+ } else {
+ throw new WorkerException("Only support UNICORE job submissions, invalid job submission protocol " +
+ processContext.getJobSubmissionProtocol().name());
+ }
+ }
+
+ public static final KeyAndCertCredential generateShortLivedCertificate(String userDN,
+ String caCertPath, String caKeyPath, String caPwd) throws Exception {
+ final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes
+ // ago
+
+ final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset;
+ final long endTime = startTime + 30 * 3600 * 1000;
+
+ final String keyLengthProp = "1024";
+ int keyLength = Integer.parseInt(keyLengthProp);
+ final String signatureAlgorithm = "SHA1withRSA";
+
+ KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath,
+ caPwd);
+
+ KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey()
+ .getAlgorithm());
+ kpg.initialize(keyLength);
+ KeyPair pair = kpg.generateKeyPair();
+
+ X500Principal subjectDN = new X500Principal(userDN);
+ Random rand = new Random();
+
+ SubjectPublicKeyInfo publicKeyInfo;
+ try {
+ publicKeyInfo = SubjectPublicKeyInfo
+ .getInstance(new ASN1InputStream(pair.getPublic()
+ .getEncoded()).readObject());
+ } catch (IOException e) {
+ throw new InvalidKeyException("Can not parse the public key"
+ + "being included in the short lived certificate", e);
+ }
+
+ X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred
+ .getCertificate().getSubjectX500Principal());
+
+ X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+ X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(
+ issuerX500Name, new BigInteger(20, rand), new Date(startTime),
+ new Date(endTime), subjectX500Name, publicKeyInfo);
+
+ AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder
+ .extractAlgorithmId(caCred.getCertificate());
+
+ X509Certificate certificate = certBuilder.build(caCred.getKey(),
+ sigAlgId, signatureAlgorithm, null, null);
+
+ certificate.checkValidity(new Date());
+ certificate.verify(caCred.getCertificate().getPublicKey());
+ KeyAndCertCredential result = new KeyAndCertCredential(
+ pair.getPrivate(), new X509Certificate[] { certificate,
+ caCred.getCertificate() });
+
+ return result;
+ }
+
+ public static KeyAndCertCredential getCACredential(String caCertPath,
+ String caKeyPath, String password) throws Exception {
+ InputStream isKey = new FileInputStream(caKeyPath);
+ PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM,
+ password.toCharArray());
+
+ InputStream isCert = new FileInputStream(caCertPath);
+ X509Certificate caCert = CertificateUtils.loadCertificate(isCert,
+ Encoding.PEM);
+
+ if (isKey != null)
+ isKey.close();
+ if (isCert != null)
+ isCert.close();
+
+ return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java
new file mode 100644
index 0000000..85da1f2
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/StorageCreator.java
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.StorageFactory;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.StorageFactoryClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import de.fzj.unicore.wsrflite.xmlbeans.client.RegistryClient;
+import de.fzj.unicore.wsrflite.xmlbeans.sg.Registry;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.oasisOpen.docs.wsrf.sg2.EntryType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.unigrids.services.atomic.types.PropertyType;
+import org.unigrids.x2006.x04.services.smf.CreateSMSDocument;
+import org.unigrids.x2006.x04.services.smf.StorageBackendParametersDocument.StorageBackendParameters;
+import org.unigrids.x2006.x04.services.smf.StorageDescriptionType;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import javax.security.auth.x500.X500Principal;
+import java.util.Calendar;
+
+public class StorageCreator {
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ /**
+ * the initial lifetime (in days) for newly created SMSs
+ */
+ private int initialLifeTime;
+
+ /**
+ * factory URL to use
+ */
+ private String factoryUrl;
+
+ /**
+ * site where to create the storage
+ */
+ private String siteName;
+
+ /**
+ * storage type to create
+ */
+ private String storageType;
+
+ private DefaultClientConfiguration secProps;
+
+ private String userName;
+
+ public StorageCreator(DefaultClientConfiguration secProps, String besUrl, int initialLifetime, String storageType, String userName) {
+ this.secProps = secProps;
+ this.factoryUrl = getStorageFactoryUrl(besUrl);
+ this.storageType = storageType;
+ this.initialLifeTime = initialLifetime;
+ this.userName = userName;
+ }
+
+
+ public StorageCreator(DefaultClientConfiguration secProps, String besUrl, int initialLifetime, String userName) {
+ this.secProps = secProps;
+ this.factoryUrl = getStorageFactoryUrl(besUrl);
+ this.initialLifeTime = initialLifetime;
+ this.userName = userName;
+ }
+
+
+ // The target site must have storage factory deployed with bes factory
+ public StorageClient createStorage() throws Exception{
+
+ if(factoryUrl == null) {
+ throw new Exception("Cannot create Storage Factory Url");
+ }
+
+ EndpointReferenceType sfEpr= WSUtilities.makeServiceEPR(factoryUrl, StorageFactory.SMF_PORT);
+
+ String dn = findServerName(factoryUrl, sfEpr);
+
+ WSUtilities.addServerIdentity(sfEpr, dn);
+
+ secProps.getETDSettings().setReceiver(new X500Principal(dn));
+ secProps.getETDSettings().setIssuerCertificateChain(secProps.getCredential().getCertificateChain());
+
+ // TODO: remove it afterwards
+ if(userName != null) {
+ secProps.getETDSettings().getRequestedUserAttributes2().put("xlogin", new String[]{userName});
+ }
+
+ StorageFactoryClient sfc = new StorageFactoryClient(sfEpr, secProps);
+
+ if (log.isDebugEnabled()){
+ log.debug("Using storage factory at <"+sfc.getUrl()+">");
+ }
+
+ StorageClient sc = null;
+ try{
+ sc=sfc.createSMS(getCreateSMSDocument());
+
+ String addr=sc.getEPR().getAddress().getStringValue();
+ log.info(addr);
+
+ }catch(Exception ex){
+ log.error("Could not create storage",ex);
+ throw new Exception(ex);
+ }
+
+ return sc;
+ }
+
+ protected String findServerName(String besUrl, EndpointReferenceType smsEpr)throws Exception{
+
+ int besIndex = besUrl.indexOf("StorageFactory?res");
+ String ss = besUrl.substring(0, besIndex);
+ ss = ss + "Registry";
+
+ EndpointReferenceType eprt = WSUtilities.makeServiceEPR(ss, "default_registry", Registry.REGISTRY_PORT);
+
+ RegistryClient registry = new RegistryClient(eprt, secProps);
+
+ //first, check if server name is already in the EPR...
+ String dn=WSUtilities.extractServerIDFromEPR(smsEpr);
+ if(dn!=null){
+ return dn;
+ }
+ //otherwise find a matching service in the registry
+ String url=smsEpr.getAddress().getStringValue();
+ if(url.contains("/services/"))url=url.substring(0,url.indexOf("/services"));
+ if(log.isDebugEnabled()) log.debug("Checking for services at "+url);
+ for(EntryType entry:registry.listEntries()){
+ if(entry.getMemberServiceEPR().getAddress().getStringValue().startsWith(url)){
+ dn=WSUtilities.extractServerIDFromEPR(entry.getMemberServiceEPR());
+ if(dn!=null){
+ return dn;
+ }
+ }
+ }
+ return null;
+ }
+
+
+ public static String getStorageFactoryUrl(String besUrl){
+ int besIndex = besUrl.indexOf("BESFactory?res");
+ String ss = besUrl.substring(0, besIndex);
+ ss = ss + "StorageFactory?res=default_storage_factory";
+ return ss;
+ }
+
+ /**
+ * prepare request
+ * */
+ protected CreateSMSDocument getCreateSMSDocument(String ...keyValueParams){
+ CreateSMSDocument in=CreateSMSDocument.Factory.newInstance();
+ in.addNewCreateSMS();
+ if(initialLifeTime>0){
+ in.getCreateSMS().addNewTerminationTime().setCalendarValue(getTermTime());
+ }
+ if(storageType!=null){
+ if(log.isDebugEnabled()) {
+ log.debug("Will create storage of type : "+storageType);
+ }
+ StorageDescriptionType desc=in.getCreateSMS().addNewStorageDescription();
+ desc.setStorageBackendType(storageType);
+ if(keyValueParams.length>1){
+ //other parameters from the cmdline as key=value
+ StorageBackendParameters params=desc.addNewStorageBackendParameters();
+ for(int i=1;i<keyValueParams.length;i++){
+ String arg=keyValueParams[i];
+ String[]sp=arg.split("=",2);
+ PropertyType prop=params.addNewProperty();
+ prop.setName(sp[0]);
+ prop.setValue(sp[1]);
+ if(log.isDebugEnabled()) {
+ log.debug("Have parameter : "+arg);
+ }
+ }
+ }
+ }
+ return in;
+ }
+
+ protected Calendar getTermTime(){
+ Calendar c = Calendar.getInstance();
+ c.add(Calendar.DATE, initialLifeTime);
+ return c;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java
new file mode 100644
index 0000000..e550a3d
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/UASDataStagingProcessor.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class UASDataStagingProcessor {
+
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+
+ public static void generateDataStagingElements(JobDefinitionType value, ProcessContext context, String smsUrl) throws Exception{
+ smsUrl = "BFT:"+smsUrl;
+
+ if (context.getProcessModel().getProcessOutputs().size() > 0) {
+ buildDataStagingFromInputContext(context, value, smsUrl);
+ }
+
+ if (context.getProcessModel().getProcessOutputs().size() > 0) {
+ buildFromOutputContext(context, value, smsUrl);
+ }
+ }
+
+ private static void createInURISMSElement(JobDefinitionType value, String smsUrl, String uri, boolean useSMS)
+ throws Exception {
+ String fileName = new File(uri).getName();
+
+ if (useSMS && uri.startsWith("file:")) {
+ uri = smsUrl+"#/"+fileName;
+
+ }
+ // no need to stage-in those files to the input
+ // directory because unicore site will fetch them for the user
+ // supported third party transfers include
+ // gsiftp, http, rns, ftp
+ JSDLUtils.addDataStagingSourceElement(value, uri, null, fileName);
+
+ }
+
+ //TODO: will be deprecated
+ private static void createStdOutURIs(JobDefinitionType value, ProcessContext context, String smsUrl, boolean isUnicore) throws Exception {
+
+ // no need to use smsUrl for output location, because output location is activity's working directory
+
+ if(isUnicore) {
+ String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
+ String scriptExitCode = smsUrl+"#/output/"+scriptExitCodeFName;
+ JSDLUtils.addDataStagingTargetElement(value, null,
+ scriptExitCodeFName, null);
+ }
+
+ if(!isUnicore) {
+ String stdout = ApplicationProcessor.getApplicationStdOut(value, context);
+
+ String stderr = ApplicationProcessor.getApplicationStdErr(value, context);
+
+ String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+ : stdout;
+ String stdoutURI = smsUrl+"#/output/"+stdoutFileName;
+
+ JSDLUtils.addDataStagingTargetElement(value, null, stdoutFileName,
+ null);
+
+ String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
+ : stderr;
+ String stderrURI = smsUrl+"#/output/"+stderrFileName;
+
+ JSDLUtils.addDataStagingTargetElement(value, null, stderrFileName,
+ null);
+ }
+
+ }
+
+ // TODO: this should be deprecated, because the outputs are fetched using activity working dir from data transferrer
+ private static void createOutStringElements(JobDefinitionType value, String smsUrl, String prmValue) throws Exception {
+ if(prmValue == null || "".equals(prmValue)) return;
+ String finalSMSPath = smsUrl + "#/output/"+prmValue;
+ JSDLUtils.addDataStagingTargetElement(value, null, prmValue, null);
+ }
+
+
+ private static void createOutURIElement(JobDefinitionType value,
+ String prmValue) throws Exception {
+ String fileName = new File(prmValue.toString()).getName();
+ JSDLUtils.addDataStagingTargetElement(value, null, fileName, prmValue);
+ }
+
+
+ private static JobDefinitionType buildFromOutputContext(ProcessContext context,
+ JobDefinitionType value, String smsUrl) throws Exception {
+ List<OutputDataObjectType> applicationOutputs = context.getProcessModel().getProcessOutputs();
+ if (applicationOutputs != null && !applicationOutputs.isEmpty()){
+ for (OutputDataObjectType output : applicationOutputs){
+ if("".equals(output.getValue()) || output.getValue() == null) {
+ continue;
+ }
+
+ if(output.getType().equals(DataType.URI)) {
+ createOutURIElement(value, output.getValue());
+ }
+ }
+ }
+ return value;
+ }
+
+
+ private static void buildDataStagingFromInputContext(ProcessContext context, JobDefinitionType value, String smsUrl)
+ throws Exception {
+ // sort the inputs first and then build the command ListR
+ Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+ return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+ }
+ };
+ Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+ for (InputDataObjectType input : context.getProcessModel().getProcessInputs()) {
+ sortedInputSet.add(input);
+ }
+
+
+ if (sortedInputSet != null && !sortedInputSet.isEmpty()){
+ for (InputDataObjectType input : sortedInputSet){
+ if("".equals(input.getValue()) || input.getValue() == null) {
+ continue;
+ }
+ if(input.getType().equals(DataType.URI)){
+ createInURISMSElement(value, smsUrl, input.getValue(), true);
+ }
+ else if(input.getType().equals(DataType.STRING) && input.isDataStaged()){
+ createInURISMSElement(value, smsUrl, input.getValue(), false);
+ }
+ else if(input.getType().equals(DataType.STRING) && !input.isDataStaged()){
+ ApplicationProcessor.addApplicationArgument(value, context, input.getValue());
+ }
+ else if (input.getType().equals(DataType.FLOAT) || input.getType().equals(DataType.INTEGER)){
+ if(! (input.getName().equals(BESConstants.NUMBER_OF_PROCESSES) || input.getName().equals(BESConstants.PROCESSES_PER_HOST))) {
+ // temp avoid environ going to app args
+ ApplicationProcessor.addApplicationArgument(value, context, String.valueOf(input.getValue()));
+ }
+ }
+ }
+ }
+ }
+
+ public static boolean isUnicoreEndpoint(ProcessContext context) {
+ return context.getJobSubmissionProtocol().equals(JobSubmissionProtocol.UNICORE);
+ }
+
+}
[3/3] airavata git commit: Add implementation for BESJobSubmissionTask
Posted by go...@apache.org.
Add implementation for BESJobSubmissionTask
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d231956e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d231956e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d231956e
Branch: refs/heads/feature-workload-mgmt
Commit: d231956e853aca385f6187f5ded5978a21d6548f
Parents: 9f0e45b
Author: Gourav Shenoy <go...@apache.org>
Authored: Tue May 2 13:50:17 2017 -0400
Committer: Gourav Shenoy <go...@apache.org>
Committed: Tue May 2 13:50:17 2017 -0400
----------------------------------------------------------------------
modules/worker/task-jobsubmission/pom.xml | 33 ++
.../impl/BESJobSubmissionTask.java | 83 +--
.../jobsubmission/utils/bes/ActivityInfo.java | 50 ++
.../utils/bes/ApplicationProcessor.java | 221 ++++++++
.../jobsubmission/utils/bes/BESConstants.java | 45 ++
.../utils/bes/DataTransferrer.java | 328 ++++++++++++
.../jobsubmission/utils/bes/FileDownloader.java | 255 +++++++++
.../utils/bes/FileTransferBase.java | 223 ++++++++
.../jobsubmission/utils/bes/FileUploader.java | 242 +++++++++
.../jobsubmission/utils/bes/JSDLGenerator.java | 115 ++++
.../task/jobsubmission/utils/bes/JSDLUtils.java | 517 ++++++++++++++++++
.../task/jobsubmission/utils/bes/Mode.java | 45 ++
.../jobsubmission/utils/bes/MyProxyLogon.java | 465 ++++++++++++++++
.../task/jobsubmission/utils/bes/OSType.java | 124 +++++
.../utils/bes/ProcessorRequirement.java | 61 +++
.../jobsubmission/utils/bes/RangeValueType.java | 271 ++++++++++
.../utils/bes/ResourceProcessor.java | 97 ++++
.../utils/bes/ResourceRequirement.java | 34 ++
.../jobsubmission/utils/bes/SPMDVariations.java | 52 ++
.../jobsubmission/utils/bes/SecurityUtils.java | 160 ++++++
.../jobsubmission/utils/bes/StorageCreator.java | 207 ++++++++
.../utils/bes/UASDataStagingProcessor.java | 182 +++++++
.../utils/bes/UNICORESecurityContext.java | 195 +++++++
.../task/jobsubmission/utils/bes/URIUtils.java | 121 +++++
.../utils/bes/X509SecurityContext.java | 340 ++++++++++++
modules/worker/worker-core/pom.xml | 6 +
.../airavata/worker/core/RequestData.java | 149 ++++++
.../airavata/worker/core/SecurityContext.java | 24 +
.../core/context/AbstractSecurityContext.java | 57 ++
.../airavata/worker/core/utils/SSHUtils.java | 524 +++++++++++++++++++
.../worker/core/utils/WorkerFactory.java | 12 +
.../airavata/worker/core/utils/WorkerUtils.java | 64 +++
32 files changed, 5262 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/pom.xml b/modules/worker/task-jobsubmission/pom.xml
index 7d9506e..45d720e 100644
--- a/modules/worker/task-jobsubmission/pom.xml
+++ b/modules/worker/task-jobsubmission/pom.xml
@@ -41,6 +41,39 @@
<artifactId>aurora-client</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>eu.unicore</groupId>
+ <artifactId>unicore-client-wrapper</artifactId>
+ <version>1.7.2_1</version>
+ <exclusions>
+ <!-- <exclusion>
+ <groupId>org.apache.santuario</groupId>
+ <artifactId>xmlsec</artifactId>
+ </exclusion> -->
+ <exclusion>
+ <groupId>net.sf.saxon</groupId>
+ <artifactId>saxon</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.saxon</groupId>
+ <artifactId>saxon-dom</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.saxon</groupId>
+ <artifactId>saxon-xpath</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.saxon</groupId>
+ <artifactId>Saxon-HE</artifactId>
+ <version>9.6.0-1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>3.1</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
index 2c6b984..65668c0 100644
--- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
@@ -31,17 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
import eu.unicore.util.httpclient.DefaultClientConfiguration;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
@@ -59,7 +48,21 @@ import org.apache.airavata.model.task.TaskTypes;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.SSHApiException;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.SSHUtils;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
+import org.apache.airavata.worker.task.jobsubmission.utils.bes.*;
import org.apache.xmlbeans.XmlCursor;
+import org.ggf.schemas.bes.x2006.x08.besFactory.*;
import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,7 +108,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
// con't reuse if UserDN has been changed.
secProperties = getSecurityConfig(processContext);
// try secProperties = secProperties.clone() if we can't use already initialized ClientConfigurations.
- } catch (GFacException e) {
+ } catch (WorkerException e) {
String msg = "Unicorn security context initialization error";
log.error(msg, e);
taskStatus.setState(TaskState.FAILED);
@@ -115,10 +118,10 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
try {
JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
- JobSubmissionInterface jobSubmissionInterface = GFacUtils.getPreferredJobSubmissionInterface(processContext);
+ JobSubmissionInterface jobSubmissionInterface = JobSubmissionUtils.getPreferredJobSubmissionInterface(processContext);
String factoryUrl = null;
if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
- UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(
+ UnicoreJobSubmission unicoreJobSubmission = JobSubmissionUtils.getUnicoreJobSubmission(
jobSubmissionInterface.getJobSubmissionInterfaceId());
factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
}
@@ -167,8 +170,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
jobDetails.setJobDescription(activityEpr.toString());
jobDetails.setJobStatuses(Arrays.asList(new JobStatus(JobState.SUBMITTED)));
processContext.setJobModel(jobDetails);
- GFacUtils.saveJobModel(processContext, jobDetails);
- GFacUtils.saveJobStatus(processContext, jobDetails);
+ JobSubmissionUtils.saveJobModel(processContext, jobDetails);
+ WorkerUtils.saveJobStatus(processContext, jobDetails);
log.info(formatStatusMessage(activityEpr.getAddress()
.getStringValue(), factory.getActivityStatus(activityEpr)
.toString()));
@@ -205,8 +208,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
} else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
JobState applicationJobStatus = JobState.CANCELED;
jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
- GFacUtils.saveJobStatus(processContext, jobDetails);
- throw new GFacException(
+ WorkerUtils.saveJobStatus(processContext, jobDetails);
+ throw new WorkerException(
processContext.getExperimentId() + "Job Canceled");
} else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
try {
@@ -215,7 +218,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
}
JobState applicationJobStatus = JobState.COMPLETE;
jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus)));
- GFacUtils.saveJobStatus(processContext, jobDetails);
+ WorkerUtils.saveJobStatus(processContext, jobDetails);
log.info("Job Id: {}, exit code: {}, exit status: {}", jobDetails.getJobId(),
activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString());
@@ -228,7 +231,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
if (copyOutput != null) {
copyOutputFilesToStorage(taskContext, copyOutput);
for (OutputDataObjectType outputDataObjectType : copyOutput) {
- GFacUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
+ WorkerUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue());
}
}
// dt.publishFinalOutputs();
@@ -244,13 +247,13 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
return taskStatus;
}
- private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws GFacException {
+ private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws WorkerException {
ProcessContext pc = taskContext.getParentProcessContext();
String remoteFilePath = null, fileName = null, localFilePath = null;
try {
- authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+ authenticationInfo = WorkerFactory.getStorageSSHKeyAuthentication(pc);
ServerInfo serverInfo = pc.getComputeResourceServerInfo();
- Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+ Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
for (OutputDataObjectType output : copyOutput) {
switch (output.getType()) {
case STDERR: case STDOUT: case STRING: case URI:
@@ -259,7 +262,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
localFilePath = localFilePath.substring(localFilePath.indexOf("://") + 2, localFilePath.length());
}
fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
- URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+ URI destinationURI = WorkerUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
remoteFilePath = destinationURI.getPath();
log.info("SCP local file :{} -> from remote :{}", localFilePath, remoteFilePath);
SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession);
@@ -271,18 +274,18 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
}
} catch (IOException | JSchException | SSHApiException | URISyntaxException | CredentialStoreException e) {
log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e);
- throw new GFacException("Error while scp output files to remote storage file location", e);
+ throw new WorkerException("Error while scp output files to remote storage file location", e);
}
}
- private void copyInputFilesToLocal(TaskContext taskContext) throws GFacException {
+ private void copyInputFilesToLocal(TaskContext taskContext) throws WorkerException {
ProcessContext pc = taskContext.getParentProcessContext();
StorageResourceDescription storageResource = pc.getStorageResource();
if (storageResource != null) {
hostName = storageResource.getHostName();
} else {
- throw new GFacException("Storage Resource is null");
+ throw new WorkerException("Storage Resource is null");
}
inputPath = pc.getStorageFileSystemRootLocation();
inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator);
@@ -290,9 +293,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
String remoteFilePath = null, fileName = null, localFilePath = null;
URI remoteFileURI = null;
try {
- authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+ authenticationInfo = WorkerFactory.getStorageSSHKeyAuthentication(pc);
ServerInfo serverInfo = pc.getStorageResourceServerInfo();
- Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
+ Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
List<InputDataObjectType> processInputs = pc.getProcessModel().getProcessInputs();
for (InputDataObjectType input : processInputs) {
@@ -308,11 +311,11 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
}
} catch (IOException | JSchException | SSHApiException | URISyntaxException e) {
log.error("Error while coping remote file " + remoteFilePath + " to local " + localFilePath, e);
- throw new GFacException("Error while scp input files to local file location", e);
+ throw new WorkerException("Error while scp input files to local file location", e);
} catch (CredentialStoreException e) {
String msg = "Authentication issue, make sure you are passing valid credential token";
log.error(msg, e);
- throw new GFacException(msg, e);
+ throw new WorkerException(msg, e);
}
}
@@ -324,7 +327,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
processContext.setOutputDir(localPath);
}
- private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws GFacException {
+ private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws WorkerException {
DefaultClientConfiguration clientConfig = null;
try {
UNICORESecurityContext unicoreSecurityContext = SecurityUtils.getSecurityContext(pc);
@@ -339,9 +342,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
clientConfig = unicoreSecurityContext.getDefaultConfiguration(false);
}
} catch (RegistryException e) {
- throw new GFacException("Error! reading user configuration data from registry", e);
+ throw new WorkerException("Error! reading user configuration data from registry", e);
} catch (ApplicationSettingsException e) {
- throw new GFacException("Error! retrieving default client configurations", e);
+ throw new WorkerException("Error! retrieving default client configurations", e);
}
return clientConfig;
@@ -380,8 +383,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
}
}
- private void sendNotification(ProcessContext processContext, JobModel jobModel) throws GFacException {
- GFacUtils.saveJobStatus(processContext, jobModel);
+ private void sendNotification(ProcessContext processContext, JobModel jobModel) throws WorkerException {
+ WorkerUtils.saveJobStatus(processContext, jobModel);
}
@Override
@@ -467,9 +470,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
* EndpointReference need to be saved to make cancel work.
*
* @param processContext
- * @throws GFacException
+ * @throws WorkerException
*/
- public boolean cancelJob(ProcessContext processContext) throws GFacException {
+ public boolean cancelJob(ProcessContext processContext) throws WorkerException {
try {
String activityEpr = processContext.getJobModel().getJobDescription();
// initSecurityProperties(processContext);
@@ -479,7 +482,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
String interfaceId = processContext.getApplicationInterfaceDescription().getApplicationInterfaceId();
String factoryUrl = null;
if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
- UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+ UnicoreJobSubmission unicoreJobSubmission = JobSubmissionUtils.getUnicoreJobSubmission(interfaceId);
factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
}
EndpointReferenceType epr = EndpointReferenceType.Factory
@@ -490,7 +493,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
factory.terminateActivity(eprt);
return true;
} catch (Exception e) {
- throw new GFacException(e.getLocalizedMessage(), e);
+ throw new WorkerException(e.getLocalizedMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
new file mode 100644
index 0000000..22cf4db
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
@@ -0,0 +1,50 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import java.io.Serializable;
+
+public class ActivityInfo implements Serializable{
+
+ private static final long serialVersionUID = 1L;
+
+ private EndpointReferenceType activityEPR;
+
+ private ActivityStatusType activityStatusDoc;
+
+
+ public EndpointReferenceType getActivityEPR() {
+ return activityEPR;
+ }
+ public void setActivityEPR(EndpointReferenceType activityEPR) {
+ this.activityEPR = activityEPR;
+ }
+ public ActivityStatusType getActivityStatus() {
+ return activityStatusDoc;
+ }
+ public void setActivityStatusDoc(ActivityStatusType activityStatusDoc) {
+ this.activityStatusDoc = activityStatusDoc;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
new file mode 100644
index 0000000..7fb442f
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
@@ -0,0 +1,221 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.parallelism.ApplicationParallelismType;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.FileNameType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.UserNameType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.NumberOfProcessesType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ProcessesPerHostType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ThreadsPerProcessType;
+
+import java.util.Iterator;
+import java.util.List;
+
+
+public class ApplicationProcessor {
+
+ public static void generateJobSpecificAppElements(JobDefinitionType value, ProcessContext context){
+
+ String userName = getUserNameFromContext(context);
+// if (userName.equalsIgnoreCase("admin")){
+// userName = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+// }
+
+ ApplicationDeploymentDescription appDep= context.getApplicationDeploymentDescription();
+ String appname = context.getApplicationInterfaceDescription().getApplicationName();
+ ApplicationParallelismType parallelism = appDep.getParallelism();
+ ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+ appType.setApplicationName(appname);
+
+
+// if (appDep.getSetEnvironment().size() > 0) {
+// createApplicationEnvironment(value, appDep.getSetEnvironment(), parallelism);
+// }
+//
+
+ String stdout = context.getStdoutLocation();
+ String stderr = context.getStderrLocation();
+ if(stdout != null) {
+ stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+ }
+
+ if(stderr != null) {
+ stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+ }
+
+ stdout = (stdout == null || stdout.equals("")) ? "stdout":stdout;
+ stderr = (stdout == null || stderr.equals("")) ? "stderr":stderr;
+
+ if (appDep.getExecutablePath() != null) {
+ FileNameType fNameType = FileNameType.Factory.newInstance();
+ fNameType.setStringValue(appDep.getExecutablePath());
+ if(isParallelJob(context)) {
+ JSDLUtils.getOrCreateSPMDApplication(value).setExecutable(fNameType);
+ if (parallelism.equals(ApplicationParallelismType.OPENMP_MPI)){
+ JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.OpenMPI.value());
+ }else if (parallelism.equals(ApplicationParallelismType.MPI)){
+ JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.MPI.value());
+ }
+
+ // setting number of processes
+ try {
+ String np = getInputAsString(context, BESConstants.NUMBER_OF_PROCESSES);
+ if((np != null) && (Integer.parseInt(np) > 0)){
+ NumberOfProcessesType num = NumberOfProcessesType.Factory.newInstance();
+ num.setStringValue(np);
+ JSDLUtils.getSPMDApplication(value).setNumberOfProcesses(num);
+ }
+
+ }catch(RuntimeException np) {
+ // do nothing
+ }
+
+
+ try {
+ // setting processes per host
+ String pphost = getInputAsString(context, BESConstants.PROCESSES_PER_HOST);
+ if((pphost != null) && (Integer.parseInt(pphost) > 0)){
+ ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance();
+ pph.setStringValue(String.valueOf(pphost));
+ JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph);
+ }
+ }catch(RuntimeException np) {
+ // do nothing
+ }
+
+ int totalThreadCount = context.getProcessModel().getProcessResourceSchedule().getNumberOfThreads();
+ // we take it as threads per processes
+ if(totalThreadCount > 0){
+ ThreadsPerProcessType tpp = ThreadsPerProcessType.Factory.newInstance();
+ tpp.setStringValue(String.valueOf(totalThreadCount));
+ JSDLUtils.getSPMDApplication(value).setThreadsPerProcess(tpp);
+ }
+
+ if(userName != null) {
+ UserNameType userNameType = UserNameType.Factory.newInstance();
+ userNameType.setStringValue(userName);
+ JSDLUtils.getSPMDApplication(value).setUserName(userNameType);
+ }
+ if (stdout != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stdout);
+ JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName);
+ }
+ if (stderr != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stderr);
+ JSDLUtils.getOrCreateSPMDApplication(value).setError(fName);
+ }
+
+
+ }
+ else {
+ JSDLUtils.getOrCreatePOSIXApplication(value).setExecutable(fNameType);
+ if(userName != null) {
+ UserNameType userNameType = UserNameType.Factory.newInstance();
+ userNameType.setStringValue(userName);
+ JSDLUtils.getOrCreatePOSIXApplication(value).setUserName(userNameType);
+ }
+ if (stdout != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stdout);
+ JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName);
+ }
+ if (stderr != null){
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stderr);
+ JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName);
+ }
+ }
+ }
+ }
+
+ public static String getUserNameFromContext(ProcessContext jobContext) {
+ if(jobContext.getProcessModel() == null)
+ return null;
+ //TODO: Extend unicore model to specify optional unix user id (allocation account)
+ return "admin";
+ }
+
+ public static void addApplicationArgument(JobDefinitionType value, ProcessContext context, String stringPrm) {
+ if(isParallelJob(context)){
+ JSDLUtils.getOrCreateSPMDApplication(value).addNewArgument().setStringValue(stringPrm);
+ }
+ else {
+ JSDLUtils.getOrCreatePOSIXApplication(value).addNewArgument().setStringValue(stringPrm);
+ }
+ }
+
+ public static String getApplicationStdOut(JobDefinitionType value, ProcessContext context) throws RuntimeException {
+ if (isParallelJob(context)) return JSDLUtils.getOrCreateSPMDApplication(value).getOutput().getStringValue();
+ else return JSDLUtils.getOrCreatePOSIXApplication(value).getOutput().getStringValue();
+ }
+
+ public static String getApplicationStdErr(JobDefinitionType value, ProcessContext context) throws RuntimeException {
+ if (isParallelJob(context)) return JSDLUtils.getOrCreateSPMDApplication(value).getError().getStringValue();
+ else return JSDLUtils.getOrCreatePOSIXApplication(value).getError().getStringValue();
+ }
+
+ public static void createGenericApplication(JobDefinitionType value, String appName) {
+ ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+ appType.setApplicationName(appName);
+ }
+
+ public static boolean isParallelJob(ProcessContext context) {
+
+ ApplicationDeploymentDescription appDep = context.getApplicationDeploymentDescription();
+ ApplicationParallelismType parallelism = appDep.getParallelism();
+
+ boolean isParallel = false;
+
+ if(parallelism.equals(ApplicationParallelismType.MPI) ||
+ parallelism.equals(ApplicationParallelismType.OPENMP_MPI) ||
+ parallelism.equals(ApplicationParallelismType.OPENMP )) {
+ isParallel = true;
+ }
+
+ return isParallel;
+ }
+
+ private static String getInputAsString(ProcessContext context, String name) {
+ List<InputDataObjectType> inputList = context.getProcessModel().getProcessInputs();
+ String value = null;
+ for (Iterator<InputDataObjectType> iterator = inputList.iterator(); iterator.hasNext();) {
+ InputDataObjectType inputDataObjectType = iterator
+ .next();
+ if (inputDataObjectType.getName().equals(name)) {
+ value = inputDataObjectType.getValue();
+ break;
+ }
+ }
+ return value;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
new file mode 100644
index 0000000..5f3991e
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
@@ -0,0 +1,45 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+public interface BESConstants {
+
+ public static final String PROP_SMS_EPR = "unicore.sms.epr";
+
+ public static final String PROP_BES_URL = "bes.factory.url";
+
+ public static final String PROP_ACTIVITY_INFO = "bes.activity.info";
+
+ public static final String PROP_CLIENT_CONF = "bes.client.config";
+
+ public static final String PROP_CA_CERT_PATH = "bes.ca.cert.path";
+
+ public static final String PROP_CA_KEY_PATH = "bes.ca.key.path";
+
+ public static final String PROP_CA_KEY_PASS = "bes.ca.key.pass";
+
+ public static final String NUMBER_OF_PROCESSES = "NumberOfProcesses";
+
+ public static final String PROCESSES_PER_HOST = "ProcessesPerHost";
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
new file mode 100644
index 0000000..736d982
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
@@ -0,0 +1,328 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Data movement utility class for transferring files before and after the job execution phase.
+ *
+ * */
+public class DataTransferrer {
+
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ protected ProcessContext processContext;
+
+ protected StorageClient storageClient;
+
+ protected List<OutputDataObjectType> resultantOutputsLst;
+
+ protected String gatewayDownloadLocation, stdoutLocation, stderrLocation;
+
+ public DataTransferrer(ProcessContext processContext, StorageClient storageClient) {
+ this.processContext = processContext;
+ this.storageClient = storageClient;
+ resultantOutputsLst = new ArrayList<OutputDataObjectType>();
+ initStdoutsLocation();
+ }
+
+ private void initStdoutsLocation() {
+
+ gatewayDownloadLocation = getDownloadLocation();
+
+ String stdout = processContext.getStdoutLocation();
+ String stderr = processContext.getStderrLocation();
+
+ if(stdout != null) {
+ stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+ }
+
+ if(stderr != null) {
+ stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+ }
+
+ String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+ : stdout;
+ String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
+ : stderr;
+
+ stdoutLocation = gatewayDownloadLocation+File.separator+stdoutFileName;
+
+ stderrLocation = gatewayDownloadLocation+File.separator+stderrFileName;
+
+ List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+ if (processOutputs != null && !processOutputs.isEmpty()){
+ for (OutputDataObjectType processOutput : processOutputs){
+ if (processOutput.getType().equals(DataType.STDOUT)){
+ processOutput.setValue(stdoutLocation);
+ }
+ if (processOutput.getType().equals(DataType.STDERR)){
+ processOutput.setValue(stderrLocation);
+ }
+
+ }
+ }
+ }
+
+ public void uploadLocalFiles() throws WorkerException {
+ List<String> inFilePrms = new ArrayList<>();
+ // FIXME - remove hard coded file path.
+ inFilePrms.addAll(extractInFileParams());
+// inFilePrms.add("file://home/airavata/test/hpcinput-localhost-uslims3_cauma3d-00950.tar");
+ for (String uri : inFilePrms) {
+ String fileName = new File(uri).getName();
+ if (uri.startsWith("file")) {
+ try {
+ String uriWithoutProtocol = uri.substring(uri.lastIndexOf("://") + 2, uri.length());
+ FileUploader fileUploader = new FileUploader(uriWithoutProtocol, fileName, Mode.overwrite, false);
+ log.info("Uploading file {}", fileName);
+ fileUploader.perform(storageClient);
+ } catch (FileNotFoundException e3) {
+ throw new WorkerException(
+ "Error while staging-in, local file "+fileName+" not found", e3);
+ } catch (Exception e) {
+ throw new WorkerException("Cannot upload files", e);
+
+ }
+
+ }
+ }
+ }
+
+ public List<String> extractInFileParams() {
+ List<String> filePrmsList = new ArrayList<String>();
+ List<InputDataObjectType> applicationInputs = processContext.getProcessModel().getProcessInputs();
+ if (applicationInputs != null && !applicationInputs.isEmpty()){
+ for (InputDataObjectType output : applicationInputs){
+ if(output.getType().equals(DataType.URI)) {
+ filePrmsList.add(output.getValue());
+ }
+ }
+ }
+ return filePrmsList;
+ }
+
+ public void setStorageClient(StorageClient sc){
+ storageClient = sc;
+ }
+
+ public void downloadStdOuts() throws WorkerException{
+
+ String stdoutFileName = new File(stdoutLocation).getName();
+
+ String stderrFileName = new File(stderrLocation).getName();
+
+ FileDownloader f1 = null;
+ log.info("Downloading stdout and stderr..");
+ log.info(stdoutFileName + " -> " + stdoutLocation);
+
+ f1 = new FileDownloader(stdoutFileName, stdoutLocation, Mode.overwrite);
+ try {
+ f1.perform(storageClient);
+// String stdoutput = readFile(stdoutLocation);
+ } catch (Exception e) {
+ log.error("Error while downloading " + stdoutFileName + " to location " + stdoutLocation, e);
+ }
+
+ log.info(stderrFileName + " -> " + stderrLocation);
+ f1.setFrom(stderrFileName);
+ f1.setTo(stderrLocation);
+ try {
+ f1.perform(storageClient);
+// String stderror = readFile(stderrLocation);
+ } catch (Exception e) {
+ log.error("Error while downloading " + stderrFileName + " to location " + stderrLocation);
+ }
+ String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
+ String scriptCodeLocation = gatewayDownloadLocation + File.separator + scriptExitCodeFName;
+ if (UASDataStagingProcessor.isUnicoreEndpoint(processContext)) {
+ f1.setFrom(scriptExitCodeFName);
+ f1.setTo(scriptCodeLocation);
+ try {
+ f1.perform(storageClient);
+ OutputDataObjectType output = new OutputDataObjectType();
+ output.setName(scriptExitCodeFName);
+ output.setValue(scriptCodeLocation);
+ output.setType(DataType.URI);
+ output.setIsRequired(true);
+ processContext.getProcessModel().getProcessOutputs().add(output);
+ log.info("UNICORE_SCRIPT_EXIT_CODE -> " + scriptCodeLocation);
+ log.info("EXIT CODE: " + readFile(scriptCodeLocation));
+ } catch (Exception e) {
+ log.error("Error downloading file " + scriptExitCodeFName + " to location " + scriptCodeLocation, e);
+ }
+ }
+ }
+
+ private String readFile(String localFile) throws IOException {
+ BufferedReader instream = new BufferedReader(new FileReader(localFile));
+ StringBuffer buff = new StringBuffer();
+ String temp = null;
+ while ((temp = instream.readLine()) != null) {
+ buff.append(temp);
+ buff.append(Constants.NEWLINE);
+ }
+
+ log.info("finish read file:" + localFile);
+
+ return buff.toString();
+ }
+
+ private String getDownloadLocation() {
+ ProcessModel processModel = processContext.getProcessModel();
+ String outputDataDir = "";
+
+ if (processContext.getOutputDir() != null ) {
+
+ outputDataDir = processContext.getOutputDir();
+
+
+ if ("".equals(outputDataDir)) {
+ outputDataDir = getTempPath();
+ }
+
+ else {
+
+ // in case of remote locations use the tmp location
+ if (outputDataDir.startsWith("scp:") ||
+ outputDataDir.startsWith("ftp:") ||
+ outputDataDir.startsWith("gsiftp:")) {
+ outputDataDir = getTempPath();
+ } else if ( outputDataDir.startsWith("file:") &&
+ outputDataDir.contains("@")){
+ outputDataDir = getTempPath();
+
+ } else {
+ try {
+ URI u = new URI(outputDataDir);
+ outputDataDir = u.getPath();
+ } catch (URISyntaxException e) {
+ outputDataDir = getTempPath();
+ }
+ }
+ }
+ }
+
+ File file = new File(outputDataDir);
+ if(!file.exists()){
+ file.mkdirs();
+ }
+
+
+ return outputDataDir;
+ }
+
+ private String getTempPath() {
+ String tmpOutputDir = File.separator + "tmp" + File.separator
+ + processContext.getProcessId();
+ (new File(tmpOutputDir)).mkdirs();
+ return tmpOutputDir;
+ }
+
+ public List<OutputDataObjectType> downloadRemoteFiles() throws WorkerException {
+
+ if(log.isDebugEnabled()) {
+ log.debug("Download location is:" + gatewayDownloadLocation);
+ }
+
+ List<OutputDataObjectType> applicationOutputs = processContext.getProcessModel().getProcessOutputs();
+ if (applicationOutputs != null && !applicationOutputs.isEmpty()){
+ for (OutputDataObjectType output : applicationOutputs){
+ if("".equals(output.getValue()) || output.getValue() == null) {
+ continue;
+ }
+ if(output.getType().equals(DataType.STDOUT)) {
+ output.setValue(stdoutLocation);
+ resultantOutputsLst.add(output);
+ } else if(output.getType().equals(DataType.STDERR)) {
+ output.setValue(stderrLocation);
+ resultantOutputsLst.add(output);
+ } else if (output.getType().equals(DataType.URI)) {
+ String value = null;
+ if (!output.getLocation().isEmpty()) {
+ value = output.getLocation() + File.separator + output.getValue();
+ } else {
+ value = output.getValue();
+ }
+ String outputPath = gatewayDownloadLocation + File.separator + output.getValue();
+ File f = new File(gatewayDownloadLocation);
+ if (!f.exists())
+ f.mkdirs();
+
+ FileDownloader fileDownloader = new FileDownloader(value, outputPath, Mode.overwrite);
+ try {
+ log.info("Downloading file {}", value);
+ fileDownloader.perform(storageClient);
+ output.setType(DataType.URI);
+ output.setValue(outputPath);
+ resultantOutputsLst.add(output);
+ } catch (Exception e) {
+ log.error("Error downloading " + value + " from job working directory. ");
+// throw new WorkerException(e.getLocalizedMessage(),e);
+ }
+ } else {
+ log.info("Ignore output file {}, type {}", output.getValue(), output.getType().toString());
+ }
+
+ }
+
+ }
+
+ downloadStdOuts();
+ return resultantOutputsLst;
+
+ }
+
+ public void publishFinalOutputs() throws WorkerException {
+ try {
+ if(!resultantOutputsLst.isEmpty()) {
+ log.debug("Publishing the list of outputs to the registry instance..");
+ ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+ experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, processContext.getExperimentId());
+ }
+ } catch (RegistryException e) {
+ throw new WorkerException("Cannot publish outputs to the registry.");
+ }
+
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
new file mode 100644
index 0000000..937b1c1
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import de.fzj.unicore.uas.fts.FiletransferOptions.SupportsPartialRead;
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * helper that exports remote files from a UNICORE Storage
+ * to the local client machine.<br/>
+ * Simple wildcards ("*" and "?") and download of
+ * directories are supported.
+ *
+ * TODO this should be refactored so the single-file download logic
+ * is separated from the wildcard/directory/provided outputStream logic
+ *
+ * @author schuller
+ */
+public class FileDownloader extends FileTransferBase{
+
+ private boolean showProgress=true;
+
+ private boolean forceFileOnly=false;
+
+ private OutputStream targetStream=null;
+
+ public FileDownloader(String from, String to, Mode mode){
+ this(from,to,mode,true);
+ }
+
+ public FileDownloader(String from, String to, Mode mode, boolean failOnError){
+ this.to=to;
+ this.from=from;
+ this.mode=mode;
+ this.failOnError=failOnError;
+ }
+
+ public void perform(StorageClient sms)throws Exception{
+ boolean isWildcard=hasWildCards(from);
+ boolean isDirectory=false;
+ GridFileType gridSource=null;
+ if(isWildcard){
+ performWildCardExport(sms);
+ }
+ else {
+ //check if source is a directory
+ gridSource=sms.listProperties(from);
+ isDirectory=gridSource.getIsDirectory();
+ if(isDirectory){
+ if(forceFileOnly){
+ throw new IOException("Source is a directory");
+ }
+ performDirectoryExport(gridSource, new File(to), sms);
+ }
+ else{
+ download(gridSource,new File(to),sms);
+ }
+ }
+ }
+
+ protected void performDirectoryExport(GridFileType directory, File targetDirectory, StorageClient sms)throws Exception{
+ if(!targetDirectory.exists()|| !targetDirectory.canWrite()){
+ throw new IOException("Target directory <"+to+"> does not exist or is not writable!");
+ }
+ if(!targetDirectory.isDirectory()){
+ throw new IOException("Target <"+to+"> is not a directory!");
+ }
+ GridFileType[]gridFiles=sms.listDirectory(directory.getPath());
+ for(GridFileType file: gridFiles){
+ if(file.getIsDirectory()){
+ if(!recurse) {
+ System.out.println("Skipping directory "+file.getPath());
+ continue;
+ }
+ else{
+ File newTargetDirectory=new File(targetDirectory,getName(file.getPath()));
+ boolean success=newTargetDirectory.mkdirs();
+ if(!success)throw new IOException("Can create directory: "+newTargetDirectory.getAbsolutePath());
+ performDirectoryExport(file, newTargetDirectory, sms);
+ continue;
+ }
+ }
+ download(file, new File(targetDirectory,getName(file.getPath())), sms);
+ }
+ }
+
+ protected void performWildCardExport(StorageClient sms)throws Exception{
+ String dir=getDir(from);
+ if(dir==null)dir="/";
+ GridFileType[] files=sms.find(dir, false, from, false, null, null);
+ File targetDir=targetStream==null?new File(to):null;
+ if(targetStream==null){
+ if(!targetDir.isDirectory())throw new IOException("Target is not a directory.");
+ }
+ for(GridFileType f: files){
+ download(f, targetDir, sms);
+ }
+ }
+
+ private String getDir(String path){
+ return new File(path).getParent();
+ }
+
+ private String getName(String path){
+ return new File(path).getName();
+ }
+
+ /**
+ * download a single regular file
+ *
+ * @param source - grid file descriptor
+ * @param localFile - local file or directory to write to
+ * @param sms
+ * @throws Exception
+ */
+ private void download(GridFileType source, File localFile, StorageClient sms)throws Exception{
+ if(source==null || source.getIsDirectory()){
+ throw new IllegalStateException("Source="+source);
+ }
+
+ OutputStream os=targetStream!=null?targetStream:null;
+ FileTransferClient ftc=null;
+ try{
+ String path=source.getPath();
+ if(targetStream==null){
+ if(localFile.isDirectory()){
+ localFile=new File(localFile,getName(source.getPath()));
+ }
+ if(mode.equals(Mode.nooverwrite) && localFile.exists()){
+ System.out.println("File exists and creation mode was set to 'nooverwrite'.");
+ return;
+ }
+ System.out.println("Downloading remote file '"+sms.getUrl()+"#/"+path+"' -> "+localFile.getAbsolutePath());
+ os=new FileOutputStream(localFile.getAbsolutePath(), mode.equals(Mode.append));
+ }
+
+ chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
+ Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+ ftc=sms.getExport(path,extraParameters,chosenProtocol);
+ configure(ftc, extraParameters);
+ System.out.println("DEB:File transfer URL : "+ftc.getUrl());
+// ProgressBar p=null;
+ if(ftc instanceof IMonitorable && showProgress){
+ long size=ftc.getSourceFileSize();
+ if(isRange()){
+ size=getRangeSize();
+ }
+// p=new ProgressBar(localFile.getName(),size,msg);
+// ((IMonitorable) ftc).setProgressListener(p);
+ }
+ long startTime=System.currentTimeMillis();
+ if(isRange()){
+ if(!(ftc instanceof SupportsPartialRead)){
+ throw new Exception("Byte range is defined but protocol does not allow " +
+ "partial read! Please choose a different protocol!");
+ }
+ System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
+ SupportsPartialRead pReader=(SupportsPartialRead)ftc;
+ pReader.readPartial(startByte, endByte-startByte+1, os);
+ }
+ else{
+ ftc.readAllData(os);
+ }
+// if(p!=null){
+// p.finish();
+// }
+ if(timing){
+ long duration=System.currentTimeMillis()-startTime;
+ double rate=(double)localFile.length()/(double)duration;
+ System.out.println("Rate: " +rate+ " kB/sec.");
+ }
+ if(targetStream==null)copyProperties(source, localFile);
+ }
+ finally{
+ try{
+ if(targetStream==null && os!=null){
+ os.close();
+ }
+ }catch(Exception ignored){}
+ if(ftc!=null){
+ try{
+ ftc.destroy();
+ }catch(Exception e1){
+// System.out.println("Could not destroy the filetransfer client",e1);
+ }
+ }
+ }
+ }
+
+ /**
+ * if possible, copy the remote executable flag to the local file
+ * @throws Exception
+ */
+ private void copyProperties(GridFileType source, File localFile)throws Exception{
+ try{
+ localFile.setExecutable(source.getPermissions().getExecutable());
+ }
+ catch(Exception ex){
+ //TODO: logging
+// ("Can't set 'executable' flag for "+localFile.getName(), ex);
+ }
+ }
+
+ private void configure(FileTransferClient ftc, Map<String,String>params){
+ if(ftc instanceof UFTPFileTransferClient){
+ UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+ String secret=params.get(UFTPConstants.PARAM_SECRET);
+ u.setSecret(secret);
+ }
+ }
+
+ public void setShowProgress(boolean showProgress) {
+ this.showProgress = showProgress;
+ }
+
+ public void setForceFileOnly(boolean forceFileOnly) {
+ this.forceFileOnly = forceFileOnly;
+ }
+
+ public void setTargetStream(OutputStream targetStream) {
+ this.targetStream = targetStream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
new file mode 100644
index 0000000..c76ab74
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
@@ -0,0 +1,223 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.util.PropertyHelper;
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.*;
+import java.util.regex.Pattern;
+
+public class FileTransferBase {
+
+ protected Properties extraParameterSource;
+
+ protected boolean timing=false;
+
+ protected boolean recurse=false;
+
+ protected String from;
+
+ protected String to;
+
+ //index of first byte to download
+ protected Long startByte;
+
+ //index of last byte to download
+ protected Long endByte;
+
+ /**
+ * the creation mode
+ */
+ protected Mode mode;
+
+ /**
+ * whether the job processing should fail if an error occurs
+ */
+ protected boolean failOnError;
+
+ protected List<ProtocolType.Enum> preferredProtocols=new ArrayList<ProtocolType.Enum>();
+
+ public FileTransferBase(){
+ preferredProtocols.add(ProtocolType.BFT);
+ }
+
+ protected Map<String,String>makeExtraParameters(ProtocolType.Enum protocol){
+ Map<String, String> res;
+ if(extraParameterSource==null){
+ res=new HashMap<String, String>();
+ }
+ else{
+ String p=String.valueOf(protocol);
+ PropertyHelper ph=new PropertyHelper(extraParameterSource, new String[]{p,p.toLowerCase()});
+ res= ph.getFilteredMap();
+ }
+ if(res.size()>0){
+ // TODO: change it to logger
+ System.out.println("Have "+res.size()+" extra parameters for protocol "+protocol);
+ }
+ return res;
+ }
+
+
+ public String getTo() {
+ return to;
+ }
+
+ public String getFrom() {
+ return from;
+ }
+
+ public void setTo(String to) {
+ this.to = to;
+ }
+
+ public void setFrom(String from) {
+ this.from = from;
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
+
+ public boolean isFailOnError() {
+ return failOnError;
+ }
+
+ public boolean isTiming() {
+ return timing;
+ }
+
+ public void setTiming(boolean timing) {
+ this.timing = timing;
+ }
+
+ public void setFailOnError(boolean failOnError) {
+ this.failOnError = failOnError;
+ }
+
+ public List<ProtocolType.Enum> getPreferredProtocols() {
+ return preferredProtocols;
+ }
+
+ public void setPreferredProtocols(List<ProtocolType.Enum> preferredProtocols) {
+ this.preferredProtocols = preferredProtocols;
+ }
+
+ public void setExtraParameterSource(Properties properties){
+ this.extraParameterSource=properties;
+ }
+
+ public void setRecurse(boolean recurse) {
+ this.recurse = recurse;
+ }
+ /**
+ * check if the given path denotes a valid remote directory
+ * @param remotePath - the path
+ * @param sms - the storage
+ * @return <code>true</code> if the remote directory exists and is a directory
+ */
+ protected boolean isValidDirectory(String remotePath, StorageClient sms){
+ boolean result=false;
+ if(! ("/".equals(remotePath) || ".".equals(remotePath)) ){
+ try{
+ GridFileType gft=sms.listProperties(remotePath);
+ result=gft.getIsDirectory();
+ }catch(Exception ex){
+ result=false;
+ }
+ }
+ else result=true;
+
+ return result;
+ }
+
+ public File[] resolveWildCards(File original){
+ final String name=original.getName();
+ if(!hasWildCards(original))return new File[]{original};
+ File parent=original.getParentFile();
+ if(parent==null)parent=new File(".");
+ FilenameFilter filter=new FilenameFilter(){
+ Pattern p=createPattern(name);
+ public boolean accept(File file, String name){
+ return p.matcher(name).matches();
+ }
+ };
+ return parent.listFiles(filter);
+ }
+
+ protected boolean hasWildCards(File file){
+ return hasWildCards(file.getName());
+ }
+
+ public boolean hasWildCards(String name){
+ return name.contains("*") || name.contains("?");
+ }
+
+ private Pattern createPattern(String nameWithWildcards){
+ String regex=nameWithWildcards.replace("?",".").replace("*", ".*");
+ return Pattern.compile(regex);
+ }
+
+ protected ProtocolType.Enum chosenProtocol=null;
+
+ public ProtocolType.Enum getChosenProtocol(){
+ return chosenProtocol;
+ }
+
+ public Long getStartByte() {
+ return startByte;
+ }
+
+ public void setStartByte(Long startByte) {
+ this.startByte = startByte;
+ }
+
+ public Long getEndByte() {
+ return endByte;
+ }
+
+ public void setEndByte(Long endByte) {
+ this.endByte = endByte;
+ }
+
+ /**
+ * checks if a byte range is defined
+ * @return <code>true</code> iff both startByte and endByte are defined
+ */
+ protected boolean isRange(){
+ return startByte!=null && endByte!=null;
+ }
+
+ /**
+ * get the number of bytes in the byte range, or "-1" if the range is open-ended
+ * @return
+ */
+ protected long getRangeSize(){
+ if(Long.MAX_VALUE==endByte)return -1;
+ return endByte-startByte;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
new file mode 100644
index 0000000..d899b37
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
@@ -0,0 +1,242 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * upload local file(s) to a remote location
+ *
+ * @author schuller
+ */
+public class FileUploader extends FileTransferBase{
+
+ public FileUploader(String from, String to, Mode mode)throws FileNotFoundException{
+ this(from,to,mode,true);
+ }
+
+ public FileUploader(String from, String to, Mode mode, boolean failOnError)throws FileNotFoundException{
+ this.to=to;
+ this.from=from;
+ this.mode=mode;
+ this.failOnError=failOnError;
+ checkOK();
+ }
+
+ public String getFrom() {
+ return from;
+ }
+
+ public String getTo() {
+ return to;
+ }
+
+
+ public void perform(StorageClient sms)throws Exception{
+ File fileSpec=new File(from);
+ boolean hasWildCards=false;
+ boolean isDirectory=fileSpec.isDirectory();
+ File[] fileset=null;
+
+ if(!isDirectory){
+ hasWildCards=hasWildCards(fileSpec);
+ }
+
+ chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
+ Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+
+ if(!hasWildCards && !isDirectory){
+ //single regular file
+ uploadFile(fileSpec,to,sms,chosenProtocol,extraParameters);
+ return;
+ }
+
+ //handle wildcards or directory
+ if(hasWildCards){
+ fileset=resolveWildCards(fileSpec);
+ }
+ else{
+ fileset=fileSpec.listFiles();
+ }
+
+ if(!isValidDirectory(to, sms)){
+ throw new IOException("The specified remote target '"+to+"' is not a directory");
+ }
+ if(to==null)to="/";
+ String target=isDirectory?to+"/"+fileSpec.getName():to;
+ sms.createDirectory(target);
+ uploadFiles(fileset,target,sms,chosenProtocol,extraParameters);
+ }
+
+ /**
+ * upload a set of files to a remote directory (which must exist)
+ *
+ * @param files
+ * @param remoteDirectory
+ * @param sms
+ * @param protocol
+ * @param extraParameters
+ * @throws Exception
+ */
+ private void uploadFiles(File[]files, String remoteDirectory, StorageClient sms, ProtocolType.Enum protocol,
+ Map<String,String>extraParameters)throws Exception{
+ for(File localFile: files){
+ String target=remoteDirectory+"/"+localFile.getName();
+ if(localFile.isDirectory()){
+ if(!recurse){
+ System.out.println("Skipping directory "+localFile.getAbsolutePath());
+ }else{
+ File[] fileset=localFile.listFiles();
+ sms.createDirectory(target);
+ uploadFiles(fileset,target,sms,protocol,extraParameters);
+ }
+ }else{
+ uploadFile(localFile,target,sms,protocol,extraParameters);
+ }
+ }
+ }
+
+ /**
+ * uploads a single regular file
+ *
+ * @param localFile
+ * @param remotePath
+ * @param sms
+ * @param protocol
+ * @param extraParameters
+ * @throws Exception
+ */
+ private void uploadFile(File localFile, String remotePath, StorageClient sms, ProtocolType.Enum protocol,
+ Map<String,String>extraParameters) throws Exception{
+ long startTime=System.currentTimeMillis();
+ FileInputStream is=null;
+ FileTransferClient ftc=null;
+ try{
+ if(remotePath==null){
+ remotePath="/"+localFile.getName();
+ }
+ else if(remotePath.endsWith("/")){
+ remotePath+=localFile.getName();
+ }
+ System.out.println("Uploading local file '"+localFile.getAbsolutePath()+"' -> '"+sms.getUrl()+"#"+remotePath+"'");
+ is=new FileInputStream(localFile.getAbsolutePath());
+ boolean append=Mode.append.equals(mode);
+ ftc=sms.getImport(remotePath, append, extraParameters, protocol);
+ configure(ftc, extraParameters);
+ if(append)ftc.setAppend(true);
+ String url=ftc.getUrl();
+ System.out.println("File transfer URL : "+url);
+// ProgressBar p=null;
+ if(ftc instanceof IMonitorable){
+ long size=localFile.length();
+ if(isRange()){
+ size=getRangeSize();
+ }
+// p=new ProgressBar(localFile.getName(),size,msg);
+// ((IMonitorable) ftc).setProgressListener(p);
+ }
+ if(isRange()){
+ System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
+ long skipped=0;
+ while(skipped<startByte){
+ skipped+=is.skip(startByte);
+ }
+ ftc.writeAllData(is, endByte-startByte+1);
+
+ }else{
+ ftc.writeAllData(is);
+ }
+ copyProperties(localFile, sms, remotePath);
+
+// if(ftc instanceof IMonitorable){
+// p.finish();
+// }
+
+ }finally{
+ if(ftc!=null){
+ try{
+ ftc.destroy();
+ }catch(Exception e1){
+// msg.error("Could not clean-up the filetransfer at <"+ftc.getUrl()+">",e1);
+ }
+ }
+ try{ if(is!=null)is.close(); }catch(Exception ignored){}
+ }
+ if(timing){
+ long duration=System.currentTimeMillis()-startTime;
+ double rate=(double)localFile.length()/(double)duration;
+ System.out.println("Rate: "+rate+ " kB/sec.");
+ }
+ }
+
+ /**
+ * if possible, copy the local executable flag to the remote file
+ * @param sourceFile - local file
+ * @throws Exception
+ */
+ private void copyProperties(File sourceFile, StorageClient sms, String target)throws Exception{
+ boolean x=sourceFile.canExecute();
+ try{
+ if(x){
+ sms.changePermissions(target, true, true, x);
+ }
+ }catch(Exception ex){
+// System.out.println("Can't set exectuable flag on remote file.",ex);
+ }
+ }
+
+ private void checkOK()throws FileNotFoundException{
+ if(!failOnError){
+ return;
+ }
+ File orig=new File(from);
+ if(!orig.isAbsolute()){
+ orig=new File(System.getProperty("user.dir"),from);
+ }
+ File[] files=resolveWildCards(orig);
+ if(files==null){
+ throw new FileNotFoundException("Local import '"+from+"' does not exist.");
+ }
+ for(File f: files){
+ if(!f.exists())throw new FileNotFoundException("Local import '"+from+"' does not exist.");
+ }
+ }
+
+ private void configure(FileTransferClient ftc, Map<String,String>params){
+ if(ftc instanceof UFTPFileTransferClient){
+ UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+ String secret=params.get(UFTPConstants.PARAM_SECRET);
+ u.setSecret(secret);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
new file mode 100644
index 0000000..de96104
--- /dev/null
+++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ * Utility class generates a JSDL instance from JobExecutionContext instance
+ *
+ * @author shahbaz memon
+ *
+ * */
+
+public class JSDLGenerator implements BESConstants {
+
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ public synchronized static JobDefinitionDocument buildJSDLInstance(ProcessContext context) throws Exception {
+
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+ .newInstance();
+ JobDefinitionType value = jobDefDoc.addNewJobDefinition();
+
+
+ // build Identification
+ createJobIdentification(value, context);
+
+ ResourceProcessor.generateResourceElements(value, context);
+
+ ApplicationProcessor.generateJobSpecificAppElements(value, context);
+
+
+ return jobDefDoc;
+ }
+
+ public synchronized static JobDefinitionDocument buildJSDLInstance(ProcessContext context, String smsUrl) throws Exception {
+
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+ .newInstance();
+ JobDefinitionType value = jobDefDoc.addNewJobDefinition();
+
+
+ // build Identification
+ createJobIdentification(value, context);
+
+ ResourceProcessor.generateResourceElements(value, context);
+
+ ApplicationProcessor.generateJobSpecificAppElements(value, context);
+
+ UASDataStagingProcessor.generateDataStagingElements(value, context, smsUrl);
+
+ return jobDefDoc;
+ }
+
+ public synchronized static JobDefinitionDocument buildJSDLInstance(
+ ProcessContext context, String smsUrl, Object jobDirectoryMode)
+ throws Exception {
+
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+ .newInstance();
+ JobDefinitionType value = jobDefDoc.addNewJobDefinition();
+
+ // build Identification
+ createJobIdentification(value, context);
+
+ ResourceProcessor.generateResourceElements(value, context);
+
+ ApplicationProcessor.generateJobSpecificAppElements(value, context);
+
+ UASDataStagingProcessor.generateDataStagingElements(value, context,
+ smsUrl);
+
+ return jobDefDoc;
+ }
+
+ private static void createJobIdentification(JobDefinitionType value, ProcessContext context) {
+
+ if (context != null) {
+ if (context.getAllocationProjectNumber() != null)
+ JSDLUtils.addProjectName(value, context.getAllocationProjectNumber());
+
+ if (context.getApplicationInterfaceDescription() != null && context.getApplicationInterfaceDescription().getApplicationDescription() != null)
+ JSDLUtils.getOrCreateJobIdentification(value).setDescription(context.getApplicationInterfaceDescription().getApplicationDescription());
+
+ if (context.getApplicationInterfaceDescription() != null && context.getApplicationInterfaceDescription().getApplicationName() != null)
+ JSDLUtils.getOrCreateJobIdentification(value).setJobName(context.getApplicationInterfaceDescription().getApplicationName());
+ }
+ }
+
+
+}
\ No newline at end of file