You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/24 22:51:59 UTC
[03/11] creating gfac-bes and gfac-gram out from gfac-core
http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java
new file mode 100644
index 0000000..5be087e
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java
@@ -0,0 +1,558 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.external;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.util.GridFTPContactInfo;
+import org.globus.ftp.DataChannelAuthentication;
+import org.globus.ftp.DataSourceStream;
+import org.globus.ftp.FileInfo;
+import org.globus.ftp.GridFTPClient;
+import org.globus.ftp.HostPort;
+import org.globus.ftp.Marker;
+import org.globus.ftp.MarkerListener;
+import org.globus.ftp.MlsxEntry;
+import org.globus.ftp.Session;
+import org.globus.ftp.exception.ClientException;
+import org.globus.ftp.exception.ServerException;
+import org.globus.gsi.gssapi.auth.HostAuthorization;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GridFTP tools
+ */
+public class GridFtp {
+ public static final Logger log = LoggerFactory.getLogger(GridFtp.class);
+
+ public static final String GSIFTP_SCHEME = "gsiftp";
+ public static final String HOST = "host";
+
+    /**
+     * Creates a directory at a remote GridFTP location unless it already exists.
+     * <p>
+     * The authenticate / exists / makeDir sequence is attempted up to three times, pausing ten
+     * seconds after each failed attempt; the last failure is reported to the caller.
+     *
+     * @param destURI URI whose host and port identify the GridFTP server and whose path is the
+     *                directory to create
+     * @param gssCred credential used to authenticate against the server
+     * @throws ToolsException if the client cannot be created, if every attempt fails, or if the
+     *                        thread is interrupted while waiting between attempts
+     */
+    public void makeDir(URI destURI, GSSCredential gssCred) throws ToolsException {
+        final int maxAttempts = 3;
+        final long retryDelayMillis = 10000;
+
+        GridFTPClient destClient = null;
+        GridFTPContactInfo destHost = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        try {
+
+            String destPath = destURI.getPath();
+            log.info(("Creating Directory = " + destHost + "=" + destPath));
+
+            destClient = new GridFTPClient(destHost.hostName, destHost.port);
+
+            int tryCount = 0;
+            while (true) {
+                Exception failure;
+                try {
+                    destClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+                    destClient.authenticate(gssCred);
+                    destClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+                    if (!destClient.exists(destPath)) {
+                        destClient.makeDir(destPath);
+                    }
+                    break;
+                } catch (ServerException e) {
+                    failure = e;
+                } catch (IOException e) {
+                    failure = e;
+                }
+
+                // Both failure kinds share the same retry policy.
+                tryCount++;
+                if (tryCount >= maxAttempts) {
+                    throw new ToolsException(failure.getMessage(), failure);
+                }
+                Thread.sleep(retryDelayMillis);
+            }
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot Create GridFTP Client to:" + destHost.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot Create GridFTP Client to:" + destHost.toString(), e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new ToolsException("Internal Error cannot sleep", e);
+        } finally {
+            if (destClient != null) {
+                try {
+                    destClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Uploads the content of an input stream to a remote GridFTP location.
+     * <p>
+     * The transfer type is switched to binary when the remote path has one of the extensions
+     * recognised by {@code checkBinaryExtensions}. The supplied stream is not explicitly closed
+     * by this method.
+     *
+     * @param destURI      URI whose host and port identify the GridFTP server and whose path is
+     *                     the remote file to write
+     * @param gsCredential credential used to authenticate against the server
+     * @param io           stream providing the data to upload
+     * @throws ToolsException if connecting, authenticating or uploading fails
+     */
+    public void uploadFile(URI destURI, GSSCredential gsCredential, InputStream io) throws ToolsException {
+        GridFTPContactInfo endpoint = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        GridFTPClient client = null;
+
+        try {
+            String remotePath = destURI.getPath();
+            log.info("The remote file is " + remotePath);
+
+            log.debug("Setup GridFTP Client");
+            client = new GridFTPClient(endpoint.hostName, endpoint.port);
+            client.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            client.authenticate(gsCredential);
+            client.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            log.info("Uploading file");
+            boolean binary = checkBinaryExtensions(remotePath);
+            if (binary) {
+                log.debug("Transfer mode is set to Binary for a file upload");
+                client.setType(Session.TYPE_IMAGE);
+            }
+
+            // Progress markers are not needed here, so the listener does nothing.
+            MarkerListener ignoreMarkers = new MarkerListener() {
+                public void markerArrived(Marker marker) {
+                }
+            };
+            client.put(remotePath, new DataSourceStream(io), ignoreMarkers);
+
+            log.info("Upload file to:" + remotePath + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + endpoint.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + endpoint.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + endpoint.toString(), e);
+        } finally {
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies a file from one GridFTP server to another (third-party transfer).
+     * <p>
+     * The transfer type of the source client is switched to binary when the destination path has
+     * one of the extensions recognised by {@code checkBinaryExtensions}. Both client connections
+     * are closed before the method returns, whether or not the transfer succeeded.
+     *
+     * @param srcURI       URI of the file to read; host and port identify the source server
+     * @param destURI      URI of the file to write; host and port identify the destination server
+     * @param gsCredential credential used to authenticate against both servers
+     * @throws ToolsException if connecting, authenticating or transferring fails
+     */
+    public void uploadFile(URI srcURI, URI destURI, GSSCredential gsCredential) throws ToolsException {
+        GridFTPClient srcClient = null;
+        // Declared outside the try block so the finally block can release it as well.
+        GridFTPClient destClient = null;
+        GridFTPContactInfo destContactInfo = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        GridFTPContactInfo srcContactInfo = new GridFTPContactInfo(srcURI.getHost(),srcURI.getPort());
+        try {
+            String remoteFile = destURI.getPath();
+            log.info("The remote file is " + remoteFile);
+            log.debug("Setup GridFTP Client");
+            srcClient = new GridFTPClient(srcContactInfo.hostName, srcContactInfo.port);
+            srcClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            srcClient.authenticate(gsCredential);
+            srcClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            destClient = new GridFTPClient(destContactInfo.hostName, destContactInfo.port);
+            destClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            destClient.authenticate(gsCredential);
+            destClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+            log.debug("Uploading file");
+            if (checkBinaryExtensions(remoteFile)) {
+                log.debug("Transfer mode is set to Binary for a file upload");
+                srcClient.setType(Session.TYPE_IMAGE);
+            }
+
+            srcClient.transfer(srcURI.getPath(),destClient, remoteFile, false, null);
+
+            log.info("Upload file to:" + remoteFile + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + destContactInfo.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + destContactInfo.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + destContactInfo.toString(), e);
+        } finally {
+            if (srcClient != null) {
+                try {
+                    srcClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+            if (destClient != null) {
+                try {
+                    destClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Uploads a local file to a remote GridFTP location.
+     * <p>
+     * The transfer type is switched to binary when the remote path has one of the extensions
+     * recognised by {@code checkBinaryExtensions}.
+     *
+     * @param destURI      URI whose host and port identify the GridFTP server and whose path is
+     *                     the remote file to write
+     * @param gsCredential credential used to authenticate against the server
+     * @param localFile    local file to upload
+     * @throws ToolsException if connecting, authenticating or uploading fails
+     */
+    public void uploadFile(URI destURI, GSSCredential gsCredential, File localFile) throws ToolsException {
+        GridFTPContactInfo endpoint = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        GridFTPClient client = null;
+        try {
+            String remotePath = destURI.getPath();
+
+            log.info("The local temp file is " + localFile);
+            log.info("the remote file is " + remotePath);
+
+            log.debug("Setup GridFTP Client");
+            client = new GridFTPClient(endpoint.hostName, endpoint.port);
+            client.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            client.authenticate(gsCredential);
+            client.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            log.debug("Uploading file");
+            boolean binary = checkBinaryExtensions(remotePath);
+            if (binary) {
+                log.debug("Transfer mode is set to Binary for a file upload");
+                client.setType(Session.TYPE_IMAGE);
+            }
+
+            client.put(localFile, remotePath, false);
+
+            log.info("Upload file to:" + remotePath + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + endpoint.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + endpoint.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + endpoint.toString(), e);
+        } finally {
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Downloads a file from a remote GridFTP location into a local file.
+     * <p>
+     * The transfer type is switched to binary when the remote path has one of the extensions
+     * recognised by {@code checkBinaryExtensions}.
+     *
+     * @param destURI      URI whose host and port identify the GridFTP server and whose path is
+     *                     the remote file to read (despite the name, this is the download source)
+     * @param gsCredential credential used to authenticate against the server
+     * @param localFile    local file that receives the downloaded content
+     * @throws ToolsException if connecting, authenticating or downloading fails
+     */
+    public void downloadFile(URI destURI, GSSCredential gsCredential, File localFile) throws ToolsException {
+        GridFTPContactInfo endpoint = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        GridFTPClient client = null;
+        try {
+            String remotePath = destURI.getPath();
+
+            log.info("The local temp file is " + localFile);
+            log.info("the remote file is " + remotePath);
+
+            log.debug("Setup GridFTP Client");
+            client = new GridFTPClient(endpoint.hostName, endpoint.port);
+            client.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            client.authenticate(gsCredential);
+            client.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            log.debug("Downloading file");
+            boolean binary = checkBinaryExtensions(remotePath);
+            if (binary) {
+                log.debug("Transfer mode is set to Binary to download a file");
+                client.setType(Session.TYPE_IMAGE);
+            }
+
+            client.get(remotePath, localFile);
+
+            log.info("Download file to:" + localFile + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot download file from GridFTP:" + endpoint.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot download file from GridFTP:" + endpoint.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot download file from GridFTP:" + endpoint.toString(), e);
+        } finally {
+            if (client != null) {
+                try {
+                    // NOTE(review): unlike the other methods this uses close(false) rather than
+                    // close(); presumably to skip waiting for the server's reply - confirm.
+                    client.close(false);
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Downloads a remote file and returns its content as a string.
+     * <p>
+     * The file is downloaded into {@code localFile}, or into a freshly created temporary file
+     * when {@code localFile} is {@code null}, and then read line by line; every line is
+     * terminated with {@code Constants.NEWLINE} in the result. A temporary file created by this
+     * method is deleted again before returning; a caller-supplied file is left in place.
+     *
+     * @param destURI      URI of the remote file to read
+     * @param gsCredential credential used to authenticate against the server
+     * @param localFile    local file to download into, or {@code null} to use a temporary file
+     * @return the content of the remote file
+     * @throws ToolsException if the download fails or the local copy cannot be read
+     */
+    public String readRemoteFile(URI destURI, GSSCredential gsCredential, File localFile) throws ToolsException {
+        BufferedReader instream = null;
+        File localTempfile = null;
+        // Only a file created here is ours to clean up.
+        boolean ownsTempFile = false;
+        try {
+
+            if (localFile == null) {
+                localTempfile = File.createTempFile("stderr", "err");
+                ownsTempFile = true;
+            } else {
+                localTempfile = localFile;
+            }
+
+            log.info("Local temporary file:" + localTempfile);
+
+            downloadFile(destURI, gsCredential, localTempfile);
+
+            instream = new BufferedReader(new FileReader(localTempfile));
+            // The buffer never leaves this method, so the unsynchronized builder is sufficient.
+            StringBuilder buff = new StringBuilder();
+            String temp = null;
+            while ((temp = instream.readLine()) != null) {
+                buff.append(temp);
+                buff.append(Constants.NEWLINE);
+            }
+
+            log.info("finish read file:" + localTempfile);
+
+            return buff.toString();
+        } catch (IOException e) {
+            // Also covers FileNotFoundException, which was reported with the same message.
+            throw new ToolsException("Cannot read localfile file:" + localTempfile, e);
+        } finally {
+            if (instream != null) {
+                try {
+                    instream.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close reader for local file:" + localTempfile, e);
+                }
+            }
+            if (ownsTempFile && !localTempfile.delete()) {
+                log.warn("Cannot delete temporary file:" + localTempfile);
+            }
+        }
+    }
+
+    /**
+     * Transfers a file from one GridFTP endpoint to another GridFTP endpoint.
+     * <p>
+     * After the transfer the sizes of the source and destination files are compared; a mismatch
+     * is logged as a warning but does not fail the call.
+     *
+     * @param srchost   URI of the source file; host and port identify the source server
+     * @param desthost  URI of the destination file; host and port identify the destination server
+     * @param gssCred   credential used to authenticate against both servers
+     * @param srcActive if {@code true} the destination is put into passive mode and the source
+     *                  into active mode; otherwise the roles are reversed
+     * @throws ToolsException if connecting, authenticating or transferring fails
+     */
+    public void transfer(URI srchost, URI desthost, GSSCredential gssCred, boolean srcActive) throws ToolsException {
+        GridFTPClient destClient = null;
+        GridFTPClient srcClient = null;
+
+        try {
+            destClient = new GridFTPClient(desthost.getHost(), desthost.getPort());
+            destClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            destClient.authenticate(gssCred);
+
+            if (checkBinaryExtensions(desthost.getPath())) {
+                log.debug("Transfer mode is set to Binary");
+                destClient.setType(Session.TYPE_IMAGE);
+            }
+
+            srcClient = new GridFTPClient(srchost.getHost(), srchost.getPort());
+            srcClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            srcClient.authenticate(gssCred);
+
+            if (checkBinaryExtensions(srchost.getPath())) {
+                log.debug("Transfer mode is set to Binary");
+                srcClient.setType(Session.TYPE_IMAGE);
+            }
+
+            if (srcActive) {
+                log.debug("Set src active");
+                HostPort hp = destClient.setPassive();
+                srcClient.setActive(hp);
+            } else {
+                log.debug("Set dst active");
+                HostPort hp = srcClient.setPassive();
+                destClient.setActive(hp);
+            }
+
+            log.debug("Start transfer file from GridFTP:" + srchost.toString() + " to " + desthost.toString());
+
+            // Transfer a file. The transfer() function blocks until the transfer is complete.
+            srcClient.transfer(srchost.getPath(), destClient, desthost.getPath(), false, null);
+
+            // This compares file sizes only; it is not a real checksum.
+            long srcSize = srcClient.getSize(srchost.getPath());
+            long destSize = destClient.getSize(desthost.getPath());
+            if (srcSize == destSize) {
+                log.debug("CHECK SUM OK");
+            } else {
+                // A mismatch signals a possibly corrupt copy, so it must not be hidden at debug level.
+                log.warn("****CHECK SUM FAILED**** source size=" + srcSize + ", destination size=" + destSize);
+            }
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot transfer file from GridFTP:" + srchost.toString() + " to "
+                    + desthost.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot transfer file from GridFTP:" + srchost.toString() + " to "
+                    + desthost.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot transfer file from GridFTP:" + srchost.toString() + " to "
+                    + desthost.toString(), e);
+        } finally {
+            if (destClient != null) {
+                try {
+                    destClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection at Desitnation:" + desthost.toString(), e);
+                }
+            }
+            if (srcClient != null) {
+                try {
+                    srcClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection at Source:" + srchost.toString(),e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists the entries of a GridFTP directory.
+     * <p>
+     * The listing is requested with {@code mlsd()} first and falls back to {@code list()} when
+     * that fails. The entries "." and ".." are skipped.
+     *
+     * @param dirURI  URI whose host and port identify the GridFTP server and whose path is the
+     *                directory to list
+     * @param gssCred credential used to authenticate against the server
+     * @return the path of every entry, formed as the directory path, a "/" and the entry name
+     * @throws ToolsException if the directory cannot be listed, an entry has an unsupported type,
+     *                        or a URI cannot be built for an entry
+     */
+    @SuppressWarnings("unchecked")
+    public List<String> listDir(URI dirURI, GSSCredential gssCred) throws ToolsException {
+        List<String> files = new ArrayList<String>();
+        GridFTPClient srcClient = null;
+        try {
+            GridFTPContactInfo contactInfo = new GridFTPContactInfo(dirURI.getHost(), dirURI.getPort());
+
+            srcClient = new GridFTPClient(contactInfo.hostName, contactInfo.port);
+            srcClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            srcClient.authenticate(gssCred);
+            srcClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+            srcClient.setType(Session.TYPE_ASCII);
+            srcClient.changeDir(dirURI.getPath());
+
+            Vector<Object> fileInfo = null;
+            try {
+                fileInfo = srcClient.mlsd();
+            } catch (Exception e) {
+                // Fall back to a plain listing; JVM errors are deliberately not swallowed here.
+                log.debug("mlsd failed for " + dirURI + ", falling back to list", e);
+                fileInfo = srcClient.list();
+            }
+
+            for (Object entry : fileInfo) {
+                String name = null;
+                if (entry instanceof MlsxEntry) {
+                    name = ((MlsxEntry) entry).getFileName();
+                } else if (entry instanceof FileInfo) {
+                    name = ((FileInfo) entry).getName();
+                } else {
+                    throw new ToolsException("Unsupported type returned by gridftp " + entry);
+                }
+
+                if (!name.equals(".") && !name.equals("..")) {
+                    // URI paths always use "/"; File.separator would yield "\" on Windows.
+                    URI uri = GramProviderUtils.createGsiftpURI(contactInfo.hostName, dirURI.getPath() + "/" + name);
+                    files.add(uri.getPath());
+                }
+            }
+            return files;
+        } catch (IOException e) {
+            throw new ToolsException("Could not list directory: " + dirURI.toString() ,e);
+        } catch (ServerException e) {
+            throw new ToolsException("Could not list directory: " + dirURI.toString() ,e);
+        } catch (ClientException e) {
+            throw new ToolsException("Could not list directory: " + dirURI.toString() ,e);
+        } catch (URISyntaxException e) {
+            throw new ToolsException("Error creating URL of listed files: " + dirURI.toString() ,e);
+        } finally {
+            if (srcClient != null) {
+                try {
+                    srcClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection", e);
+                }
+            }
+        }
+    }
+    /**
+     * Decides from the file extension whether a file should be transferred in binary mode.
+     * <p>
+     * Only the last path segment is inspected, so a dot in a directory name or a file without
+     * any extension (for example one simply named "tar") is not mistaken for an archive. The
+     * extension is compared case-insensitively.
+     *
+     * @param filePath path of the file, using "/" as separator; may be {@code null}
+     * @return {@code true} if the extension is one of tar, zip, gz or tgz
+     */
+    private static boolean checkBinaryExtensions(String filePath){
+        if (filePath == null) {
+            return false;
+        }
+        String fileName = filePath.substring(filePath.lastIndexOf('/') + 1);
+        int dot = fileName.lastIndexOf('.');
+        if (dot < 0) {
+            // No extension at all.
+            return false;
+        }
+        String extension = fileName.substring(dot + 1);
+        String[] binaryExtensions = new String[] {"tar","zip","gz","tgz"};
+        for (String binaryExtension : binaryExtensions) {
+            if (binaryExtension.equalsIgnoreCase(extension)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+
+
+    /**
+     * Picks a name for a file about to be placed in a remote directory, avoiding a clash with
+     * an existing entry.
+     * <p>
+     * {@code listDir} returns entries as paths (directory path plus entry name), so each entry
+     * is reduced to its last path segment before it is compared with {@code fileName}; an exact
+     * match against the unmodified entry is still honoured.
+     *
+     * @param inputDirectory URI of the remote directory to inspect
+     * @param fileName       desired file name
+     * @param gssCred        credential used to authenticate against the server
+     * @return {@code "duplicate_" + fileName} if the directory already contains an entry with
+     *         that name, otherwise {@code fileName} unchanged
+     * @throws ToolsException if the directory cannot be listed
+     */
+    public String gridFTPFileExist(URI inputDirectory,String fileName,GSSCredential gssCred) throws ToolsException {
+        List<String> existingEntries = listDir(inputDirectory, gssCred);
+        for (String existing : existingEntries) {
+            String existingName = existing.substring(existing.lastIndexOf('/') + 1);
+            if (fileName.equals(existing) || fileName.equals(existingName)) {
+                return "duplicate_" + fileName;
+            }
+        }
+        return fileName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
new file mode 100644
index 0000000..feadd72
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler that creates the scratch, working, input and output directories of an application
+ * on a GridFTP endpoint of the target host before the job runs.
+ */
+public class GramDirectorySetupHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(GramDirectorySetupHandler.class);
+
+    /**
+     * Creates the application directories on the first GridFTP endpoint for which the whole
+     * setup succeeds and records a DIRECTORY_SETUP transfer detail in the registry.
+     * <p>
+     * The endpoints are taken from the host description when it is a Globus or Unicore host;
+     * when none are configured the host address itself is used as the only endpoint.
+     *
+     * @param jobExecutionContext context describing the job, its host and its application
+     * @throws GFacHandlerException if no endpoint could be set up, or if the security context
+     *                              or application settings cannot be obtained
+     * @throws GFacException        declared by the handler contract
+     */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
+        log.info("Invoking GramDirectorySetupHandler ...");
+        super.invoke(jobExecutionContext);
+        String[] gridFTPEndpointArray = null;
+
+        //TODO: why it is tightly coupled with gridftp
+        //TODO: make it more reusable
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+
+        if (hostType instanceof GlobusHostType) {
+            gridFTPEndpointArray = ((GlobusHostType) hostType).getGridFTPEndPointArray();
+        } else if (hostType instanceof UnicoreHostType) {
+            gridFTPEndpointArray = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
+        }
+
+        ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+        ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType();
+        GridFtp ftp = new GridFtp();
+
+        try {
+
+            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+                    getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+
+            // Without configured endpoints, fall back to the host address itself.
+            if (gridFTPEndpointArray == null || gridFTPEndpointArray.length == 0) {
+                gridFTPEndpointArray = new String[]{hostType.getHostAddress()};
+            }
+            boolean success = false;
+            // Remembers the most recent failure so it can be reported if no endpoint works.
+            GFacHandlerException pe = null;
+            for (String endpoint : gridFTPEndpointArray) {
+                try {
+
+                    URI tmpdirURI = GramProviderUtils.createGsiftpURI(endpoint, app.getScratchWorkingDirectory());
+                    URI workingDirURI = GramProviderUtils.createGsiftpURI(endpoint, app.getStaticWorkingDirectory());
+                    URI inputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
+                    URI outputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+
+                    // Log the endpoint actually being tried, not always the first one in the array.
+                    log.info("Host FTP = " + endpoint);
+                    log.info("temp directory = " + tmpdirURI);
+                    log.info("Working directory = " + workingDirURI);
+                    log.info("Input directory = " + inputURI);
+                    log.info("Output directory = " + outputURI);
+                    ftp.makeDir(tmpdirURI, gssCred);
+                    ftp.makeDir(workingDirURI, gssCred);
+                    ftp.makeDir(inputURI, gssCred);
+                    ftp.makeDir(outputURI, gssCred);
+                    success = true;
+                    // NOTE(review): if the registry call below throws, success stays true and the
+                    // loop moves on to the next endpoint - confirm this is intended.
+                    DataTransferDetails detail = new DataTransferDetails();
+                    TransferStatus status = new TransferStatus();
+                    status.setTransferState(TransferState.DIRECTORY_SETUP);
+                    detail.setTransferStatus(status);
+                    detail.setTransferDescription("Working directory = " + workingDirURI);
+                    registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+                    break;
+                } catch (URISyntaxException e) {
+                    pe = new GFacHandlerException("URI is malformatted:" + e.getMessage(), e);
+
+                } catch (Exception e) {
+                    pe = new GFacHandlerException(e.getMessage(), e);
+                }
+            }
+            if (!success) {
+                GFacUtils.saveErrorDetails(pe.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+                throw pe;
+            }
+        } catch (SecurityException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (ApplicationSettingsException e1) {
+            throw new GFacHandlerException(e1.getMessage(), e1);
+        }
+    }
+
+    /**
+     * This handler takes no configuration, so the properties are ignored.
+     *
+     * @param properties handler properties (unused)
+     */
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
new file mode 100644
index 0000000..4b1beab
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler that stages URI and URIArray input parameters onto the GridFTP endpoints of the
+ * target host and rewrites the parameter values to the staged locations.
+ */
+public class GridFTPInputHandler extends AbstractHandler {
+    // Bound to this class so log output is attributed to the right handler.
+    private static final Logger log = LoggerFactory.getLogger(GridFTPInputHandler.class);
+
+    /**
+     * Stages every URI / URIArray input parameter and replaces the in-message context with one
+     * holding the updated parameters. For each staged element of a URIArray an UPLOAD transfer
+     * detail is added to the registry; on failure a FAILED transfer detail and the error details
+     * are saved before the failure is rethrown.
+     *
+     * @param jobExecutionContext context describing the job and its input parameters
+     * @throws GFacHandlerException if staging fails or the failure status cannot be persisted
+     * @throws GFacException        declared by the handler contract
+     */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
+        log.info("Invoking GridFTPInputHandler ...");
+        super.invoke(jobExecutionContext);
+        DataTransferDetails detail = new DataTransferDetails();
+        TransferStatus status = new TransferStatus();
+
+        MessageContext inputNew = new MessageContext();
+        try {
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+                String paramValue = MappingFactory.toString(actualParameter);
+                //TODO: Review this with type
+                if ("URI".equals(actualParameter.getType().getType().toString())) {
+                    ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+                } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+                    List<String> newFiles = new ArrayList<String>();
+                    for (String paramValueEach : split) {
+                        String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+                        detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+                        status.setTransferState(TransferState.UPLOAD);
+                        detail.setTransferStatus(status);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+                        newFiles.add(stageInputFiles);
+                    }
+                    ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+                }
+                inputNew.getParameters().put(paramName, actualParameter);
+
+            }
+        } catch (Exception e) {
+            // Log the staging failure, with its stack trace, before trying to persist the status:
+            // if persisting fails too, this is the only record of the original error.
+            log.error(e.getMessage(), e);
+            try {
+                status.setTransferState(TransferState.FAILED);
+                detail.setTransferStatus(status);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+                GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(inputNew);
+    }
+
+    /**
+     * Stages a single input value into the application's input data directory on every
+     * configured GridFTP endpoint.
+     * <p>
+     * gsiftp inputs are copied server to server, file inputs are uploaded from the local file
+     * system and http inputs are streamed from the URL. For Unicore hosts gsiftp and http inputs
+     * are not transferred and the value is returned unchanged; the same happens for unsupported
+     * schemes and for values whose path has no file name. When no endpoints are configured the
+     * host address itself is used as the only endpoint.
+     *
+     * @param jobExecutionContext context describing the job, its host and its application
+     * @param paramValue          input value, expected to be a URI
+     * @return the path of the staged file on the endpoint, or {@code paramValue} when no
+     *         transfer was performed
+     */
+    private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws URISyntaxException, SecurityException, ToolsException, IOException,GFacException, ApplicationSettingsException {
+        URI gridftpURL = new URI(paramValue);
+
+        String[] gridFTPEndpointArray = null;
+
+        // not to download input files to the input dir if its http / gsiftp
+        // but if local then yes
+        boolean isInputNonLocal = true;
+
+        //TODO: why it is tightly coupled with gridftp
+        //TODO: make it more reusable
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+
+        if (hostType instanceof GlobusHostType) {
+            gridFTPEndpointArray = ((GlobusHostType) hostType).getGridFTPEndPointArray();
+        } else if (hostType instanceof UnicoreHostType) {
+            gridFTPEndpointArray = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
+            isInputNonLocal = false;
+        }
+
+        // Same fallback as GramDirectorySetupHandler: without configured endpoints use the host
+        // address, so the loop below never runs on a null array and destURI is always set.
+        if (gridFTPEndpointArray == null || gridFTPEndpointArray.length == 0) {
+            gridFTPEndpointArray = new String[]{hostType.getHostAddress()};
+        }
+
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+        GridFtp ftp = new GridFtp();
+        URI destURI = null;
+        GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+
+        for (String endpoint : gridFTPEndpointArray) {
+            URI inputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
+            String fileName = new File(gridftpURL.getPath()).getName();
+            fileName = ftp.gridFTPFileExist(inputURI, fileName,gssCred);
+
+            String destLocalPath = inputURI.getPath() + File.separator + fileName;
+            //if user give a url just to refer an endpoint, not a web resource we are not doing any transfer
+            if (fileName != null && !"".equals(fileName)) {
+                destURI = GramProviderUtils.createGsiftpURI(endpoint, destLocalPath);
+                if (paramValue.startsWith("gsiftp")) {
+                    // no need to do if it is unicore, as unicore will download this on user's behalf to the job space dir
+                    if (isInputNonLocal) {
+                        ftp.uploadFile(gridftpURL, destURI, gssCred);
+                    } else {
+                        return paramValue;
+                    }
+                } else if (paramValue.startsWith("file")) {
+                    String localFile = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+                    FileInputStream fis = null;
+                    try {
+                        fis = new FileInputStream(localFile);
+                        ftp.uploadFile(destURI, gssCred, fis);
+                    } catch (IOException e) {
+                        throw new GFacException("Unable to create file : " + localFile ,e);
+                    } finally {
+                        if (fis != null) {
+                            fis.close();
+                        }
+                    }
+                } else if (paramValue.startsWith("http")) {
+                    // no need to do if it is unicore
+                    if (isInputNonLocal) {
+                        InputStream is = null;
+                        try {
+                            is = gridftpURL.toURL().openStream();
+                            ftp.uploadFile(destURI, gssCred, (is));
+                        } finally {
+                            // openStream() may have failed; closing a null stream would throw a
+                            // NullPointerException that hides the real error.
+                            if (is != null) {
+                                is.close();
+                            }
+                        }
+                    } else {
+                        // don't return destUri
+                        return paramValue;
+                    }
+
+                } else {
+                    //todo throw exception telling unsupported protocol
+                    return paramValue;
+                }
+            } else {
+                // When the given input is not a web resource but a URI type input, we do not
+                // transfer anything and keep the value exactly as it is in the input.
+                return paramValue;
+            }
+        }
+        return destURI.getPath();
+    }
+
+    /**
+     * This handler takes no configuration, so the properties are ignored.
+     *
+     * @param properties handler properties (unused)
+     */
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
new file mode 100644
index 0000000..e0cb0f8
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.StringArrayType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handler that collects the results of a finished job over GridFTP.
+ * <p>
+ * For every GridFTP endpoint of the target host it records the stdout/stderr locations,
+ * downloads both files, fills the output parameters from the remote output directory or
+ * from stdout and, when the task carries an output data directory, downloads the output
+ * files to that local directory.
+ */
+public class GridFTPOutputHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(GridFTPOutputHandler.class);
+
+    /** Number of trailing stderr lines attached to error reports. */
+    private static final int STDERR_TAIL_LINES = 20;
+
+    // NOTE(review): this field is never assigned inside this class although invoke() calls
+    // registry.add(...). If AbstractHandler already exposes a registry, this private field
+    // shadows it -- confirm against AbstractHandler.
+    private Registry registry;
+
+
+    /**
+     * Downloads stdout/stderr of the job and populates the output parameters.
+     *
+     * @param jobExecutionContext context of the job whose output is collected
+     * @throws GFacHandlerException if no endpoint can be determined, the transfer fails,
+     *                              no output is produced, or the failure status cannot be persisted
+     * @throws GFacException        declared for callers; propagated from the super class call
+     */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
+        log.info("Invoking GridFTPOutputHandler ...");
+        super.invoke(jobExecutionContext);
+
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+
+        GridFtp ftp = new GridFtp();
+        File localStdErrFile = null;
+        Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+        DataTransferDetails detail = new DataTransferDetails();
+        TransferStatus status = new TransferStatus();
+
+        try {
+            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+            // Iterate the resolved list (configured endpoints, else the host name); the raw
+            // endpoint array may be null or empty.
+            String[] hostgridFTP = resolveGridFtpEndpoints(hostType);
+            if (hostgridFTP.length == 0) {
+                throw new GFacHandlerException("No GridFTP endpoint or host name is available for the target host");
+            }
+            for (String endpoint : hostgridFTP) {
+                try {
+                    /*
+                     *  Read Stdout and Stderror
+                     */
+                    URI stdoutURI = GramProviderUtils.createGsiftpURI(endpoint, app.getStandardOutput());
+                    URI stderrURI = GramProviderUtils.createGsiftpURI(endpoint, app.getStandardError());
+                    status.setTransferState(TransferState.COMPLETE);
+                    detail.setTransferStatus(status);
+                    detail.setTransferDescription("STDOUT:" + stdoutURI.toString());
+                    registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+                    status.setTransferState(TransferState.COMPLETE);
+                    detail.setTransferStatus(status);
+                    detail.setTransferDescription("STDERR:" + stderrURI.toString());
+                    registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+                    log.info("STDOUT:" + stdoutURI.toString());
+                    log.info("STDERR:" + stderrURI.toString());
+
+                    // Kept for its side effect: makes sure ./service_logs exists.
+                    File logDir = new File("./service_logs");
+                    if (!logDir.exists()) {
+                        logDir.mkdir();
+                    }
+
+                    String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext
+                            .getServiceName());
+                    File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
+                    localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr");
+
+
+                    String stdout = null;
+                    String stderr = null;
+
+                    // TODO: what if job is failed
+                    // and this handler is not able to find std* files?
+                    try {
+                        stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
+                        stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
+                        //TODO: do we also need to set them as output parameters for another job
+                        ApplicationDescription application = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+                        ApplicationDeploymentDescriptionType appDesc = application.getType();
+                        appDesc.setStandardOutput(stdout);
+                        appDesc.setStandardError(stderr);
+                        jobExecutionContext.getApplicationContext().setApplicationDeploymentDescription(application);
+                    }
+                    catch(ToolsException e) {
+                        // Best effort: output parsing below continues with whatever was read.
+                        log.error("Cannot download stdout/err files. One reason could be the job is not successfully finished:  "+e.getMessage());
+                    }
+
+
+                    Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+                    Set<String> keys = output.keySet();
+                    for (String paramName : keys) {
+                        ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+                        if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                            URI outputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+                            List<String> outputList = ftp.listDir(outputURI, gssCred);
+                            String[] valueList = outputList.toArray(new String[outputList.size()]);
+                            ((URIArrayType) actualParameter.getType()).setValueArray(valueList);
+                            stringMap.put(paramName, actualParameter);
+                        }else if ("StringArray".equals(actualParameter.getType().getType().toString())) {
+                            String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName);
+                            ((StringArrayType) actualParameter.getType()).setValueArray(valueList);
+                            stringMap.put(paramName, actualParameter);
+                        } else if ("URI".equals(actualParameter.getType().getType().toString())) {
+                            URI outputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+                            List<String> outputList = ftp.listDir(outputURI, gssCred);
+                            if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
+                                stringMap = OutputUtils.fillOutputFromStdout(output, stdout, stderr);
+                            } else {
+                                String valueList = outputList.get(0);
+                                ((URIParameterType) actualParameter.getType()).setValue(valueList);
+                                stringMap = new HashMap<String, ActualParameter>();
+                                stringMap.put(paramName, actualParameter);
+                            }
+                        }
+                        else {
+                            // This is to handle exception during the output parsing.
+                            stringMap = OutputUtils.fillOutputFromStdout(output, stdout, stderr);
+                        }
+                        // The map may be null or may not contain this parameter when the values
+                        // were parsed from stdout; string concatenation tolerates a null entry.
+                        ActualParameter resolved = stringMap == null ? null : stringMap.get(paramName);
+                        status.setTransferState(TransferState.DOWNLOAD);
+                        detail.setTransferStatus(status);
+                        detail.setTransferDescription("Output: " + resolved);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+                    }
+                    if (stringMap == null || stringMap.isEmpty()) {
+                        throw new GFacHandlerException("Empty Output returned from the Application, Double check the application" +
+                                "and ApplicationDescriptor output Parameter Names");
+                    }
+                    // If users has given an output Data path to download the output files this will download the file on machine where GFac is installed
+                    TaskDetails taskData =  jobExecutionContext.getTaskData();
+                    if(taskData != null && taskData.getAdvancedOutputDataHandling() != null){
+                        String outputDataDirectory = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+                        if(outputDataDirectory != null && !"".equals(outputDataDirectory)){
+                            stageOutputFiles(jobExecutionContext,outputDataDirectory);
+                        }
+                    }
+                } catch (ToolsException e) {
+                    log.error(e.getMessage(), e);
+                    throw new GFacHandlerException(e.getMessage() + "\n StdError Data: \n" + readStdErrTail(localStdErrFile), e);
+                } catch (URISyntaxException e) {
+                    log.error(e.getMessage(), e);
+                    throw new GFacHandlerException("URI is malformatted:" + e.getMessage(), e, readStdErrTail(localStdErrFile));
+                }
+            }
+        } catch (Exception e) {
+            // Log the root cause first so it is not lost if persisting the failure also fails.
+            log.error(e.getMessage(), e);
+            try {
+                status.setTransferState(TransferState.FAILED);
+                detail.setTransferStatus(status);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+                GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE,  jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException(e.getMessage(), e, readStdErrTail(localStdErrFile));
+        }
+
+    }
+
+    /**
+     * Determines which endpoints to contact for the given host description: the configured
+     * GridFTP endpoints when there are any, otherwise the host name alone.
+     *
+     * @param hostType host description type of the target host
+     * @return the endpoints to use; empty when the host type is unsupported or nothing is configured
+     */
+    private static String[] resolveGridFtpEndpoints(HostDescriptionType hostType) {
+        String[] endpoints = null;
+        String hostName = null;
+        if (hostType instanceof GlobusHostType) {
+            endpoints = ((GlobusHostType) hostType).getGridFTPEndPointArray();
+            hostName = ((GlobusHostType) hostType).getHostName();
+        } else if (hostType instanceof UnicoreHostType) {
+            endpoints = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
+            hostName = ((UnicoreHostType) hostType).getHostName();
+        }
+        if (endpoints != null && endpoints.length > 0) {
+            return endpoints;
+        }
+        if (hostName != null && hostName.length() > 0) {
+            return new String[]{hostName};
+        }
+        return new String[0];
+    }
+
+    /**
+     * Returns the tail of the local stderr copy for error reports, or an empty string when
+     * the failure happened before that file was created.
+     */
+    private static String readStdErrTail(File localStdErrFile) {
+        if (localStdErrFile == null) {
+            return "";
+        }
+        return readLastLinesofStdOut(localStdErrFile.getPath(), STDERR_TAIL_LINES);
+    }
+
+    /**
+     * Reads the last {@code count} lines of a text file.
+     *
+     * @param path  file to read; may be null
+     * @param count maximum number of trailing lines to return
+     * @return the selected lines, each terminated by a newline; whatever could be read when the
+     *         file is missing or unreadable (possibly an empty string)
+     */
+    private static String readLastLinesofStdOut(String path, int count) {
+        StringBuilder buffer = new StringBuilder();
+        if (path == null) {
+            return buffer.toString();
+        }
+        List<String> lines = new ArrayList<String>();
+        BufferedReader reader = null;
+        try {
+            reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
+        } catch (FileNotFoundException e) {
+            log.warn("Cannot open " + path + " to read its last lines", e);
+        } catch (IOException e) {
+            log.warn("Error while reading " + path, e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (IOException e) {
+                    log.warn("Error while closing " + path, e);
+                }
+            }
+        }
+        for (int i = Math.max(0, lines.size() - count); i < lines.size(); i++) {
+            buffer.append(lines.get(i));
+            buffer.append("\n");
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Downloads every URI / URIArray output parameter into a local directory and rewrites the
+     * parameter values to the local paths. The rewritten parameters are installed as a new
+     * out message context on the job execution context.
+     *
+     * @param jobExecutionContext   context holding the output parameters
+     * @param outputFileStagingPath local directory that receives the files
+     * @throws GFacProviderException if a URI is malformed or a transfer fails
+     */
+    private static void stageOutputFiles(JobExecutionContext jobExecutionContext, String outputFileStagingPath) throws GFacProviderException,GFacException, ApplicationSettingsException {
+
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+        // Same endpoint resolution as invoke(), so a host without configured endpoints is
+        // staged through its host name instead of failing on a null array.
+        String[] gridFTPEndpointArray = resolveGridFtpEndpoints(hostType);
+
+        MessageContext outputNew = new MessageContext();
+        MessageContext output = jobExecutionContext.getOutMessageContext();
+        Map<String, Object> parameters = output.getParameters();
+        for (String paramName : parameters.keySet()) {
+            ActualParameter actualParameter = (ActualParameter) parameters
+                    .get(paramName);
+
+            GridFtp ftp = new GridFtp();
+            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+            try {
+                if ("URI".equals(actualParameter.getType().getType().toString())) {
+                    for (String endpoint : gridFTPEndpointArray) {
+                        ((URIParameterType) actualParameter.getType()).setValue(doStaging(outputFileStagingPath,
+                                MappingFactory.toString(actualParameter), ftp, gssCred, endpoint));
+                    }
+                } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(MappingFactory.toString(actualParameter)));
+                    List<String> newFiles = new ArrayList<String>();
+                    for (String endpoint : gridFTPEndpointArray) {
+                        for (String paramValueEach : split) {
+                            newFiles.add(doStaging(outputFileStagingPath, paramValueEach, ftp, gssCred, endpoint));
+                        }
+                        ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+                    }
+
+                }
+            } catch (URISyntaxException e) {
+                log.error(e.getMessage(), e);
+                throw new GFacProviderException(e.getMessage(), e);
+            } catch (ToolsException e) {
+                log.error(e.getMessage(), e);
+                throw new GFacProviderException(e.getMessage(), e);
+            }
+            outputNew.getParameters().put(paramName, actualParameter);
+        }
+        jobExecutionContext.setOutMessageContext(outputNew);
+    }
+
+    /**
+     * Downloads one remote file into the staging directory, creating the directory if needed.
+     *
+     * @param outputFileStagingPath local target directory
+     * @param paramValue            remote path of the file
+     * @param ftp                   GridFTP client used for the transfer
+     * @param gssCred               credential used for the transfer
+     * @param endpoint              GridFTP endpoint that serves the file
+     * @return the staging path joined with the file name
+     */
+    private static String doStaging(String outputFileStagingPath, String paramValue, GridFtp ftp, GSSCredential gssCred, String endpoint) throws URISyntaxException, ToolsException {
+        URI srcURI = GramProviderUtils.createGsiftpURI(endpoint, paramValue);
+        String fileName = new File(srcURI.getPath()).getName();
+        File outputpath = new File(outputFileStagingPath);
+        if(!outputpath.exists()){
+            outputpath.mkdirs();
+        }
+        File outputFile = new File(outputpath.getAbsolutePath() + File.separator + fileName);
+        ftp.readRemoteFile(srcURI,
+                gssCred, outputFile);
+        return outputFileStagingPath + File.separator + fileName;
+    }
+
+    /**
+     * Receives the handler's configuration properties; this handler reads none of them.
+     */
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java
new file mode 100644
index 0000000..3086b95
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java
@@ -0,0 +1,223 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.persistence;
+
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.log4j.Logger;
+import org.globus.gram.internal.GRAMConstants;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 6/18/13
+ * Time: 4:16 PM
+ * Database based job persistence manager. Current default implementation.
+ */
+
+public class DBJobPersistenceManager implements JobPersistenceManager {
+
+    private DBUtil dbUtil;
+
+    private static final Logger log = Logger.getLogger(DBJobPersistenceManager.class);
+
+
+    /**
+     * @param db utility used to obtain database connections
+     */
+    public DBJobPersistenceManager(DBUtil db) {
+        this.dbUtil = db;
+    }
+
+    /**
+     * Persists the state of a job: inserts a new row for an unsubmitted job, otherwise
+     * updates the status of the existing row.
+     *
+     * @param jobData job id and state to store
+     * @throws GFacException if the database operation fails
+     */
+    public synchronized void updateJobStatus(JobData jobData) throws GFacException {
+
+        if (jobData.getState() == GRAMConstants.STATUS_UNSUBMITTED) {
+            insertJob(jobData);
+            return;
+        }
+
+        String sql = "update gram_job set status = ? where job_id = ?";
+
+        Connection connection = null;
+        PreparedStatement stmt = null;
+
+        try {
+            connection = getConnection();
+            stmt = connection.prepareStatement(sql);
+            stmt.setInt(1, jobData.getState());
+            stmt.setString(2, jobData.getJobId());
+
+            stmt.executeUpdate();
+            // getConnection() enables auto-commit, and JDBC specifies an SQLException for
+            // commit() in that mode; commit explicitly only if auto-commit is off.
+            if (!connection.getAutoCommit()) {
+                connection.commit();
+            }
+
+        } catch (SQLException e) {
+            throw new GFacException(e);
+        } finally {
+            closeQuietly(null, stmt, connection);
+        }
+    }
+
+    /**
+     * Inserts a new row holding the job id and state.
+     *
+     * @param jobData job id and state to store
+     * @throws GFacException if the insert fails
+     */
+    private void insertJob(JobData jobData) throws GFacException {
+
+        String sql = "insert into gram_job values (?, ?)";
+
+        PreparedStatement stmt = null;
+        Connection connection = null;
+
+        try {
+            connection = getConnection();
+            stmt = connection.prepareStatement(sql);
+            stmt.setString(1, jobData.getJobId());
+            stmt.setInt(2, jobData.getState());
+
+            stmt.executeUpdate();
+        } catch (SQLException e) {
+            throw new GFacException(e);
+        } finally {
+            closeQuietly(null, stmt, connection);
+        }
+
+    }
+
+    /**
+     * @return jobs whose status is neither unsubmitted, done nor failed
+     */
+    public List<JobData> getRunningJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status not in (?, ?, ?)";
+
+        int[] statuses = new int[] {
+                GRAMConstants.STATUS_UNSUBMITTED,
+                GRAMConstants.STATUS_DONE,
+                GRAMConstants.STATUS_FAILED
+        };
+
+        return getJobs(sql, statuses);
+    }
+
+    /**
+     * @return jobs whose status is failed
+     */
+    public List<JobData> getFailedJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status in (?)";
+
+        return getJobs(sql, new int[] { GRAMConstants.STATUS_FAILED });
+    }
+
+    /**
+     * @return jobs whose status is unsubmitted
+     */
+    public List<JobData> getUnSubmittedJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status in (?)";
+
+        return getJobs(sql, new int[] { GRAMConstants.STATUS_UNSUBMITTED });
+    }
+
+    /**
+     * @return jobs whose status is done
+     */
+    public List<JobData> getSuccessfullyCompletedJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status in (?)";
+
+        return getJobs(sql, new int[] { GRAMConstants.STATUS_DONE });
+
+    }
+
+
+    /**
+     * Runs a status query and maps each row to a {@code JobData}.
+     *
+     * @param sql      query with one {@code ?} placeholder per entry of {@code statuses}
+     * @param statuses status values bound to the placeholders, in order
+     * @return the matching jobs; empty when no row matches
+     * @throws GFacException if the query fails
+     */
+    protected List<JobData> getJobs(String sql, int[] statuses) throws GFacException {
+
+        List<JobData> jobs = new ArrayList<JobData>();
+
+        PreparedStatement preparedStatement = null;
+        Connection connection = null;
+        ResultSet resultSet = null;
+
+        try {
+            connection = getConnection();
+            preparedStatement = connection.prepareStatement(sql);
+
+            // JDBC parameter indexes are 1-based.
+            for (int i = 0; i < statuses.length; i++) {
+                preparedStatement.setInt(i + 1, statuses[i]);
+            }
+
+            resultSet = preparedStatement.executeQuery();
+
+            while (resultSet.next()) {
+
+                String jobId = resultSet.getString("job_id");
+                int state = resultSet.getInt("status");
+
+                jobs.add(new JobData(jobId, state));
+            }
+
+        } catch (SQLException e) {
+            throw new GFacException(e);
+        } finally {
+            closeQuietly(resultSet, preparedStatement, connection);
+        }
+
+        return jobs;
+    }
+
+    /**
+     * Closes the given JDBC resources in result set, statement, connection order. Each close is
+     * attempted independently so a failure on one does not leak the others; failures are
+     * logged and not rethrown.
+     *
+     * @param resultSet  result set to close, may be null
+     * @param statement  statement to close, may be null
+     * @param connection connection to close, may be null
+     */
+    private static void closeQuietly(ResultSet resultSet, PreparedStatement statement, Connection connection) {
+        if (resultSet != null) {
+            try {
+                resultSet.close();
+            } catch (SQLException e) {
+                log.error("Error closing result set", e);
+            }
+        }
+        if (statement != null) {
+            try {
+                statement.close();
+            } catch (SQLException e) {
+                log.error("Error closing statement", e);
+            }
+        }
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                log.error("Error closing connection", e);
+            }
+        }
+    }
+
+    /**
+     * Obtains a connection from the DB utility and switches it to auto-commit mode.
+     */
+    private synchronized Connection getConnection() throws SQLException {
+        Connection connection = dbUtil.getConnection();
+        connection.setAutoCommit(true);
+
+        return connection;
+    }
+}