You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 16:50:01 UTC
[4/4] git commit: moving gfac-bes classes to new package.
moving gfac-bes classes to new package.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/49b6987f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/49b6987f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/49b6987f
Branch: refs/heads/master
Commit: 49b6987f36dcde718b07fb22cc8055a7178f5035
Parents: 9bb8c2b
Author: lahiru <la...@apache.org>
Authored: Mon May 5 10:49:13 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon May 5 10:49:13 2014 -0400
----------------------------------------------------------------------
.../airavata/gfac/bes/impl/BESProvider.java | 568 +++++++++++++++++++
.../gfac/bes/security/GSISecurityContext.java | 288 ++++++++++
.../gfac/bes/utils/ApplicationProcessor.java | 252 ++++++++
.../gfac/bes/utils/DataStagingProcessor.java | 236 ++++++++
.../gfac/bes/utils/DataTransferrer.java | 241 ++++++++
.../airavata/gfac/bes/utils/FileDownloader.java | 256 +++++++++
.../gfac/bes/utils/FileTransferBase.java | 227 ++++++++
.../airavata/gfac/bes/utils/FileUploader.java | 245 ++++++++
.../airavata/gfac/bes/utils/JSDLGenerator.java | 103 ++++
.../airavata/gfac/bes/utils/JSDLUtils.java | 540 ++++++++++++++++++
.../apache/airavata/gfac/bes/utils/Mode.java | 45 ++
.../airavata/gfac/bes/utils/OSRequirement.java | 108 ++++
.../apache/airavata/gfac/bes/utils/OSType.java | 124 ++++
.../gfac/bes/utils/ProcessorRequirement.java | 61 ++
.../airavata/gfac/bes/utils/RangeValueType.java | 274 +++++++++
.../gfac/bes/utils/ResourceProcessor.java | 152 +++++
.../airavata/gfac/bes/utils/SPMDProcessor.java | 33 ++
.../airavata/gfac/bes/utils/SPMDVariations.java | 52 ++
.../airavata/gfac/bes/utils/StorageCreator.java | 211 +++++++
.../gfac/bes/utils/UASDataStagingProcessor.java | 225 ++++++++
.../airavata/gfac/bes/utils/URIUtils.java | 119 ++++
.../context/security/GSISecurityContext.java | 288 ----------
.../gfac/provider/impl/BESProvider.java | 568 -------------------
.../gfac/utils/ApplicationProcessor.java | 252 --------
.../gfac/utils/DataStagingProcessor.java | 235 --------
.../airavata/gfac/utils/DataTransferrer.java | 241 --------
.../airavata/gfac/utils/FileDownloader.java | 256 ---------
.../airavata/gfac/utils/FileTransferBase.java | 227 --------
.../airavata/gfac/utils/FileUploader.java | 245 --------
.../airavata/gfac/utils/JSDLGenerator.java | 105 ----
.../apache/airavata/gfac/utils/JSDLUtils.java | 540 ------------------
.../org/apache/airavata/gfac/utils/Mode.java | 45 --
.../airavata/gfac/utils/OSRequirement.java | 108 ----
.../org/apache/airavata/gfac/utils/OSType.java | 124 ----
.../gfac/utils/ProcessorRequirement.java | 61 --
.../airavata/gfac/utils/RangeValueType.java | 274 ---------
.../airavata/gfac/utils/ResourceProcessor.java | 152 -----
.../airavata/gfac/utils/SPMDProcessor.java | 33 --
.../airavata/gfac/utils/SPMDVariations.java | 52 --
.../airavata/gfac/utils/StorageCreator.java | 211 -------
.../gfac/utils/UASDataStagingProcessor.java | 225 --------
.../apache/airavata/gfac/utils/URIUtils.java | 119 ----
.../impl/JSDLGeneratorTestWithMyProxyAuth.java | 32 --
.../src/test/resources/gfac-config.xml | 2 +-
44 files changed, 4361 insertions(+), 4394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/49b6987f/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
new file mode 100644
index 0000000..c41632f
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
@@ -0,0 +1,568 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.bes.impl;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import javax.security.auth.x500.X500Principal;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.bes.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
+import org.apache.airavata.gfac.notification.events.UnicoreJobIDEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.bes.utils.DataTransferrer;
+import org.apache.airavata.gfac.bes.utils.JSDLGenerator;
+import org.apache.airavata.gfac.bes.utils.StorageCreator;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.apache.xmlbeans.XmlCursor;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration.Enum;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
+import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityResponseDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesResponseDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import de.fzj.unicore.bes.client.FactoryClient;
+import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+
+
+
+public class BESProvider extends AbstractProvider {
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private DefaultClientConfiguration secProperties;
+
+ private String jobId;
+
+
+
+ public void initialize(JobExecutionContext jobExecutionContext)
+ throws GFacProviderException, GFacException {
+ log.info("Initializing UNICORE Provider");
+ super.initialize(jobExecutionContext);
+ initSecurityProperties(jobExecutionContext);
+ log.debug("initialized security properties");
+ }
+
+
+ public void execute(JobExecutionContext jobExecutionContext)
+ throws GFacProviderException {
+ UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
+ .getType();
+
+ String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
+ eprt.addNewAddress().setStringValue(factoryUrl);
+
+ String userDN = getUserName(jobExecutionContext);
+
+ if (userDN == null || userDN.equalsIgnoreCase("admin")) {
+ userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+
+ String xlogin = getCNFromUserDN(userDN);
+ // create storage
+ StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, xlogin);
+
+ StorageClient sc = null;
+ try {
+ try {
+ sc = storageCreator.createStorage();
+ } catch (Exception e2) {
+ log.error("Cannot create storage..");
+ throw new GFacProviderException("Cannot create storage..", e2);
+ }
+
+ CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
+ JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory.newInstance();
+
+ JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition();
+ try {
+ jobDefinition = JSDLGenerator.buildJSDLInstance(jobExecutionContext, sc.getUrl()).getJobDefinition();
+ cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
+
+ log.info("JSDL" + jobDefDoc.toString());
+ } catch (Exception e1) {
+ throw new GFacProviderException("Cannot generate JSDL instance from the JobExecutionContext.", e1);
+ }
+
+ // upload files if any
+ DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
+ dt.uploadLocalFiles();
+
+ FactoryClient factory = null;
+ try {
+ factory = new FactoryClient(eprt, secProperties);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+
+ CreateActivityResponseDocument response = null;
+ try {
+ log.info(String.format("Activity Submitting to %s ... \n", factoryUrl));
+ response = factory.createActivity(cad);
+ log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+ } catch (Exception e) {
+ throw new GFacProviderException("Cannot create activity.", e);
+ }
+ EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+ log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+ // factory.waitWhileActivityIsDone(activityEpr, 1000);
+ jobId = WSUtilities.extractResourceID(activityEpr);
+ if (jobId == null) {
+ jobId = new Long(Calendar.getInstance().getTimeInMillis()).toString();
+ }
+ log.info("JobID: " + jobId);
+ jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
+ saveApplicationJob(jobExecutionContext, jobDefinition, activityEpr.toString());
+
+ factory.getActivityStatus(activityEpr);
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(),
+ factory.getActivityStatus(activityEpr).toString()));
+
+ // TODO publish the status messages to the message bus
+ while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
+ && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
+
+ ActivityStatusType activityStatus = null;
+ try {
+ activityStatus = getStatus(factory, activityEpr);
+ JobState jobStatus = getApplicationJobStatus(activityStatus);
+ String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
+ jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
+ details.setJobID(jobId);
+ GFacUtils.updateJobStatus(jobExecutionContext, details, jobStatus);
+ } catch (UnknownActivityIdentifierFault e) {
+ throw new GFacProviderException(e.getMessage(), e.getCause());
+ }catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e.getCause());
+ }
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+
+ ActivityStatusType activityStatus = null;
+ try {
+ activityStatus = getStatus(factory, activityEpr);
+ } catch (UnknownActivityIdentifierFault e) {
+ throw new GFacProviderException(e.getMessage(), e.getCause());
+ }
+
+ log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState()
+ .toString()));
+
+ if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+ String error = activityStatus.getFault().getFaultcode().getLocalPart() + "\n"
+ + activityStatus.getFault().getFaultstring() + "\n EXITCODE: " + activityStatus.getExitCode();
+ log.info(error);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ dt.downloadStdOuts();
+ } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+ String experimentID = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+ JobState jobStatus = JobState.CANCELED;
+ String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
+ jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
+ details.setJobID(jobId);
+ try {
+ GFacUtils.saveJobStatus(jobExecutionContext,details, jobStatus);
+ } catch (GFacException e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+ throw new GFacProviderException(experimentID + "Job Canceled");
+ }
+
+ else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ if (activityStatus.getExitCode() == 0) {
+ dt.downloadRemoteFiles();
+ } else {
+ dt.downloadStdOuts();
+ }
+ }
+
+ } catch (UnknownActivityIdentifierFault e1) {
+ throw new GFacProviderException(e1.getLocalizedMessage(), e1);
+ } finally {
+ // destroy sms instance
+ try {
+ if (sc != null) {
+ sc.destroy();
+ }
+ } catch (Exception e) {
+ log.warn("Cannot destroy temporary SMS instance:" + sc.getUrl(), e);
+ }
+ }
+ }
+
+ private JobState getApplicationJobStatus(ActivityStatusType activityStatus){
+ if (activityStatus == null) {
+ return JobState.UNKNOWN;
+ }
+ Enum state = activityStatus.getState();
+ String status = null;
+ XmlCursor acursor = activityStatus.newCursor();
+ try {
+ if (acursor.toFirstChild()) {
+ if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+ status = acursor.getName().getLocalPart();
+ }
+ }
+ if (status != null) {
+ if (status.equalsIgnoreCase("Queued") || status.equalsIgnoreCase("Starting")
+ || status.equalsIgnoreCase("Ready")) {
+ return JobState.QUEUED;
+ } else if (status.equalsIgnoreCase("Staging-In")) {
+ return JobState.SUBMITTED;
+ } else if (status.equalsIgnoreCase("Staging-Out") || status.equalsIgnoreCase("FINISHED")) {
+ return JobState.COMPLETE;
+ } else if (status.equalsIgnoreCase("Executing")) {
+ return JobState.ACTIVE;
+ } else if (status.equalsIgnoreCase("FAILED")) {
+ return JobState.FAILED;
+ } else if (status.equalsIgnoreCase("CANCELLED")) {
+ return JobState.CANCELED;
+ }
+ } else {
+ if (ActivityStateEnumeration.CANCELLED.equals(state)) {
+ return JobState.CANCELED;
+ } else if (ActivityStateEnumeration.FAILED.equals(state)) {
+ return JobState.FAILED;
+ } else if (ActivityStateEnumeration.FINISHED.equals(state)) {
+ return JobState.COMPLETE;
+ } else if (ActivityStateEnumeration.RUNNING.equals(state)) {
+ return JobState.ACTIVE;
+ }
+ }
+ } finally {
+ if (acursor != null)
+ acursor.dispose();
+ }
+ return JobState.UNKNOWN;
+ }
+
+ private void saveApplicationJob(JobExecutionContext jobExecutionContext, JobDefinitionType jobDefinition,
+ String metadata) {
+ ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+ appJob.setJobId(jobId);
+ appJob.setJobData(jobDefinition.toString());
+ appJob.setSubmittedTime(Calendar.getInstance().getTime());
+ appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+ appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+ appJob.setMetadata(metadata);
+ GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ secProperties = null;
+ }
+
+ /**
+ * EndpointReference need to be saved to make cancel work.
+ *
+ * @param activityEpr
+ * @param jobExecutionContext
+ * @throws GFacProviderException
+ */
+ public void cancelJob(String activityEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ try {
+ initSecurityProperties(jobExecutionContext);
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(activityEpr);
+ UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
+ .getType();
+
+ String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+ EndpointReferenceType epr = EndpointReferenceType.Factory.newInstance();
+ epr.addNewAddress().setStringValue(factoryUrl);
+
+ FactoryClient factory = new FactoryClient(epr, secProperties);
+ factory.terminateActivity(eprt);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+
+ }
+
+ protected void downloadOffline(String smsEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ try {
+ initSecurityProperties(jobExecutionContext);
+ EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(smsEpr);
+ StorageClient sms = new StorageClient(eprt, secProperties);
+ DataTransferrer dt = new DataTransferrer(jobExecutionContext, sms);
+ // there must be output files there
+ // this is also possible if client is re-connected, the jobs are
+ // still
+ // running and no output is produced
+ dt.downloadRemoteFiles();
+
+ // may be use the below method before downloading for checking
+ // the number of entries
+ // sms.listDirectory(".");
+
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
+ }
+
+ protected void initSecurityProperties(JobExecutionContext jobExecutionContext) throws GFacProviderException,
+ GFacException {
+
+ if (secProperties != null)
+ return;
+
+ GSISecurityContext gssContext = (GSISecurityContext) jobExecutionContext
+ .getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT);
+
+ try {
+ String certLocation = gssContext.getTrustedCertificatePath();
+ List<String> trustedCert = new ArrayList<String>();
+ trustedCert.add(certLocation + "/*.0");
+ trustedCert.add(certLocation + "/*.pem");
+
+ DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1,
+ 60000, null);
+
+ String userID = getUserName(jobExecutionContext);
+
+ if ( userID == null || "".equals(userID) || userID.equalsIgnoreCase("admin") ) {
+ userID = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+
+ String userDN = userID.replaceAll("^\"|\"$", "");
+
+ // TODO: should be changed to default airavata server locations
+ KeyAndCertCredential cred = generateShortLivedCertificate(userDN, certLocation
+ + "/cacert.pem", certLocation
+ + "/cakey.pem", "ultrascan3");
+ secProperties = new DefaultClientConfiguration(dcValidator, cred);
+
+ // secProperties.doSSLAuthn();
+ secProperties.getETDSettings().setExtendTrustDelegation(true);
+
+ secProperties.setDoSignMessage(true);
+
+ String[] outHandlers = secProperties.getOutHandlerClassNames();
+
+ Set<String> outHandlerLst = null;
+
+ // timeout in milliseconds
+ Properties p = secProperties.getExtraSettings();
+ p.setProperty("http.connection.timeout", "300000");
+ p.setProperty("http.socket.timeout", "300000");
+
+ if (outHandlers == null) {
+ outHandlerLst = new HashSet<String>();
+ } else {
+ outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
+ }
+
+ outHandlerLst.add("de.fzj.unicore.uas.security.ProxyCertOutHandler");
+
+ secProperties.setOutHandlerClassNames(outHandlerLst.toArray(new String[outHandlerLst.size()]));
+
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+
+ //FIXME: Get user details
+ private String getUserName(JobExecutionContext context) {
+// if (context.getConfigurationData()!= null) {
+// return context.getConfigurationData().getBasicMetadata().getUserName();
+// } else {
+ return "";
+// }
+ }
+
+ protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
+ throws UnknownActivityIdentifierFault {
+
+ GetActivityStatusesDocument stats = GetActivityStatusesDocument.Factory.newInstance();
+
+ stats.addNewGetActivityStatuses().setActivityIdentifierArray(new EndpointReferenceType[] { activityEpr });
+
+ GetActivityStatusesResponseDocument resDoc = fc.getActivityStatuses(stats);
+
+ ActivityStatusType activityStatus = resDoc.getGetActivityStatusesResponse().getResponseArray()[0]
+ .getActivityStatus();
+ return activityStatus;
+ }
+
+ protected String formatStatusMessage(String activityUrl, String status) {
+ return String.format("Activity %s is %s.\n", activityUrl, status);
+ }
+
+ protected String subStatusAsString(ActivityStatusType statusType) {
+
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(statusType.getState().toString());
+
+ XmlCursor acursor = statusType.newCursor();
+ if (acursor.toFirstChild()) {
+ do {
+ if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+ sb.append(":");
+ sb.append(acursor.getName().getLocalPart());
+ }
+ } while (acursor.toNextSibling());
+ acursor.dispose();
+ return sb.toString();
+ } else {
+ acursor.dispose();
+ return sb.toString();
+ }
+
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+
+ protected KeyAndCertCredential generateShortLivedCertificate(String userDN, String caCertPath, String caKeyPath,
+ String caPwd) throws Exception {
+ final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes
+ // ago
+
+ final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset;
+ final long endTime = startTime + 30 * 3600 * 1000;
+
+ String keyLengthProp = "1024";
+ int keyLength = Integer.parseInt(keyLengthProp);
+ String signatureAlgorithm = "SHA1withRSA";
+
+ KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
+
+ KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
+ kpg.initialize(keyLength);
+ KeyPair pair = kpg.generateKeyPair();
+
+ X500Principal subjectDN = new X500Principal(userDN);
+ Random rand = new Random();
+
+ SubjectPublicKeyInfo publicKeyInfo;
+ try {
+ publicKeyInfo = SubjectPublicKeyInfo.getInstance(new ASN1InputStream(pair.getPublic().getEncoded())
+ .readObject());
+ } catch (IOException e) {
+ throw new InvalidKeyException("Can not parse the public key"
+ + "being included in the short lived certificate", e);
+ }
+
+ X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
+
+ X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+ X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, new BigInteger(20, rand),
+ new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo);
+
+ AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
+
+ X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null);
+
+ certificate.checkValidity(new Date());
+ certificate.verify(caCred.getCertificate().getPublicKey());
+ KeyAndCertCredential result = new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
+ caCred.getCertificate() });
+
+ return result;
+ }
+
+ private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception {
+ InputStream isKey = new FileInputStream(caKeyPath);
+ PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, password.toCharArray());
+
+ InputStream isCert = new FileInputStream(caCertPath);
+ X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
+
+ if (isKey != null)
+ isKey.close();
+ if (isCert != null)
+ isCert.close();
+
+ return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+ }
+
+ private String getCNFromUserDN(String userDN) {
+ return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/49b6987f/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/GSISecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/GSISecurityContext.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/GSISecurityContext.java
new file mode 100644
index 0000000..22d2e13
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/security/GSISecurityContext.java
@@ -0,0 +1,288 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.bes.security;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.AbstractSecurityContext;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.globus.gsi.X509Credential;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.provider.GlobusProvider;
+import org.globus.myproxy.GetParams;
+import org.globus.myproxy.MyProxy;
+import org.globus.myproxy.MyProxyException;
+import org.gridforum.jgss.ExtendedGSSCredential;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+
+/**
+ * Handles GRID related security.
+ */
+public class GSISecurityContext extends AbstractSecurityContext {
+
+ protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class);
+ /*
+ * context name
+ */
+ public static final String GSI_SECURITY_CONTEXT = "gsi";
+
+ public static int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90;
+
+ private GSSCredential gssCredentials = null;
+
+ private Cluster pbsCluster = null;
+
+ // Set trusted cert path and add provider
+ static {
+ Security.addProvider(new GlobusProvider());
+ try {
+ setUpTrustedCertificatePath();
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getLocalizedMessage(), e);
+ }
+ }
+
+ public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
+
+ File file = new File(trustedCertificatePath);
+
+ if (!file.exists() || !file.canRead()) {
+ File f = new File(".");
+ log.info("Current directory " + f.getAbsolutePath());
+ throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
+ } else {
+ System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
+ }
+ }
+
+ private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
+
+ String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION);
+
+ setUpTrustedCertificatePath(trustedCertificatePath);
+ }
+
+ /**
+ * Gets the trusted certificate path. Trusted certificate path is stored in "X509_CERT_DIR"
+ * system property.
+ * @return The trusted certificate path as a string.
+ */
+ public static String getTrustedCertificatePath() {
+ return System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY);
+ }
+
+
+ public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) {
+ super(credentialReader, requestData);
+ }
+
+
+ public GSISecurityContext(Cluster pbsCluster) {
+ this.setPbsCluster(pbsCluster);
+ }
+
+ /**
+ * Gets GSSCredentials. The process is as follows;
+ * If credentials were queried for the first time create credentials.
+ * 1. Try creating credentials using certificates stored in the credential store
+ * 2. If 1 fails use user name and password to create credentials
+ * If credentials are already created check the remaining life time of the credential. If
+ * remaining life time is less than CREDENTIAL_RENEWING_THRESH_HOLD, then renew credentials.
+ * @return GSSCredentials to be used.
+ * @throws org.apache.airavata.gfac.GFacException If an error occurred while creating credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential getGssCredentials() throws GFacException, ApplicationSettingsException {
+
+ if (gssCredentials == null) {
+
+ try {
+ gssCredentials = getCredentialsFromStore();
+ } catch (Exception e) {
+ log.error("An exception occurred while retrieving credentials from the credential store. " +
+ "Will continue with my proxy user name and password.", e);
+ }
+
+ // If store does not have credentials try to get from user name and password
+ if (gssCredentials == null) {
+ gssCredentials = getDefaultCredentials();
+ }
+
+ // if still null, throw an exception
+ if (gssCredentials == null) {
+ throw new GFacException("Unable to retrieve my proxy credentials to continue operation.");
+ }
+ } else {
+ try {
+ if (gssCredentials.getRemainingLifetime() < CREDENTIAL_RENEWING_THRESH_HOLD) {
+ return renewCredentials();
+ }
+ } catch (GSSException e) {
+ throw new GFacException("Unable to retrieve remaining life time from credentials.", e);
+ }
+ }
+
+ return gssCredentials;
+ }
+
+ /**
+ * Renews credentials. First try to renew credentials as a trusted renewer. If that failed
+ * use user name and password to renew credentials.
+ * @return Renewed credentials.
+ * @throws org.apache.airavata.gfac.GFacException If an error occurred while renewing credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential renewCredentials() throws GFacException, ApplicationSettingsException {
+
+ // First try to renew credentials as a trusted renewer
+ try {
+ gssCredentials = renewCredentialsAsATrustedHost();
+ } catch (Exception e) {
+ log.warn("Renewing credentials as a trusted renewer failed", e);
+ gssCredentials = getProxyCredentials();
+ }
+
+ return gssCredentials;
+ }
+
+ /**
+ * Reads the credentials from credential store.
+ * @return If token is found in the credential store, will return a valid credential. Else returns null.
+ * @throws Exception If an error occurred while retrieving credentials.
+ */
+ public GSSCredential getCredentialsFromStore() throws Exception {
+
+ if (getCredentialReader() == null) {
+ return null;
+ }
+
+ Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
+ getRequestData().getTokenId());
+
+ if (credential != null) {
+ if (credential instanceof CertificateCredential) {
+
+ log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() +
+ " gateway id - " + getRequestData().getGatewayId());
+
+ CertificateCredential certificateCredential = (CertificateCredential) credential;
+
+ X509Certificate[] certificates = certificateCredential.getCertificates();
+ X509Credential newCredential = new X509Credential(certificateCredential.getPrivateKey(), certificates);
+
+ GlobusGSSCredentialImpl cred = new GlobusGSSCredentialImpl(newCredential, GSSCredential.INITIATE_AND_ACCEPT);
+ System.out.print(cred.export(ExtendedGSSCredential.IMPEXP_OPAQUE));
+ return cred;
+ //return new GlobusGSSCredentialImpl(newCredential,
+ // GSSCredential.INITIATE_AND_ACCEPT);
+ } else {
+ log.info("Credential type is not CertificateCredential. Cannot create mapping globus credentials. " +
+ "Credential type - " + credential.getClass().getName());
+ }
+ } else {
+ log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and "
+ + "gateway id - " + getRequestData().getGatewayId());
+ }
+
+ return null;
+ }
+
+ /**
+ * Gets the default proxy certificate.
+ * @return Default my proxy credentials.
+ * @throws org.apache.airavata.gfac.GFacException If an error occurred while retrieving credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException{
+ MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+ try {
+ return myproxy.get(getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
+ getRequestData().getMyProxyLifeTime());
+ } catch (MyProxyException e) {
+ throw new GFacException("An error occurred while retrieving default security credentials.", e);
+ }
+ }
+
+ /**
+ * Gets a new proxy certificate given current credentials.
+ * @return The short lived GSSCredentials
+ * @throws org.apache.airavata.gfac.GFacException If an error is occurred while retrieving credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential getProxyCredentials() throws GFacException, ApplicationSettingsException {
+
+ MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+ try {
+ return myproxy.get(gssCredentials, getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
+ getRequestData().getMyProxyLifeTime());
+ } catch (MyProxyException e) {
+ throw new GFacException("An error occurred while renewing security credentials using user/password.", e);
+ }
+ }
+
+ /**
+ * Renew GSSCredentials.
+ * Before executing we need to add current host as a trusted renewer. Note to renew credentials
+ * we dont need user name and password.
+ * To do that execute following command
+ * > myproxy-logon -t <LIFETIME></LIFETIME> -s <MY PROXY SERVER> -l <USER NAME>
+ * E.g :- > myproxy-logon -t 264 -s myproxy.teragrid.org -l us3
+ * Enter MyProxy pass phrase:
+ * A credential has been received for user us3 in /tmp/x509up_u501.
+ * > myproxy-init -A --cert /tmp/x509up_u501 --key /tmp/x509up_u501 -l ogce -s myproxy.teragrid.org
+ * @return Renewed credentials.
+ * @throws org.apache.airavata.gfac.GFacException If an error occurred while renewing credentials.
+ * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+ */
+ public GSSCredential renewCredentialsAsATrustedHost() throws GFacException, ApplicationSettingsException {
+ MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+ GetParams getParams = new GetParams();
+ getParams.setAuthzCreds(gssCredentials);
+ getParams.setUserName(getRequestData().getMyProxyUserName());
+ getParams.setLifetime(getRequestData().getMyProxyLifeTime());
+ try {
+ return myproxy.get(gssCredentials, getParams);
+ } catch (MyProxyException e) {
+ throw new GFacException("An error occurred while renewing security credentials.", e);
+ }
+ }
+
+ public Cluster getPbsCluster() {
+ return pbsCluster;
+ }
+
+ public void setPbsCluster(Cluster pbsCluster) {
+ this.pbsCluster = pbsCluster;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/49b6987f/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java
new file mode 100644
index 0000000..77f8a3a
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/ApplicationProcessor.java
@@ -0,0 +1,252 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.bes.utils;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.schemas.gfac.ExtendedKeyValueType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.JobTypeType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.EnvironmentType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.FileNameType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.UserNameType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.NumberOfProcessesType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ProcessesPerHostType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ThreadsPerProcessType;
+
+import java.io.File;
+
+
+public class ApplicationProcessor {
+
+ public static void generateJobSpecificAppElements(JobDefinitionType value, JobExecutionContext context){
+
+ String userName = getUserNameFromContext(context);
+ if (userName.equalsIgnoreCase("admin")){
+ userName = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+ }
+
+ HpcApplicationDeploymentType appDepType = (HpcApplicationDeploymentType) context
+ .getApplicationContext().getApplicationDeploymentDescription()
+ .getType();
+
+ createGenericApplication(value, appDepType);
+
+ if (appDepType.getApplicationEnvironmentArray().length > 0) {
+ createApplicationEnvironment(value,
+ appDepType.getApplicationEnvironmentArray(), appDepType);
+ }
+
+
+ if (appDepType.getExecutableLocation() != null) {
+ FileNameType fNameType = FileNameType.Factory.newInstance();
+ fNameType.setStringValue(appDepType.getExecutableLocation());
+ if(isParallelJob(appDepType)) {
+ JSDLUtils.getOrCreateSPMDApplication(value).setExecutable(fNameType);
+ JSDLUtils.getSPMDApplication(value).setSPMDVariation(getSPMDVariation(appDepType));
+
+ if(getValueFromMap(appDepType, JSDLUtils.NUMBEROFPROCESSES)!=null){
+ NumberOfProcessesType num = NumberOfProcessesType.Factory.newInstance();
+ num.setStringValue(getValueFromMap(appDepType, JSDLUtils.NUMBEROFPROCESSES));
+ JSDLUtils.getSPMDApplication(value).setNumberOfProcesses(num);
+ }
+
+ if(getValueFromMap(appDepType, JSDLUtils.PROCESSESPERHOST)!=null){
+ ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance();
+ pph.setStringValue(getValueFromMap(appDepType, JSDLUtils.PROCESSESPERHOST));
+ JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph);
+ }
+
+ if(getValueFromMap(appDepType, JSDLUtils.THREADSPERHOST)!=null){
+ ThreadsPerProcessType tpp = ThreadsPerProcessType.Factory.newInstance();
+ tpp.setStringValue(getValueFromMap(appDepType, JSDLUtils.THREADSPERHOST));
+ JSDLUtils.getSPMDApplication(value).setThreadsPerProcess(tpp);
+
+ }
+
+ if(userName != null) {
+ UserNameType userNameType = UserNameType.Factory.newInstance();
+ userNameType.setStringValue(userName);
+ JSDLUtils.getSPMDApplication(value).setUserName(userNameType);
+ }
+ }
+ else {
+ JSDLUtils.getOrCreatePOSIXApplication(value).setExecutable(fNameType);
+ if(userName != null) {
+ UserNameType userNameType = UserNameType.Factory.newInstance();
+ userNameType.setStringValue(userName);
+ JSDLUtils.getOrCreatePOSIXApplication(value).setUserName(userNameType);
+ }
+ }
+ }
+
+
+ String stdout = (appDepType.getStandardOutput() != null) ? new File(appDepType.getStandardOutput()).getName(): "stdout";
+ ApplicationProcessor.setApplicationStdOut(value, appDepType, stdout);
+
+
+ String stderr = (appDepType.getStandardError() != null) ? new File(appDepType.getStandardError()).getName() : "stderr";
+ ApplicationProcessor.setApplicationStdErr(value, appDepType, stderr);
+
+ }
+
+ public static String getUserNameFromContext(JobExecutionContext jobContext) {
+ if(jobContext.getTaskData() == null)
+ return null;
+ //FIXME: Discuss to get user and change this
+ return "admin";
+ }
+ public static boolean isParallelJob(HpcApplicationDeploymentType appDepType) {
+
+ boolean isParallel = false;
+
+ if (appDepType.getJobType() != null) {
+ // TODO set data output directory
+ int status = appDepType.getJobType().intValue();
+
+ switch (status) {
+ // TODO: this check should be done outside this class
+ case JobTypeType.INT_MPI:
+ case JobTypeType.INT_OPEN_MP:
+ isParallel = true;
+ break;
+
+ case JobTypeType.INT_SERIAL:
+ case JobTypeType.INT_SINGLE:
+ isParallel = false;
+ break;
+
+ default:
+ isParallel = false;
+ break;
+ }
+ }
+ return isParallel;
+ }
+
+
+ public static void createApplicationEnvironment(JobDefinitionType value, NameValuePairType[] nameValuePairs, HpcApplicationDeploymentType appDepType) {
+
+ if(isParallelJob(appDepType)) {
+ for (NameValuePairType nv : nameValuePairs) {
+ EnvironmentType envType = JSDLUtils.getOrCreateSPMDApplication(value).addNewEnvironment();
+ envType.setName(nv.getName());
+ envType.setStringValue(nv.getValue());
+ }
+ }
+ else {
+ for (NameValuePairType nv : nameValuePairs) {
+ EnvironmentType envType = JSDLUtils.getOrCreatePOSIXApplication(value).addNewEnvironment();
+ envType.setName(nv.getName());
+ envType.setStringValue(nv.getValue());
+ }
+ }
+
+ }
+
+
+ public static String getSPMDVariation (HpcApplicationDeploymentType appDepType) {
+
+ String variation = null;
+
+ if (appDepType.getJobType() != null) {
+ // TODO set data output directory
+ int status = appDepType.getJobType().intValue();
+
+ switch (status) {
+ // TODO: this check should be done outside this class
+ case JobTypeType.INT_MPI:
+ variation = SPMDVariations.MPI.value();
+ break;
+
+ case JobTypeType.INT_OPEN_MP:
+ variation = SPMDVariations.OpenMPI.value();
+ break;
+
+ }
+ }
+ return variation;
+ }
+
+
+ public static void addApplicationArgument(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stringPrm) {
+ if(isParallelJob(appDepType))
+ JSDLUtils.getOrCreateSPMDApplication(value)
+ .addNewArgument().setStringValue(stringPrm);
+ else
+ JSDLUtils.getOrCreatePOSIXApplication(value)
+ .addNewArgument().setStringValue(stringPrm);
+
+ }
+
+ public static void setApplicationStdErr(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stderr) {
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stderr);
+ if (isParallelJob(appDepType))
+ JSDLUtils.getOrCreateSPMDApplication(value).setError(fName);
+ else
+ JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName);
+ }
+
+ public static void setApplicationStdOut(JobDefinitionType value, HpcApplicationDeploymentType appDepType, String stderr) {
+ FileNameType fName = FileNameType.Factory.newInstance();
+ fName.setStringValue(stderr);
+ if (isParallelJob(appDepType))
+ JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName);
+ else
+ JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName);
+ }
+
+ public static String getApplicationStdOut(JobDefinitionType value, HpcApplicationDeploymentType appDepType) throws RuntimeException {
+ if (isParallelJob(appDepType)) return JSDLUtils.getOrCreateSPMDApplication(value).getOutput().getStringValue();
+ else return JSDLUtils.getOrCreatePOSIXApplication(value).getOutput().getStringValue();
+ }
+
+ public static String getApplicationStdErr(JobDefinitionType value, HpcApplicationDeploymentType appDepType) throws RuntimeException {
+ if (isParallelJob(appDepType)) return JSDLUtils.getOrCreateSPMDApplication(value).getError().getStringValue();
+ else return JSDLUtils.getOrCreatePOSIXApplication(value).getError().getStringValue();
+ }
+
+ public static void createGenericApplication(JobDefinitionType value, HpcApplicationDeploymentType appDepType) {
+ if (appDepType.getApplicationName() != null) {
+ ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+ String appName = appDepType.getApplicationName()
+ .getStringValue();
+ appType.setApplicationName(appName);
+ JSDLUtils.getOrCreateJobIdentification(value).setJobName(appName);
+ }
+ }
+
+
+ public static String getValueFromMap(HpcApplicationDeploymentType appDepType, String name) {
+ ExtendedKeyValueType[] extended = appDepType.getKeyValuePairsArray();
+ for(ExtendedKeyValueType e: extended) {
+ if(e.getName().equalsIgnoreCase(name)) {
+ return e.getStringValue();
+ }
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/49b6987f/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataStagingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataStagingProcessor.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataStagingProcessor.java
new file mode 100644
index 0000000..1ff5504
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataStagingProcessor.java
@@ -0,0 +1,236 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.bes.utils;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.StringArrayType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+
+public class DataStagingProcessor {
+
+ public static void generateDataStagingElements(JobDefinitionType value, JobExecutionContext context) throws Exception{
+
+ HpcApplicationDeploymentType appDepType = (HpcApplicationDeploymentType) context
+ .getApplicationContext().getApplicationDeploymentDescription()
+ .getType();
+
+
+ String gridftpEndpoint = ((UnicoreHostType) context.getApplicationContext().getHostDescription().getType())
+ .getGridFTPEndPointArray()[0];
+
+
+ if (context.getInMessageContext().getParameters().size() > 0) {
+ buildDataStagingFromInputContext(context, value, gridftpEndpoint, appDepType);
+ }
+
+ if (context.getOutMessageContext().getParameters().size() > 0) {
+ buildFromOutputContext(context, value, gridftpEndpoint, appDepType);
+ }
+
+ createStdOutURIs(value, appDepType, gridftpEndpoint, isUnicoreEndpoint(context));
+
+ }
+
+ private static void createInURIElement(JobDefinitionType value,
+ String endpoint, String inputDir, ActualParameter inParam)
+ throws Exception {
+
+ String uri = ((URIParameterType) inParam.getType()).getValue();
+ String fileName = new File(uri).getName();
+ if (uri.startsWith("file")) {
+ URI gridFTPInputDirURI = URIUtils.createGsiftpURI(endpoint,
+ inputDir);
+ String filePath = gridFTPInputDirURI.toString() + File.separator
+ + fileName;
+ JSDLUtils
+ .addDataStagingSourceElement(value, filePath, null, fileName);
+ } else if (uri.startsWith("gsiftp") || uri.startsWith("http")
+ || uri.startsWith("rns")) {
+ // no need to stage-in those files to the input
+ // directory
+ JSDLUtils.addDataStagingSourceElement(value, uri, null, fileName);
+ }
+
+ }
+
+ private static void createStdOutURIs(JobDefinitionType value,
+ HpcApplicationDeploymentType appDepType, String endpoint,
+ boolean isUnicore) throws Exception {
+
+ URI remoteOutputDir = URIUtils.createGsiftpURI(endpoint,
+ appDepType.getOutputDataDirectory());
+
+ String stdout = ApplicationProcessor.getApplicationStdOut(value, appDepType);
+
+ String stderr = ApplicationProcessor.getApplicationStdErr(value, appDepType);
+
+ String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+ : stdout;
+ String stdoutURI = GFacUtils.createGsiftpURIAsString(
+ remoteOutputDir.toString(), stdoutFileName);
+ JSDLUtils.addDataStagingTargetElement(value, null, stdoutFileName,
+ stdoutURI);
+
+ String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
+ : stderr;
+ String stderrURI = GFacUtils.createGsiftpURIAsString(
+ remoteOutputDir.toString(), stderrFileName);
+ JSDLUtils.addDataStagingTargetElement(value, null, stderrFileName,
+ stderrURI);
+
+ if(isUnicore) {
+ String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
+ String scriptExitCode = GFacUtils.createGsiftpURIAsString(
+ remoteOutputDir.toString(), scriptExitCodeFName);
+ JSDLUtils.addDataStagingTargetElement(value, null,
+ scriptExitCodeFName, scriptExitCode.toString());
+ }
+
+ }
+
+
+ private static void createOutStringElements(JobDefinitionType value,
+ HpcApplicationDeploymentType appDeptype, String endpoint, String prmValue) throws Exception {
+
+ if(prmValue == null || "".equals(prmValue)) return;
+
+
+ String outputUri = GFacUtils.createGsiftpURIAsString(endpoint, appDeptype.getOutputDataDirectory());
+
+ URI finalOutputUri = URIUtils.createGsiftpURI(outputUri, prmValue);
+ JSDLUtils.addDataStagingTargetElement(value, null, prmValue, finalOutputUri.toString());
+ }
+
+
+ private static void createOutURIElement(JobDefinitionType value,
+ String prmValue) throws Exception {
+ String fileName = new File(prmValue.toString()).getName();
+ JSDLUtils.addDataStagingTargetElement(value, null, fileName, prmValue);
+ }
+
+
+ private static JobDefinitionType buildFromOutputContext(JobExecutionContext context,
+ JobDefinitionType value, String gridftpEndpoint,
+ HpcApplicationDeploymentType appDepType) throws Exception {
+
+ Map<String, Object> outputParams = context.getOutMessageContext()
+ .getParameters();
+
+ for (String paramKey : outputParams.keySet()) {
+
+ ActualParameter outParam = (ActualParameter) outputParams
+ .get(paramKey);
+
+ // if single urls then convert each url into jsdl source
+ // elements,
+ // that are formed by concat of gridftpurl+inputdir+filename
+
+ String paramDataType = outParam.getType().getType().toString();
+
+ if ("URI".equals(paramDataType)) {
+ String uriPrm = ((URIParameterType) outParam.getType())
+ .getValue();
+ createOutURIElement(value, uriPrm);
+ }
+
+ // string params are converted into the job arguments
+
+ else if (("URIArray").equals(paramDataType)) {
+ String[] uriArray = ((URIArrayType) outParam.getType())
+ .getValueArray();
+ for (String u : uriArray) {
+
+ createOutURIElement(value, u);
+ }
+
+ }
+ else if ("String".equals(paramDataType)) {
+ String stringPrm = ((StringParameterType) outParam
+ .getType()).getValue();
+ createOutStringElements(value, appDepType, gridftpEndpoint, stringPrm);
+ }
+
+ else if ("StringArray".equals(paramDataType)) {
+ String[] valueArray = ((StringArrayType) outParam.getType())
+ .getValueArray();
+ for (String v : valueArray) {
+ createOutStringElements(value, appDepType, gridftpEndpoint, v);
+ }
+ }
+ }
+
+ return value;
+ }
+
+
+ private static void buildDataStagingFromInputContext(JobExecutionContext context, JobDefinitionType value, String gridftpEndpoint, HpcApplicationDeploymentType appDepType)
+ throws Exception {
+
+ // TODO set data directory
+ Map<String, Object> inputParams = context.getInMessageContext()
+ .getParameters();
+
+ for (String paramKey : inputParams.keySet()) {
+
+ ActualParameter inParam = (ActualParameter) inputParams
+ .get(paramKey);
+
+ // if single urls then convert each url into jsdl source
+ // elements,
+ // that are formed by concat of gridftpurl+inputdir+filename
+
+ String paramDataType = inParam.getType().getType().toString();
+
+ if ("URI".equals(paramDataType)) {
+ createInURIElement(value, gridftpEndpoint,
+ appDepType.getInputDataDirectory(), inParam);
+ }
+
+ // string params are converted into the job arguments
+
+ else if ("String".equals(paramDataType)) {
+ String stringPrm = ((StringParameterType) inParam.getType())
+ .getValue();
+ ApplicationProcessor.addApplicationArgument(value, appDepType, stringPrm);
+ }
+ }
+
+ }
+
+
+ public static boolean isUnicoreEndpoint(JobExecutionContext context) {
+ return ( (context.getApplicationContext().getHostDescription().getType() instanceof UnicoreHostType)?true:false );
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/49b6987f/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
new file mode 100644
index 0000000..a1a884b
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.bes.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.StringArrayType;
+import org.apache.airavata.schemas.gfac.StringParameterType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import de.fzj.unicore.uas.client.StorageClient;
+
+
+public class DataTransferrer {
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private JobExecutionContext jobContext;
+
+ private StorageClient storageClient;
+
+ public DataTransferrer(JobExecutionContext jobContext, StorageClient storageClient) {
+ this.jobContext = jobContext;
+ this.storageClient = storageClient;
+ }
+
+
+ public void uploadLocalFiles() throws GFacProviderException {
+ Map<String, Object> inputParams = jobContext.getInMessageContext()
+ .getParameters();
+ for (String paramKey : inputParams.keySet()) {
+ ActualParameter inParam = (ActualParameter) inputParams
+ .get(paramKey);
+ String paramDataType = inParam.getType().getType().toString();
+ if("URI".equals(paramDataType)) {
+ String uri = ((URIParameterType) inParam.getType()).getValue();
+ String fileName = new File(uri).getName();
+ if (uri.startsWith("file")) {
+ try {
+ String uriWithoutProtocol = uri.substring(
+ uri.lastIndexOf("://") + 1, uri.length());
+ FileUploader fileUploader = new FileUploader(
+ uriWithoutProtocol, "input/" + fileName,
+ Mode.overwrite);
+ fileUploader.perform(storageClient);
+ } catch (FileNotFoundException e3) {
+ throw new GFacProviderException(
+ "Error while staging-in, local file "+fileName+" not found", e3);
+ } catch (Exception e) {
+ throw new GFacProviderException("Cannot upload files", e);
+
+ }
+
+ }
+ }
+ }
+
+ }
+
+ /**
+ * This method will download all the remote files specified according to the output
+ * context of a job.
+ * */
+ public void downloadRemoteFiles() throws GFacProviderException {
+
+ String downloadLocation = getDownloadLocation();
+
+ File file = new File(downloadLocation);
+ if(!file.exists()){
+ file.mkdirs();
+ }
+
+ Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+
+ Map<String, Object> outputParams = jobContext.getOutMessageContext()
+ .getParameters();
+
+ for (String paramKey : outputParams.keySet()) {
+
+ ActualParameter outParam = (ActualParameter) outputParams
+ .get(paramKey);
+
+ // if single urls then convert each url into jsdl source
+ // elements,
+ // that are formed by concat of gridftpurl+inputdir+filename
+
+ String paramDataType = outParam.getType().getType().toString();
+
+ if ("String".equals(paramDataType)) {
+ String stringPrm = ((StringParameterType) outParam
+ .getType()).getValue();
+ String localFileName = null;
+ //TODO: why analysis.tar? it wont scale to gateways..
+ if(stringPrm == null || stringPrm.isEmpty()){
+ localFileName = "analysis-results.tar";
+ }else{
+ localFileName = stringPrm.substring(stringPrm.lastIndexOf("/")+1);
+ }
+ String outputLocation = downloadLocation+File.separator+localFileName;
+ FileDownloader fileDownloader = new FileDownloader("output/"+stringPrm,outputLocation, Mode.overwrite);
+ try {
+ fileDownloader.perform(storageClient);
+ ((StringParameterType) outParam.getType()).setValue(outputLocation);
+ stringMap.put(paramKey, outParam);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+ }
+
+ else if ("StringArray".equals(paramDataType)) {
+ String[] valueArray = ((StringArrayType) outParam.getType())
+ .getValueArray();
+ for (String v : valueArray) {
+ String localFileName = v.substring(v.lastIndexOf("/")+1);;
+ String outputLocation = downloadLocation+File.separator+localFileName;
+ FileDownloader fileDownloader = new FileDownloader("output/"+v,outputLocation, Mode.overwrite);
+ try {
+ fileDownloader.perform(storageClient);
+ ((StringParameterType) outParam.getType()).setValue(outputLocation);
+ stringMap.put(paramKey, outParam);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+ }
+ }
+ }
+ if (stringMap == null || stringMap.isEmpty()) {
+ throw new GFacProviderException("Empty Output returned from the Application, Double check the application" +
+ "and ApplicationDescriptor output Parameter Names");
+ }
+
+ downloadStdOuts();
+ }
+
+
+ public void downloadStdOuts() throws GFacProviderException{
+ String downloadLocation = getDownloadLocation();
+ File file = new File(downloadLocation);
+ if(!file.exists()){
+ file.mkdirs();
+ }
+
+ HpcApplicationDeploymentType appDepType = (HpcApplicationDeploymentType) jobContext
+ .getApplicationContext().getApplicationDeploymentDescription()
+ .getType();
+
+ String stdout = appDepType.getStandardOutput();
+ String stderr = appDepType.getStandardError();
+ if(stdout != null) {
+ stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+ }
+
+ if(stderr != null) {
+ stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+ }
+
+ String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout"
+ : stdout;
+ String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr"
+ : stderr;
+
+ ApplicationDescription application = jobContext.getApplicationContext().getApplicationDeploymentDescription();
+ ApplicationDeploymentDescriptionType appDesc = application.getType();
+
+ String stdoutLocation = downloadLocation+File.separator+stdoutFileName;
+ FileDownloader f1 = new FileDownloader("output/"+stdoutFileName,stdoutLocation, Mode.overwrite);
+ try {
+ f1.perform(storageClient);
+ String stdoutput = readFile(stdoutLocation);
+ appDesc.setStandardOutput(stdoutput);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+ String stderrLocation = downloadLocation+File.separator+stderrFileName;
+ FileDownloader f2 = new FileDownloader("output/"+stderrFileName,stderrLocation, Mode.overwrite);
+ try {
+ f2.perform(storageClient);
+ String stderror = readFile(stderrLocation);
+ appDesc.setStandardError(stderror);
+ } catch (Exception e) {
+ throw new GFacProviderException(e.getLocalizedMessage(),e);
+ }
+ }
+
+ private String readFile(String localFile) throws IOException {
+ BufferedReader instream = new BufferedReader(new FileReader(localFile));
+ StringBuffer buff = new StringBuffer();
+ String temp = null;
+ while ((temp = instream.readLine()) != null) {
+ buff.append(temp);
+ buff.append(Constants.NEWLINE);
+ }
+
+ log.info("finish read file:" + localFile);
+
+ return buff.toString();
+ }
+
+ private String getDownloadLocation() {
+ TaskDetails taskData = jobContext.getTaskData();
+ if (taskData != null && taskData.getAdvancedOutputDataHandling() != null) {
+ String outputDataDirectory = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+ return outputDataDirectory;
+ }
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/49b6987f/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileDownloader.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileDownloader.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileDownloader.java
new file mode 100644
index 0000000..680aa51
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileDownloader.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.bes.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import de.fzj.unicore.uas.fts.FiletransferOptions.SupportsPartialRead;
+
+/**
+ * helper that exports remote files from a UNICORE Storage
+ * to the local client machine.<br/>
+ * Simple wildcards ("*" and "?") and download of
+ * directories are supported.
+ *
+ * TODO this should be refactored so the single-file download logic
+ * is separated from the wildcard/directory/provided outputStream logic
+ *
+ * @author schuller
+ */
+public class FileDownloader extends FileTransferBase{
+
+ private boolean showProgress=true;
+
+ private boolean forceFileOnly=false;
+
+ private OutputStream targetStream=null;
+
+ public FileDownloader(String from, String to, Mode mode){
+ this(from,to,mode,true);
+ }
+
+ public FileDownloader(String from, String to, Mode mode, boolean failOnError){
+ this.to=to;
+ this.from=from;
+ this.mode=mode;
+ this.failOnError=failOnError;
+ }
+
+ public void perform(StorageClient sms)throws Exception{
+ boolean isWildcard=hasWildCards(from);
+ boolean isDirectory=false;
+ GridFileType gridSource=null;
+ if(isWildcard){
+ performWildCardExport(sms);
+ }
+ else {
+ //check if source is a directory
+ gridSource=sms.listProperties(from);
+ isDirectory=gridSource.getIsDirectory();
+ if(isDirectory){
+ if(forceFileOnly){
+ throw new IOException("Source is a directory");
+ }
+ performDirectoryExport(gridSource, new File(to), sms);
+ }
+ else{
+ download(gridSource,new File(to),sms);
+ }
+ }
+ }
+
+ protected void performDirectoryExport(GridFileType directory, File targetDirectory, StorageClient sms)throws Exception{
+ if(!targetDirectory.exists()|| !targetDirectory.canWrite()){
+ throw new IOException("Target directory <"+to+"> does not exist or is not writable!");
+ }
+ if(!targetDirectory.isDirectory()){
+ throw new IOException("Target <"+to+"> is not a directory!");
+ }
+ GridFileType[]gridFiles=sms.listDirectory(directory.getPath());
+ for(GridFileType file: gridFiles){
+ if(file.getIsDirectory()){
+ if(!recurse) {
+ System.out.println("Skipping directory "+file.getPath());
+ continue;
+ }
+ else{
+ File newTargetDirectory=new File(targetDirectory,getName(file.getPath()));
+ boolean success=newTargetDirectory.mkdirs();
+ if(!success)throw new IOException("Can create directory: "+newTargetDirectory.getAbsolutePath());
+ performDirectoryExport(file, newTargetDirectory, sms);
+ continue;
+ }
+ }
+ download(file, new File(targetDirectory,getName(file.getPath())), sms);
+ }
+ }
+
+ protected void performWildCardExport(StorageClient sms)throws Exception{
+ String dir=getDir(from);
+ if(dir==null)dir="/";
+ GridFileType[] files=sms.find(dir, false, from, false, null, null);
+ File targetDir=targetStream==null?new File(to):null;
+ if(targetStream==null){
+ if(!targetDir.isDirectory())throw new IOException("Target is not a directory.");
+ }
+ for(GridFileType f: files){
+ download(f, targetDir, sms);
+ }
+ }
+
+ private String getDir(String path){
+ return new File(path).getParent();
+ }
+
+ private String getName(String path){
+ return new File(path).getName();
+ }
+
+ /**
+ * download a single regular file
+ *
+ * @param source - grid file descriptor
+ * @param localFile - local file or directory to write to
+ * @param sms
+ * @throws Exception
+ */
+ private void download(GridFileType source, File localFile, StorageClient sms)throws Exception{
+ if(source==null || source.getIsDirectory()){
+ throw new IllegalStateException("Source="+source);
+ }
+
+ OutputStream os=targetStream!=null?targetStream:null;
+ FileTransferClient ftc=null;
+ try{
+ String path=source.getPath();
+ if(targetStream==null){
+ if(localFile.isDirectory()){
+ localFile=new File(localFile,getName(source.getPath()));
+ }
+ if(mode.equals(Mode.nooverwrite) && localFile.exists()){
+ System.out.println("File exists and creation mode was set to 'nooverwrite'.");
+ return;
+ }
+ System.out.println("Downloading remote file '"+sms.getUrl()+"#/"+path+"' -> "+localFile.getAbsolutePath());
+ os=new FileOutputStream(localFile.getAbsolutePath(), mode.equals(Mode.append));
+ }
+
+ chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
+ Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+ ftc=sms.getExport(path,extraParameters,chosenProtocol);
+ configure(ftc, extraParameters);
+ System.out.println("DEB:File transfer URL : "+ftc.getUrl());
+// ProgressBar p=null;
+ if(ftc instanceof IMonitorable && showProgress){
+ long size=ftc.getSourceFileSize();
+ if(isRange()){
+ size=getRangeSize();
+ }
+// p=new ProgressBar(localFile.getName(),size,msg);
+// ((IMonitorable) ftc).setProgressListener(p);
+ }
+ long startTime=System.currentTimeMillis();
+ if(isRange()){
+ if(!(ftc instanceof SupportsPartialRead)){
+ throw new Exception("Byte range is defined but protocol does not allow " +
+ "partial read! Please choose a different protocol!");
+ }
+ System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
+ SupportsPartialRead pReader=(SupportsPartialRead)ftc;
+ pReader.readPartial(startByte, endByte-startByte+1, os);
+ }
+ else{
+ ftc.readAllData(os);
+ }
+// if(p!=null){
+// p.finish();
+// }
+ if(timing){
+ long duration=System.currentTimeMillis()-startTime;
+ double rate=(double)localFile.length()/(double)duration;
+ System.out.println("Rate: " +rate+ " kB/sec.");
+ }
+ if(targetStream==null)copyProperties(source, localFile);
+ }
+ finally{
+ try{
+ if(targetStream==null && os!=null){
+ os.close();
+ }
+ }catch(Exception ignored){}
+ if(ftc!=null){
+ try{
+ ftc.destroy();
+ }catch(Exception e1){
+// System.out.println("Could not destroy the filetransfer client",e1);
+ }
+ }
+ }
+ }
+
+ /**
+ * if possible, copy the remote executable flag to the local file
+ * @throws Exception
+ */
+ private void copyProperties(GridFileType source, File localFile)throws Exception{
+ try{
+ localFile.setExecutable(source.getPermissions().getExecutable());
+ }
+ catch(Exception ex){
+ //TODO: logging
+// ("Can't set 'executable' flag for "+localFile.getName(), ex);
+ }
+ }
+
+ private void configure(FileTransferClient ftc, Map<String,String>params){
+ if(ftc instanceof UFTPFileTransferClient){
+ UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+ String secret=params.get(UFTPConstants.PARAM_SECRET);
+ u.setSecret(secret);
+ }
+ }
+
+ public void setShowProgress(boolean showProgress) {
+ this.showProgress = showProgress;
+ }
+
+ public void setForceFileOnly(boolean forceFileOnly) {
+ this.forceFileOnly = forceFileOnly;
+ }
+
+ public void setTargetStream(OutputStream targetStream) {
+ this.targetStream = targetStream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/49b6987f/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileTransferBase.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileTransferBase.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileTransferBase.java
new file mode 100644
index 0000000..ef46feb
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/FileTransferBase.java
@@ -0,0 +1,227 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.bes.utils;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.util.PropertyHelper;
+public class FileTransferBase {
+
+ protected Properties extraParameterSource;
+
+ protected boolean timing=false;
+
+ protected boolean recurse=false;
+
+ protected String from;
+
+ protected String to;
+
+ //index of first byte to download
+ protected Long startByte;
+
+ //index of last byte to download
+ protected Long endByte;
+
+ /**
+ * the creation mode
+ */
+ protected Mode mode;
+
+ /**
+ * whether the job processing should fail if an error occurs
+ */
+ protected boolean failOnError;
+
+ protected List<ProtocolType.Enum> preferredProtocols=new ArrayList<ProtocolType.Enum>();
+
+ public FileTransferBase(){
+ preferredProtocols.add(ProtocolType.BFT);
+ }
+
+ protected Map<String,String>makeExtraParameters(ProtocolType.Enum protocol){
+ Map<String, String> res;
+ if(extraParameterSource==null){
+ res=new HashMap<String, String>();
+ }
+ else{
+ String p=String.valueOf(protocol);
+ PropertyHelper ph=new PropertyHelper(extraParameterSource, new String[]{p,p.toLowerCase()});
+ res= ph.getFilteredMap();
+ }
+ if(res.size()>0){
+ // TODO: change it to logger
+ System.out.println("Have "+res.size()+" extra parameters for protocol "+protocol);
+ }
+ return res;
+ }
+
+
+ public String getTo() {
+ return to;
+ }
+
+ public String getFrom() {
+ return from;
+ }
+
+ public void setTo(String to) {
+ this.to = to;
+ }
+
+ public void setFrom(String from) {
+ this.from = from;
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
+
+ public boolean isFailOnError() {
+ return failOnError;
+ }
+
+ public boolean isTiming() {
+ return timing;
+ }
+
+ public void setTiming(boolean timing) {
+ this.timing = timing;
+ }
+
+ public void setFailOnError(boolean failOnError) {
+ this.failOnError = failOnError;
+ }
+
+ public List<ProtocolType.Enum> getPreferredProtocols() {
+ return preferredProtocols;
+ }
+
+ public void setPreferredProtocols(List<ProtocolType.Enum> preferredProtocols) {
+ this.preferredProtocols = preferredProtocols;
+ }
+
+ public void setExtraParameterSource(Properties properties){
+ this.extraParameterSource=properties;
+ }
+
+ public void setRecurse(boolean recurse) {
+ this.recurse = recurse;
+ }
+ /**
+ * check if the given path denotes a valid remote directory
+ * @param remotePath - the path
+ * @param sms - the storage
+ * @return <code>true</code> if the remote directory exists and is a directory
+ */
+ protected boolean isValidDirectory(String remotePath, StorageClient sms){
+ boolean result=false;
+ if(! ("/".equals(remotePath) || ".".equals(remotePath)) ){
+ try{
+ GridFileType gft=sms.listProperties(remotePath);
+ result=gft.getIsDirectory();
+ }catch(Exception ex){
+ result=false;
+ }
+ }
+ else result=true;
+
+ return result;
+ }
+
+ public File[] resolveWildCards(File original){
+ final String name=original.getName();
+ if(!hasWildCards(original))return new File[]{original};
+ File parent=original.getParentFile();
+ if(parent==null)parent=new File(".");
+ FilenameFilter filter=new FilenameFilter(){
+ Pattern p=createPattern(name);
+ public boolean accept(File file, String name){
+ return p.matcher(name).matches();
+ }
+ };
+ return parent.listFiles(filter);
+ }
+
+ protected boolean hasWildCards(File file){
+ return hasWildCards(file.getName());
+ }
+
+ public boolean hasWildCards(String name){
+ return name.contains("*") || name.contains("?");
+ }
+
+ private Pattern createPattern(String nameWithWildcards){
+ String regex=nameWithWildcards.replace("?",".").replace("*", ".*");
+ return Pattern.compile(regex);
+ }
+
+ protected ProtocolType.Enum chosenProtocol=null;
+
+ public ProtocolType.Enum getChosenProtocol(){
+ return chosenProtocol;
+ }
+
+ public Long getStartByte() {
+ return startByte;
+ }
+
+ public void setStartByte(Long startByte) {
+ this.startByte = startByte;
+ }
+
+ public Long getEndByte() {
+ return endByte;
+ }
+
+ public void setEndByte(Long endByte) {
+ this.endByte = endByte;
+ }
+
+ /**
+ * checks if a byte range is defined
+ * @return <code>true</code> iff both startByte and endByte are defined
+ */
+ protected boolean isRange(){
+ return startByte!=null && endByte!=null;
+ }
+
+ /**
+ * get the number of bytes in the byte range, or "-1" if the range is open-ended
+ * @return
+ */
+ protected long getRangeSize(){
+ if(Long.MAX_VALUE==endByte)return -1;
+ return endByte-startByte;
+ }
+}