You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/24 22:51:59 UTC

[03/11] creating gfac-bes and gfac-gram out from gfac-core

http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java
new file mode 100644
index 0000000..5be087e
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/external/GridFtp.java
@@ -0,0 +1,558 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.external;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.util.GridFTPContactInfo;
+import org.globus.ftp.DataChannelAuthentication;
+import org.globus.ftp.DataSourceStream;
+import org.globus.ftp.FileInfo;
+import org.globus.ftp.GridFTPClient;
+import org.globus.ftp.HostPort;
+import org.globus.ftp.Marker;
+import org.globus.ftp.MarkerListener;
+import org.globus.ftp.MlsxEntry;
+import org.globus.ftp.Session;
+import org.globus.ftp.exception.ClientException;
+import org.globus.ftp.exception.ServerException;
+import org.globus.gsi.gssapi.auth.HostAuthorization;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GridFTP tools
+ */
+public class GridFtp {
+    public static final Logger log = LoggerFactory.getLogger(GridFtp.class);
+
+    public static final String GSIFTP_SCHEME = "gsiftp";
+    public static final String HOST = "host";
+
+    /**
+     * Creates a directory at a remote GridFTP location unless it already exists.
+     * <p>
+     * Authentication, the existence check and the directory creation are attempted up to
+     * three times; after each failed attempt but the last the method sleeps ten seconds.
+     *
+     * @param destURI URI whose host, port and path identify the remote directory
+     * @param gssCred credential used to authenticate against the GridFTP server
+     * @throws ToolsException if the client cannot be created, if every attempt fails, or if
+     *                        the thread is interrupted while waiting between attempts
+     */
+    public void makeDir(URI destURI, GSSCredential gssCred) throws ToolsException {
+        final int maxAttempts = 3;
+        final long retryDelayMillis = 10000;
+
+        GridFTPClient destClient = null;
+        GridFTPContactInfo destHost = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        try {
+
+            String destPath = destURI.getPath();
+            log.info("Creating Directory = " + destHost + "=" + destPath);
+
+            destClient = new GridFTPClient(destHost.hostName, destHost.port);
+
+            int tryCount = 0;
+            while (true) {
+                try {
+                    destClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+                    destClient.authenticate(gssCred);
+                    destClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+                    if (!destClient.exists(destPath)) {
+                        destClient.makeDir(destPath);
+                    }
+                    break;
+                } catch (ServerException e) {
+                    tryCount++;
+                    if (tryCount >= maxAttempts) {
+                        throw new ToolsException(e.getMessage(), e);
+                    }
+                    Thread.sleep(retryDelayMillis);
+                } catch (IOException e) {
+                    tryCount++;
+                    if (tryCount >= maxAttempts) {
+                        throw new ToolsException(e.getMessage(), e);
+                    }
+                    Thread.sleep(retryDelayMillis);
+                }
+            }
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot Create GridFTP Client to:" + destHost.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot Create GridFTP Client to:" + destHost.toString(), e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new ToolsException("Internal Error cannot sleep", e);
+        } finally {
+            if (destClient != null) {
+                try {
+                    destClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Upload file from stream
+     *
+     * @param destURI
+     * @param gsCredential
+     * @param io
+     * @throws GFacException
+     */
+    public void uploadFile(URI destURI, GSSCredential gsCredential, InputStream io) throws ToolsException {
+        GridFTPClient ftpClient = null;
+        GridFTPContactInfo contactInfo = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+
+        try {
+
+            String remoteFile = destURI.getPath();
+            log.info("The remote file is " + remoteFile);
+
+            log.debug("Setup GridFTP Client");
+
+            ftpClient = new GridFTPClient(contactInfo.hostName, contactInfo.port);
+            ftpClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            ftpClient.authenticate(gsCredential);
+            ftpClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            log.info("Uploading file");
+            if (checkBinaryExtensions(remoteFile)) {
+                log.debug("Transfer mode is set to Binary for a file upload");
+                ftpClient.setType(Session.TYPE_IMAGE);
+            }
+
+            ftpClient.put(remoteFile, new DataSourceStream(io), new MarkerListener() {
+                public void markerArrived(Marker marker) {
+                }
+            });
+
+            log.info("Upload file to:" + remoteFile + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + contactInfo.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + contactInfo.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + contactInfo.toString(), e);
+        } finally {
+            if (ftpClient != null) {
+                try {
+                    ftpClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies a file from one GridFTP server directly to another GridFTP server.
+     * <p>
+     * Both servers are contacted with the same credential. The transfer type of the source
+     * connection is switched to binary when the destination path ends with one of the
+     * extensions recognised by {@code checkBinaryExtensions}.
+     *
+     * @param srcURI       URI whose host, port and path identify the file to copy
+     * @param destURI      URI whose host, port and path identify the file to create
+     * @param gsCredential credential used to authenticate against both servers
+     * @throws ToolsException if a connection, authentication or the transfer fails
+     */
+    public void uploadFile(URI srcURI,  URI destURI, GSSCredential gsCredential) throws ToolsException {
+        GridFTPClient srcClient = null;
+        // Declared outside the try block so that the connection is released in finally as well.
+        GridFTPClient destClient = null;
+        GridFTPContactInfo destContactInfo = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        GridFTPContactInfo srcContactInfo = new GridFTPContactInfo(srcURI.getHost(),srcURI.getPort());
+        try {
+            String remoteFile = destURI.getPath();
+            log.info("The remote file is " + remoteFile);
+            log.debug("Setup GridFTP Client");
+            srcClient = new GridFTPClient(srcContactInfo.hostName, srcContactInfo.port);
+            srcClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            srcClient.authenticate(gsCredential);
+            srcClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            destClient = new GridFTPClient(destContactInfo.hostName, destContactInfo.port);
+            destClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            destClient.authenticate(gsCredential);
+            destClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+            log.debug("Uploading file");
+            if (checkBinaryExtensions(remoteFile)) {
+                log.debug("Transfer mode is set to Binary for a file upload");
+                srcClient.setType(Session.TYPE_IMAGE);
+            }
+
+            srcClient.transfer(srcURI.getPath(),destClient, remoteFile, false, null);
+
+            log.info("Upload file to:" + remoteFile + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + destContactInfo.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + destContactInfo.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + destContactInfo.toString(), e);
+        } finally {
+            if (destClient != null) {
+                try {
+                    destClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+            if (srcClient != null) {
+                try {
+                    srcClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Upload file to remote location
+     *
+     * @param destURI
+     * @param gsCredential
+     * @param localFile
+     * @throws GFacException
+     */
+    public void uploadFile(URI destURI, GSSCredential gsCredential, File localFile) throws ToolsException {
+        GridFTPClient ftpClient = null;
+        GridFTPContactInfo contactInfo = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        try {
+
+            String remoteFile = destURI.getPath();
+
+            log.info("The local temp file is " + localFile);
+            log.info("the remote file is " + remoteFile);
+
+            log.debug("Setup GridFTP Client");
+
+            ftpClient = new GridFTPClient(contactInfo.hostName, contactInfo.port);
+            ftpClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            ftpClient.authenticate(gsCredential);
+            ftpClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            log.debug("Uploading file");
+            if (checkBinaryExtensions(remoteFile)) {
+                log.debug("Transfer mode is set to Binary for a file upload");
+                ftpClient.setType(Session.TYPE_IMAGE);
+            }
+
+
+            ftpClient.put(localFile, remoteFile, false);
+
+            log.info("Upload file to:" + remoteFile + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + contactInfo.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + contactInfo.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot upload file to GridFTP:" + contactInfo.toString(), e);
+        } finally {
+            if (ftpClient != null) {
+                try {
+                    ftpClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Download File from remote location
+     *
+     * @param destURI
+     * @param gsCredential
+     * @param localFile
+     * @throws GFacException
+     */
+    public void downloadFile(URI destURI, GSSCredential gsCredential, File localFile) throws ToolsException {
+        GridFTPClient ftpClient = null;
+        GridFTPContactInfo contactInfo = new GridFTPContactInfo(destURI.getHost(), destURI.getPort());
+        try {
+            String remoteFile = destURI.getPath();
+
+            log.info("The local temp file is " + localFile);
+            log.info("the remote file is " + remoteFile);
+
+            log.debug("Setup GridFTP Client");
+
+            ftpClient = new GridFTPClient(contactInfo.hostName, contactInfo.port);
+            ftpClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            ftpClient.authenticate(gsCredential);
+            ftpClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+
+            log.debug("Downloading file");
+            if (checkBinaryExtensions(remoteFile)) {
+                log.debug("Transfer mode is set to Binary to download a file");
+                ftpClient.setType(Session.TYPE_IMAGE);
+            }
+
+            ftpClient.get(remoteFile, localFile);
+
+            log.info("Download file to:" + localFile + " is done");
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot download file from GridFTP:" + contactInfo.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot download file from GridFTP:" + contactInfo.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot download file from GridFTP:" + contactInfo.toString(), e);
+        } finally {
+            if (ftpClient != null) {
+                try {
+                    //ftpClient.close();
+                    ftpClient.close(false);
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stream remote file
+     *
+     * @param destURI
+     * @param gsCredential
+     * @param localFile
+     * @return
+     * @throws GFacException
+     */
+    public String readRemoteFile(URI destURI, GSSCredential gsCredential, File localFile) throws ToolsException {
+        BufferedReader instream = null;
+        File localTempfile = null;
+        try {
+
+            if (localFile == null) {
+                localTempfile = File.createTempFile("stderr", "err");
+            } else {
+                localTempfile = localFile;
+            }
+
+            log.info("Local temporary file:" + localTempfile);
+
+            downloadFile(destURI, gsCredential, localTempfile);
+
+            instream = new BufferedReader(new FileReader(localTempfile));
+            StringBuffer buff = new StringBuffer();
+            String temp = null;
+            while ((temp = instream.readLine()) != null) {
+                buff.append(temp);
+                buff.append(Constants.NEWLINE);
+            }
+
+            log.info("finish read file:" + localTempfile);
+
+            return buff.toString();
+        } catch (FileNotFoundException e) {
+            throw new ToolsException("Cannot read localfile file:" + localTempfile, e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot read localfile file:" + localTempfile, e);
+        } finally {
+            if (instream != null) {
+                try {
+                    instream.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection",e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Transfer data from one GridFTp Endpoint to another GridFTP Endpoint
+     *
+     * @param srchost
+     * @param desthost
+     * @param gssCred
+     * @param srcActive
+     * @throws ServerException
+     * @throws ClientException
+     * @throws IOException
+     */
+    public void transfer(URI srchost, URI desthost, GSSCredential gssCred, boolean srcActive) throws ToolsException {
+        GridFTPClient destClient = null;
+        GridFTPClient srcClient = null;
+
+        try {
+            destClient = new GridFTPClient(desthost.getHost(), desthost.getPort());
+            destClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            destClient.authenticate(gssCred);
+
+            if (checkBinaryExtensions(desthost.getPath())) {
+                log.debug("Transfer mode is set to Binary");
+                destClient.setType(Session.TYPE_IMAGE);
+            }
+
+            srcClient = new GridFTPClient(srchost.getHost(), srchost.getPort());
+            srcClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            srcClient.authenticate(gssCred);
+
+            if (checkBinaryExtensions(srchost.getPath())) {
+                log.debug("Transfer mode is set to Binary");
+                srcClient.setType(Session.TYPE_IMAGE);
+            }
+
+            if (srcActive) {
+                log.debug("Set src active");
+                HostPort hp = destClient.setPassive();
+                srcClient.setActive(hp);
+            } else {
+                log.debug("Set dst active");
+                HostPort hp = srcClient.setPassive();
+                destClient.setActive(hp);
+            }
+
+            log.debug("Start transfer file from GridFTP:" + srchost.toString() + " to " + desthost.toString());
+
+            /**
+             * Transfer a file. The transfer() function blocks until the transfer is complete.
+             */
+            srcClient.transfer(srchost.getPath(), destClient, desthost.getPath(), false, null);
+            if (srcClient.getSize(srchost.getPath()) == destClient.getSize(desthost.getPath())) {
+                log.debug("CHECK SUM OK");
+            } else {
+                log.debug("****CHECK SUM FAILED****");
+            }
+
+        } catch (ServerException e) {
+            throw new ToolsException("Cannot transfer file from GridFTP:" + srchost.toString() + " to "
+                    + desthost.toString(), e);
+        } catch (IOException e) {
+            throw new ToolsException("Cannot transfer file from GridFTP:" + srchost.toString() + " to "
+                    + desthost.toString(), e);
+        } catch (ClientException e) {
+            throw new ToolsException("Cannot transfer file from GridFTP:" + srchost.toString() + " to "
+                    + desthost.toString(), e);
+        } finally {
+            if (destClient != null) {
+                try {
+                    destClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection at Desitnation:" + desthost.toString());
+                }
+            }
+            if (srcClient != null) {
+                try {
+                    srcClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection at Source:" + srchost.toString(),e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Lists the entries of a directory on a GridFTP server.
+     * <p>
+     * The listing is requested with {@code mlsd()}; if that call fails, {@code list()} is
+     * used instead. The entries {@code "."} and {@code ".."} are skipped.
+     *
+     * @param dirURI  URI whose host, port and path identify the remote directory
+     * @param gssCred credential used to authenticate against the GridFTP server
+     * @return the path component of a gsiftp URI built for every listed entry
+     * @throws ToolsException if the directory cannot be listed, an entry has an unsupported
+     *                        type, or a URI cannot be built for an entry
+     */
+    @SuppressWarnings("unchecked")
+    public List<String> listDir(URI dirURI, GSSCredential gssCred) throws ToolsException {
+        List<String> files = new ArrayList<String>();
+        GridFTPClient srcClient = null;
+        try {
+            GridFTPContactInfo contactInfo = new GridFTPContactInfo(dirURI.getHost(), dirURI.getPort());
+
+            srcClient = new GridFTPClient(contactInfo.hostName, contactInfo.port);
+            srcClient.setAuthorization(new HostAuthorization(GridFtp.HOST));
+            srcClient.authenticate(gssCred);
+            srcClient.setDataChannelAuthentication(DataChannelAuthentication.SELF);
+            srcClient.setType(Session.TYPE_ASCII);
+            srcClient.changeDir(dirURI.getPath());
+
+            Vector<Object> fileInfo = null;
+            try {
+                fileInfo = srcClient.mlsd();
+            } catch (Exception e) {
+                // Fall back to a plain listing, but keep a trace of why mlsd() failed.
+                log.debug("mlsd() failed for " + dirURI + ", falling back to list()", e);
+                fileInfo = srcClient.list();
+            }
+
+            for (Object entry : fileInfo) {
+                String name = null;
+                if (entry instanceof MlsxEntry) {
+                    name = ((MlsxEntry) entry).getFileName();
+                } else if (entry instanceof FileInfo) {
+                    name = ((FileInfo) entry).getName();
+                } else {
+                    throw new ToolsException("Unsupported type returned by gridftp " + entry);
+                }
+
+                if (!name.equals(".") && !name.equals("..")) {
+                    // Remote paths always use '/', so the local File.separator must not be used.
+                    URI uri = GramProviderUtils.createGsiftpURI(contactInfo.hostName, dirURI.getPath() + "/" + name);
+                    files.add(uri.getPath());
+                }
+            }
+            return files;
+        } catch (IOException e) {
+            throw new ToolsException("Could not list directory: " + dirURI.toString() ,e);
+        } catch (ServerException e) {
+            throw new ToolsException("Could not list directory: " + dirURI.toString() ,e);
+        } catch (ClientException e) {
+            throw new ToolsException("Could not list directory: " + dirURI.toString() ,e);
+        } catch (URISyntaxException e) {
+            throw new ToolsException("Error creating URL of listed files: " + dirURI.toString() ,e);
+        } finally {
+            if (srcClient != null) {
+                try {
+                    srcClient.close();
+                } catch (Exception e) {
+                    log.warn("Cannot close GridFTP client connection", e);
+                }
+            }
+        }
+    }
+    /**
+     * Decides from the file extension whether a file must be transferred in binary mode.
+     * <p>
+     * Only the last path segment is inspected and the extension is compared without regard
+     * to case, so {@code "/data/out.TGZ"} is binary while a file that is merely named
+     * {@code "tar"} (no dot) is not.
+     *
+     * @param filePath path of the file; may be {@code null}
+     * @return {@code true} if the extension is one of {@code tar}, {@code zip}, {@code gz}
+     *         or {@code tgz}; {@code false} otherwise, including for a {@code null} path or a
+     *         name without an extension
+     */
+    private static boolean checkBinaryExtensions(String filePath){
+        if (filePath == null) {
+            return false;
+        }
+        // Look at the file name only, so that dots in directory names are never considered.
+        String fileName = filePath.substring(filePath.lastIndexOf('/') + 1);
+        int dot = fileName.lastIndexOf('.');
+        if (dot < 0) {
+            // No extension at all: the whole name must not be mistaken for an extension.
+            return false;
+        }
+        String extension = fileName.substring(dot + 1).toLowerCase(java.util.Locale.ROOT);
+        return Arrays.asList("tar", "zip", "gz", "tgz").contains(extension);
+    }
+
+
+
+
+    /**
+     * Returns a name for a file in a remote directory, prefixed with {@code "duplicate_"}
+     * when the directory listing already contains that name.
+     * <p>
+     * NOTE(review): the comparison is made against the values returned by {@code listDir},
+     * which are full paths rather than bare file names - confirm what callers pass in.
+     *
+     * @param inputDirectory URI of the remote directory to inspect
+     * @param fileName       name to look for in the directory listing
+     * @param gssCred        credential used to authenticate against the GridFTP server
+     * @return {@code fileName} unchanged when it is not listed, otherwise
+     *         {@code "duplicate_" + fileName}
+     * @throws ToolsException if the directory cannot be listed
+     */
+    public String gridFTPFileExist(URI inputDirectory,String fileName,GSSCredential gssCred) throws ToolsException {
+        for (String existing : listDir(inputDirectory, gssCred)) {
+            if (fileName.equals(existing)) {
+                return "duplicate_" + fileName;
+            }
+        }
+        return fileName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
new file mode 100644
index 0000000..feadd72
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GramDirectorySetupHandler.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler that creates the scratch, static working, input and output directories of an
+ * application on the remote host through GridFTP.
+ * <p>
+ * The GridFTP endpoints configured for the host are tried in order until the directories
+ * have been created on one of them.
+ */
+public class GramDirectorySetupHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(GramDirectorySetupHandler.class);
+
+    /**
+     * Creates the application directories on the first GridFTP endpoint that accepts them and
+     * records a {@code DIRECTORY_SETUP} transfer detail for the task.
+     *
+     * @param jobExecutionContext context providing the host and application descriptions, the
+     *                            security context and the task data
+     * @throws GFacHandlerException if the directories could not be created on any endpoint;
+     *                              the error is also saved through {@code GFacUtils}
+     * @throws GFacException        declared by the handler contract
+     */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
+        log.info("Invoking GramDirectorySetupHandler ...");
+        super.invoke(jobExecutionContext);
+        String[] gridFTPEndpointArray = null;
+
+        //TODO: why it is tightly coupled with gridftp
+        //TODO: make it more reusable
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+
+        if (hostType instanceof GlobusHostType) {
+            gridFTPEndpointArray = ((GlobusHostType) hostType).getGridFTPEndPointArray();
+        } else if (hostType instanceof UnicoreHostType) {
+            gridFTPEndpointArray = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
+        }
+
+        ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+        ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType();
+        GridFtp ftp = new GridFtp();
+
+        try {
+
+            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+                    getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+
+            // Without configured endpoints fall back to the host address itself.
+            if (gridFTPEndpointArray == null || gridFTPEndpointArray.length == 0) {
+                gridFTPEndpointArray = new String[]{hostType.getHostAddress()};
+            }
+            boolean success = false;
+            // Remembers the most recent failure so that it can be reported if no endpoint works.
+            GFacHandlerException pe = null;
+            for (String endpoint : gridFTPEndpointArray) {
+                try {
+
+                    URI tmpdirURI = GramProviderUtils.createGsiftpURI(endpoint, app.getScratchWorkingDirectory());
+                    URI workingDirURI = GramProviderUtils.createGsiftpURI(endpoint, app.getStaticWorkingDirectory());
+                    URI inputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
+                    URI outputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+
+                    // Log the endpoint actually being tried, not always the first one.
+                    log.info("Host FTP = " + endpoint);
+                    log.info("temp directory = " + tmpdirURI);
+                    log.info("Working directory = " + workingDirURI);
+                    log.info("Input directory = " + inputURI);
+                    log.info("Output directory = " + outputURI);
+                    ftp.makeDir(tmpdirURI, gssCred);
+                    ftp.makeDir(workingDirURI, gssCred);
+                    ftp.makeDir(inputURI, gssCred);
+                    ftp.makeDir(outputURI, gssCred);
+                    // NOTE(review): success is set before the registry call, so a registry failure
+                    // moves on to the next endpoint without failing the handler - confirm intent.
+                    success = true;
+                    DataTransferDetails detail = new DataTransferDetails();
+                    TransferStatus status = new TransferStatus();
+                    status.setTransferState(TransferState.DIRECTORY_SETUP);
+                    detail.setTransferStatus(status);
+                    detail.setTransferDescription("Working directory = " + workingDirURI);
+                    registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+                    break;
+                } catch (URISyntaxException e) {
+                    pe = new GFacHandlerException("URI is malformatted:" + e.getMessage(), e);
+
+                } catch (Exception e) {
+                    pe = new GFacHandlerException(e.getMessage(), e);
+                }
+            }
+            if (!success) {
+                GFacUtils.saveErrorDetails(pe.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE,  jobExecutionContext.getTaskData().getTaskID());
+                throw pe;
+            }
+        } catch (SecurityException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (ApplicationSettingsException e1) {
+            throw new GFacHandlerException(e1.getMessage(), e1);
+        }
+    }
+
+    /**
+     * Does nothing; this handler reads no properties.
+     *
+     * @param properties handler properties, ignored
+     */
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
new file mode 100644
index 0000000..4b1beab
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPInputHandler.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GridFTPInputHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(AppDescriptorCheckHandler.class);
+ 
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
+        log.info("Invoking GridFTPInputHandler ...");
+        super.invoke(jobExecutionContext);
+        DataTransferDetails detail = new DataTransferDetails();
+        TransferStatus status = new TransferStatus();
+       
+        MessageContext inputNew = new MessageContext();
+        try {
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName);
+                String paramValue = MappingFactory.toString(actualParameter);
+                //TODO: Review this with type
+                if ("URI".equals(actualParameter.getType().getType().toString())) {
+                    ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue));
+                } else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+                    List<String> newFiles = new ArrayList<String>();
+                    for (String paramValueEach : split) {
+                        String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach);
+                        detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+                        status.setTransferState(TransferState.UPLOAD);
+                        detail.setTransferStatus(status);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+                   
+                        newFiles.add(stageInputFiles);
+                    }
+                    ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+                }
+                inputNew.getParameters().put(paramName, actualParameter);
+               
+            }
+        } catch (Exception e) {
+        	 try {
+         	    status.setTransferState(TransferState.FAILED);
+ 				detail.setTransferStatus(status);
+ 				registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+ 				GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE,  jobExecutionContext.getTaskData().getTaskID());
+ 			} catch (Exception e1) {
+  			    throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+  		   }
+            log.error(e.getMessage());
+            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(inputNew);
+    }
+
+    private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws URISyntaxException, SecurityException, ToolsException, IOException,GFacException, ApplicationSettingsException {
+        URI gridftpURL = new URI(paramValue);
+
+        String[] gridFTPEndpointArray = null;
+
+        // not to download input files to the input dir if its http / gsiftp
+        // but if local then yes
+        boolean isInputNonLocal = true;
+
+        //TODO: why it is tightly coupled with gridftp
+//        GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+
+        //TODO: make it more reusable
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+
+        if(jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GlobusHostType){
+        	gridFTPEndpointArray = ((GlobusHostType) hostType).getGridFTPEndPointArray();
+        }
+        else if (jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof UnicoreHostType){
+        	gridFTPEndpointArray = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
+        	isInputNonLocal = false;
+        }
+        else {
+        	//TODO
+        }
+
+
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+        GridFtp ftp = new GridFtp();
+        URI destURI = null;
+        GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+
+        for (String endpoint : gridFTPEndpointArray) {
+            URI inputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
+            String fileName = new File(gridftpURL.getPath()).getName();
+            fileName = ftp.gridFTPFileExist(inputURI, fileName,gssCred);
+
+            String destLocalPath = inputURI.getPath() + File.separator + fileName;
+            //if user give a url just to refer an endpoint, not a web resource we are not doing any transfer
+            if (fileName != null && !"".equals(fileName)) {
+                destURI = GramProviderUtils.createGsiftpURI(endpoint, destLocalPath);
+                if (paramValue.startsWith("gsiftp")) {
+                	// no need to do if it is unicore, as unicore will download this on user's behalf to the job space dir
+                	if(isInputNonLocal) ftp.uploadFile(gridftpURL, destURI, gssCred);
+                	else return paramValue;
+                } else if (paramValue.startsWith("file")) {
+                    String localFile = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+                    FileInputStream fis = null;
+                    try {
+                    	fis = new FileInputStream(localFile);
+                    	ftp.uploadFile(destURI, gssCred, fis);
+                    } catch (IOException e) {
+                        throw new GFacException("Unable to create file : " + localFile ,e);
+                    } finally {
+                        if (fis != null) {
+                            fis.close();
+                        }
+                    }
+                } else if (paramValue.startsWith("http")) {
+                	// no need to do if it is unicore
+                	if(isInputNonLocal) {
+                		InputStream is = null;
+                		try {
+                			is = gridftpURL.toURL().openStream();
+                			ftp.uploadFile(destURI, gssCred, (is));
+                		}finally {
+                			is.close();
+                		}
+                	} else {
+                		// don't return destUri
+                		return paramValue;
+                	}
+
+                } else {
+                    //todo throw exception telling unsupported protocol
+                    return paramValue;
+                }
+            } else {
+                // When the given input is not a web resource but a URI type input, then we don't do any transfer just keep the same value as it isin the input
+                return paramValue;
+            }
+        }
+        return destURI.getPath();
+    }
+
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
new file mode 100644
index 0000000..e0cb0f8
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/handler/GridFTPOutputHandler.java
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.util.GramProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.StringArrayType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handler that reads the remote stdout/stderr of a job over GridFTP, fills the output
+ * parameters of the job from the remote output directory or from stdout, and optionally
+ * downloads output files to a local directory requested in the task's advanced output handling.
+ */
+public class GridFTPOutputHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(GridFTPOutputHandler.class);
+    // Fixed: a private, never-assigned "registry" field used to be declared here. It hid the
+    // registry this class otherwise gets from its superclass (GridFTPInputHandler uses the
+    // inherited one without declaring it), so every registry.add(...) below hit a null reference.
+
+    /** Number of trailing stderr lines attached to failure reports. */
+    private static final int STDERR_TAIL_LINES = 20;
+
+    /**
+     * Reads stdout/stderr from each endpoint, populates the output parameters and records
+     * transfer details in the registry.
+     *
+     * @param jobExecutionContext context carrying the host/application descriptions, credentials and output parameters
+     * @throws GFacHandlerException if reading outputs fails, the application returned no output,
+     *         or the failure status cannot be persisted
+     * @throws GFacException declared for the handler contract
+     */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException {
+        log.info("Invoking GridFTPOutputHandler ...");
+        super.invoke(jobExecutionContext);
+
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+        String[] gridFTPEndpointArray = null;
+        String hostName = null;
+
+        if (hostType instanceof GlobusHostType) {
+            gridFTPEndpointArray = ((GlobusHostType) hostType).getGridFTPEndPointArray();
+            hostName = ((GlobusHostType) hostType).getHostName();
+        } else if (hostType instanceof UnicoreHostType) {
+            gridFTPEndpointArray = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
+            hostName = ((UnicoreHostType) hostType).getHostName();
+        }
+
+        GridFtp ftp = new GridFtp();
+        File localStdErrFile = null;
+        Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+        DataTransferDetails detail = new DataTransferDetails();
+        TransferStatus status = new TransferStatus();
+
+        try {
+            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+            String[] hostgridFTP = gridFTPEndpointArray;
+            if (hostgridFTP == null || hostgridFTP.length == 0) {
+                if (hostName == null) {
+                    throw new GFacHandlerException("No GridFTP endpoint or host name available to read job outputs");
+                }
+                hostgridFTP = new String[]{hostName};
+            }
+            // Fixed: iterate the endpoint list that includes the host-name fallback. The loop used
+            // to iterate gridFTPEndpointArray, which ignored the fallback and failed on null.
+            for (String endpoint : hostgridFTP) {
+                try {
+                    /*
+                     *  Read Stdout and Stderror
+                     */
+                    URI stdoutURI = GramProviderUtils.createGsiftpURI(endpoint, app.getStandardOutput());
+                    URI stderrURI = GramProviderUtils.createGsiftpURI(endpoint, app.getStandardError());
+                    status.setTransferState(TransferState.COMPLETE);
+                    detail.setTransferStatus(status);
+                    detail.setTransferDescription("STDOUT:" + stdoutURI.toString());
+                    registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+                    status.setTransferState(TransferState.COMPLETE);
+                    detail.setTransferStatus(status);
+                    detail.setTransferDescription("STDERR:" + stderrURI.toString());
+                    registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+
+                    log.info("STDOUT:" + stdoutURI.toString());
+                    log.info("STDERR:" + stderrURI.toString());
+
+                    File logDir = new File("./service_logs");
+                    if (!logDir.exists()) {
+                        logDir.mkdir();
+                    }
+
+                    String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext
+                            .getServiceName());
+                    File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
+                    localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr");
+
+                    String stdout = null;
+                    String stderr = null;
+
+                    // TODO: what if job is failed
+                    // and this handler is not able to find std* files?
+                    try {
+                        stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile);
+                        stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile);
+                        //TODO: do we also need to set them as output parameters for another job
+                        ApplicationDescription application = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+                        ApplicationDeploymentDescriptionType appDesc = application.getType();
+                        appDesc.setStandardOutput(stdout);
+                        appDesc.setStandardError(stderr);
+                        jobExecutionContext.getApplicationContext().setApplicationDeploymentDescription(application);
+                    } catch (ToolsException e) {
+                        log.error("Cannot download stdout/err files. One reason could be the job is not successfully finished:  "+e.getMessage());
+                    }
+
+                    Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+                    Set<String> keys = output.keySet();
+                    for (String paramName : keys) {
+                        ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+                        String paramType = actualParameter.getType().getType().toString();
+                        if ("URIArray".equals(paramType)) {
+                            URI outputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+                            List<String> outputList = ftp.listDir(outputURI, gssCred);
+                            String[] valueList = outputList.toArray(new String[outputList.size()]);
+                            ((URIArrayType) actualParameter.getType()).setValueArray(valueList);
+                            stringMap.put(paramName, actualParameter);
+                        } else if ("StringArray".equals(paramType)) {
+                            String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName);
+                            ((StringArrayType) actualParameter.getType()).setValueArray(valueList);
+                            stringMap.put(paramName, actualParameter);
+                        } else if ("URI".equals(paramType)) {
+                            URI outputURI = GramProviderUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+                            List<String> outputList = ftp.listDir(outputURI, gssCred);
+                            if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
+                                stringMap = OutputUtils.fillOutputFromStdout(output, stdout, stderr);
+                            } else {
+                                String valueList = outputList.get(0);
+                                ((URIParameterType) actualParameter.getType()).setValue(valueList);
+                                // NOTE(review): this replaces the map and drops entries collected for
+                                // earlier parameters; the sibling branches had the same line commented out.
+                                stringMap = new HashMap<String, ActualParameter>();
+                                stringMap.put(paramName, actualParameter);
+                            }
+                        } else {
+                            // This is to handle exception during the output parsing.
+                            stringMap = OutputUtils.fillOutputFromStdout(output, stdout, stderr);
+                        }
+                        status.setTransferState(TransferState.DOWNLOAD);
+                        detail.setTransferStatus(status);
+                        // Null-safe: the map may be null or may not contain this parameter when it
+                        // was produced by parsing stdout; that used to raise a NullPointerException here.
+                        ActualParameter resolved = stringMap == null ? null : stringMap.get(paramName);
+                        detail.setTransferDescription("Output: " + resolved);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+                    }
+                    if (stringMap == null || stringMap.isEmpty()) {
+                        throw new GFacHandlerException("Empty Output returned from the Application, Double check the application" +
+                                "and ApplicationDescriptor output Parameter Names");
+                    }
+                    // If users has given an output Data path to download the output files this will download the file on machine where GFac is installed
+                    TaskDetails taskData =  jobExecutionContext.getTaskData();
+                    if (taskData != null && taskData.getAdvancedOutputDataHandling() != null) {
+                        String outputDataDirectory = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+                        if (outputDataDirectory != null && !"".equals(outputDataDirectory)) {
+                            stageOutputFiles(jobExecutionContext,outputDataDirectory);
+                        }
+                    }
+                } catch (ToolsException e) {
+                    log.error(e.getMessage());
+                    throw new GFacHandlerException(e.getMessage() + "\n StdError Data: \n" + stdErrTail(localStdErrFile),e);
+                } catch (URISyntaxException e) {
+                    log.error(e.getMessage());
+                    throw new GFacHandlerException("URI is malformatted:" + e.getMessage(), e, stdErrTail(localStdErrFile));
+                }
+            }
+        } catch (Exception e) {
+            // Log the root cause (with its stack trace) first: if persisting the FAILED status
+            // below throws, the original error would otherwise never be reported.
+            log.error(e.getMessage(), e);
+            try {
+                status.setTransferState(TransferState.FAILED);
+                detail.setTransferStatus(status);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL,detail, jobExecutionContext.getTaskData().getTaskID());
+                GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE,  jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException(e.getMessage(), e, stdErrTail(localStdErrFile));
+        }
+
+    }
+
+    /**
+     * Returns the tail of the local stderr copy, or an empty string when no local copy exists yet.
+     * The local file is still null when a failure happens before the temp files are created
+     * (for example a malformed stdout/stderr URI); dereferencing it used to mask the real error.
+     */
+    private static String stdErrTail(File stdErrFile) {
+        if (stdErrFile == null) {
+            return "";
+        }
+        return readLastLinesofStdOut(stdErrFile.getPath(), STDERR_TAIL_LINES);
+    }
+
+    /**
+     * Reads a text file and returns its last {@code count} lines, each terminated by a newline.
+     *
+     * @param path path of the file to read; {@code null} yields an empty result
+     * @param count maximum number of trailing lines to return
+     * @return the trailing lines read; empty if the file cannot be opened, partial if reading fails midway
+     */
+    private static String readLastLinesofStdOut(String path, int count) {
+        StringBuilder buffer = new StringBuilder();
+        if (path == null) {
+            return buffer.toString();
+        }
+        List<String> lines = new ArrayList<String>();
+        BufferedReader reader = null;
+        try {
+            reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
+        } catch (FileNotFoundException e) {
+            // Previously the null stream was still wrapped and read, which threw a NullPointerException.
+            log.error("Cannot open file " + path + ": " + e.getMessage(), e);
+        } catch (IOException e) {
+            // Keep whatever was read before the failure, as the original implementation did.
+            log.error("Error reading file " + path + ": " + e.getMessage(), e);
+        } finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (IOException e) {
+                    log.error("Error closing file " + path + ": " + e.getMessage(), e);
+                }
+            }
+        }
+        int total = lines.size();
+        // When count >= total every line is emitted; otherwise only the last "count" lines.
+        int first = total > count ? total - count : 0;
+        for (int i = first; i < total; i++) {
+            buffer.append(lines.get(i));
+            buffer.append("\n");
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Downloads the files referenced by URI / URIArray output parameters into a local directory and
+     * replaces the out-message context with parameters pointing at the local copies.
+     *
+     * @param jobExecutionContext context carrying the host description, credentials and output parameters
+     * @param outputFileStagingPath local directory the output files are downloaded to
+     * @throws GFacProviderException if a source URI is malformed or a download fails
+     */
+    private static void stageOutputFiles(JobExecutionContext jobExecutionContext, String outputFileStagingPath) throws GFacProviderException,GFacException, ApplicationSettingsException {
+
+        HostDescriptionType hostType = jobExecutionContext.getApplicationContext().getHostDescription().getType();
+        String[] gridFTPEndpointArray = null;
+
+        if (hostType instanceof GlobusHostType) {
+            gridFTPEndpointArray = ((GlobusHostType) hostType).getGridFTPEndPointArray();
+        } else if (hostType instanceof UnicoreHostType) {
+            gridFTPEndpointArray = ((UnicoreHostType) hostType).getGridFTPEndPointArray();
+        }
+        // An unsupported host type leaves the array null; treat it as "no endpoints" rather than
+        // failing with a NullPointerException inside the loops below.
+        if (gridFTPEndpointArray == null) {
+            gridFTPEndpointArray = new String[0];
+        }
+
+        MessageContext outputNew = new MessageContext();
+        MessageContext output = jobExecutionContext.getOutMessageContext();
+        Map<String, Object> parameters = output.getParameters();
+        for (String paramName : parameters.keySet()) {
+            ActualParameter actualParameter = (ActualParameter) parameters
+                    .get(paramName);
+
+            GridFtp ftp = new GridFtp();
+            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+            try {
+                String paramType = actualParameter.getType().getType().toString();
+                if ("URI".equals(paramType)) {
+                    for (String endpoint : gridFTPEndpointArray) {
+                        ((URIParameterType) actualParameter.getType()).setValue(doStaging(outputFileStagingPath,
+                                MappingFactory.toString(actualParameter), ftp, gssCred, endpoint));
+                    }
+                } else if ("URIArray".equals(paramType)) {
+                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(MappingFactory.toString(actualParameter)));
+                    for (String endpoint : gridFTPEndpointArray) {
+                        // Fixed: the list is rebuilt per endpoint. It used to be shared across
+                        // endpoints, so with several endpoints every local path was listed repeatedly.
+                        List<String> newFiles = new ArrayList<String>();
+                        for (String paramValueEach : split) {
+                            newFiles.add(doStaging(outputFileStagingPath, paramValueEach, ftp, gssCred, endpoint));
+                        }
+                        ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+                    }
+
+                }
+            } catch (URISyntaxException e) {
+                log.error(e.getMessage());
+                throw new GFacProviderException(e.getMessage(), e);
+            } catch (ToolsException e) {
+                log.error(e.getMessage());
+                throw new GFacProviderException(e.getMessage(), e);
+            }
+            outputNew.getParameters().put(paramName, actualParameter);
+        }
+        jobExecutionContext.setOutMessageContext(outputNew);
+    }
+
+    /**
+     * Downloads one remote file from the given endpoint into the local staging directory,
+     * creating the directory if it does not exist.
+     *
+     * @param outputFileStagingPath local directory to download into
+     * @param paramValue remote path of the file on the endpoint
+     * @param ftp GridFTP client used for the download
+     * @param gssCred credentials passed to the GridFTP client
+     * @param endpoint GridFTP endpoint the file is read from
+     * @return {@code outputFileStagingPath}, the platform file separator and the file name, concatenated
+     * @throws URISyntaxException if the source gsiftp URI cannot be built
+     * @throws ToolsException if the remote file cannot be read
+     */
+    private static String doStaging(String outputFileStagingPath, String paramValue, GridFtp ftp, GSSCredential gssCred, String endpoint) throws URISyntaxException, ToolsException {
+        URI srcURI = GramProviderUtils.createGsiftpURI(endpoint, paramValue);
+        String fileName = new File(srcURI.getPath()).getName();
+        File outputpath = new File(outputFileStagingPath);
+        if(!outputpath.exists()){
+            outputpath.mkdirs();
+        }
+        File outputFile = new File(outputpath.getAbsolutePath() + File.separator + fileName);
+        ftp.readRemoteFile(srcURI,
+                gssCred, outputFile);
+        return outputFileStagingPath + File.separator + fileName;
+    }
+
+    /**
+     * Currently a no-op: the supplied properties are ignored because the body is empty.
+     *
+     * @param properties handler configuration properties (unused)
+     */
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/13b505ae/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java
new file mode 100644
index 0000000..3086b95
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/persistence/DBJobPersistenceManager.java
@@ -0,0 +1,223 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.persistence;
+
+import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.log4j.Logger;
+import org.globus.gram.internal.GRAMConstants;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * User: AmilaJ (amilaj@apache.org)
+ * Date: 6/18/13
+ * Time: 4:16 PM
+ * Database based job persistence manager. Current default implementation.
+ */
+
+public class DBJobPersistenceManager implements JobPersistenceManager {
+
+    private DBUtil dbUtil;
+
+    private static final Logger log = Logger.getLogger(DBJobPersistenceManager.class);
+
+
+    public DBJobPersistenceManager(DBUtil db) {
+        this.dbUtil = db;
+    }
+
+    public synchronized void updateJobStatus(JobData jobData) throws GFacException {
+
+        if (jobData.getState() == GRAMConstants.STATUS_UNSUBMITTED) {
+            insertJob(jobData);
+        } else {
+
+            String sql = "update gram_job set status = ? where job_id = ?";
+
+            Connection connection = null;
+            PreparedStatement stmt = null;
+
+            try {
+                connection = getConnection();
+                stmt = connection.prepareStatement(sql);
+                stmt.setInt(1, jobData.getState());
+                stmt.setString(2, jobData.getJobId());
+
+                stmt.executeUpdate();
+                connection.commit();
+
+            } catch (SQLException e) {
+                throw new GFacException(e);
+            } finally {
+                try {
+                    if (stmt != null) {
+                        stmt.close();
+                    }
+
+                    if (connection != null) {
+                        connection.close();
+                    }
+
+                } catch (SQLException e) {
+                    log.error("Error closing streams", e);
+                }
+            }
+        }
+    }
+
+    private void insertJob(JobData jobData) throws GFacException {
+
+        String sql = "insert into gram_job values (?, ?)";
+
+        PreparedStatement stmt = null;
+        Connection connection = null;
+
+        try {
+            connection = getConnection();
+            stmt = connection.prepareStatement(sql);
+            stmt.setString(1, jobData.getJobId());
+            stmt.setInt(2, jobData.getState());
+
+            stmt.executeUpdate();
+        } catch (SQLException e) {
+            throw new GFacException(e);
+        } finally {
+            try {
+                if (stmt != null) {
+                    stmt.close();
+                }
+
+                if (connection != null) {
+                    connection.close();
+                }
+
+            } catch (SQLException e) {
+                log.error("Error closing streams", e);
+            }
+        }
+
+    }
+
+    public List<JobData> getRunningJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status not in (?, ?, ?)";
+
+        int[] statuses = new int[3];
+        statuses[0] = GRAMConstants.STATUS_UNSUBMITTED;
+        statuses[1] = GRAMConstants.STATUS_DONE;
+        statuses[2] = GRAMConstants.STATUS_FAILED;
+
+        return getJobs(sql, statuses);
+    }
+
+    public List<JobData> getFailedJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status in (?)";
+
+        int[] statuses = new int[1];
+        statuses[0] = GRAMConstants.STATUS_FAILED;
+
+        return getJobs(sql, statuses);
+    }
+
+    public List<JobData> getUnSubmittedJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status in (?)";
+
+        int[] statuses = new int[1];
+        statuses[0] = GRAMConstants.STATUS_UNSUBMITTED;
+
+        return getJobs(sql, statuses);
+    }
+
+    public List<JobData> getSuccessfullyCompletedJobs() throws GFacException {
+
+        String sql = "select * from gram_job where status in (?)";
+
+        int[] statuses = new int[1];
+        statuses[0] = GRAMConstants.STATUS_DONE;
+
+        return getJobs(sql, statuses);
+
+    }
+
+
+    protected List<JobData> getJobs(String sql, int[] statuses) throws GFacException {
+
+        List<JobData> jobs = new ArrayList<JobData>();
+
+        PreparedStatement preparedStatement = null;
+        Connection connection = null;
+
+        try {
+            connection = getConnection();
+            preparedStatement = connection.prepareStatement(sql);
+
+            int index = 1;
+            for (int status : statuses) {
+                preparedStatement.setInt(index, status);
+                ++index;
+            }
+
+            ResultSet resultSet = preparedStatement.executeQuery();
+
+            while (resultSet.next()) {
+
+                String jobId = resultSet.getString("job_id");
+                int state = resultSet.getInt("status");
+
+                jobs.add(new JobData(jobId, state));
+            }
+
+        } catch (SQLException e) {
+            throw new GFacException(e);
+        } finally {
+            try {
+                if (preparedStatement != null) {
+                    preparedStatement.close();
+                }
+
+                if (connection != null) {
+                    connection.close();
+                }
+
+            } catch (SQLException e) {
+                log.error("Error closing connection", e);
+            }
+        }
+
+        return jobs;
+    }
+
+    private synchronized Connection getConnection() throws SQLException {
+        Connection connection = dbUtil.getConnection();
+        connection.setAutoCommit(true);
+
+        return connection;
+    }
+}