Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:37 UTC

[20/39] airavata git commit: Refactored the gfac submodules: merged the gfac-ssh, gfac-gsissh, gfac-local, gfac-monitor and gsissh modules into a new gfac-impl module, and moved implementation classes from gfac-core to gfac-impl

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
new file mode 100644
index 0000000..e53fe09
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CommonUtils {
+    private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(CommonUtils.class);
+
+    public static String getChannelID(MonitorID monitorID) {
+        return monitorID.getUserName() + "-" + monitorID.getComputeResourceDescription().getHostName();
+    }
+
+    public static String getRoutingKey(MonitorID monitorID) {
+        return "*." + monitorID.getUserName() + "." + monitorID.getComputeResourceDescription().getIpAddresses().get(0);
+    }
+
+    public static String getChannelID(String userName,String hostAddress) {
+        return userName + "-" + hostAddress;
+    }
+
+    public static String getRoutingKey(String userName,String hostAddress) {
+        return "*." + userName + "." + hostAddress;
+    }
+
+    public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID, JobExecutionContext jobExecutionContext) throws AiravataMonitorException {
+        synchronized (queue) {
+            Iterator<UserMonitorData> iterator = queue.iterator();
+            while (iterator.hasNext()) {
+                UserMonitorData next = iterator.next();
+                if (next.getUserName().equals(monitorID.getUserName())) {
+                    // then this is the right place to update
+                    List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+                    for (HostMonitorData host : monitorIDs) {
+                        if (isEqual(host.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
+                            // OK, we found the right place to add this monitorID
+                            host.addMonitorIDForHost(monitorID);
+                            logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
+                                    " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
+                            return;
+                        }
+                    }
+                    // there is a UserMonitorData object for this user name but no HostMonitorData
+                    // for this host, so we have to create a new one
+                    HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext);
+                    hostMonitorData.addMonitorIDForHost(monitorID);
+                    next.addHostMonitorData(hostMonitorData);
+                    logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
+                            " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
+                    return;
+                }
+            }
+            HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext);
+            hostMonitorData.addMonitorIDForHost(monitorID);
+
+            UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+            userMonitorData.addHostMonitorData(hostMonitorData);
+            try {
+                queue.put(userMonitorData);
+                logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
+                        " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
+            } catch (InterruptedException e) {
+                throw new AiravataMonitorException(e);
+            }
+        }
+    }
+
+    private static boolean isEqual(ComputeResourceDescription comRes_1, ComputeResourceDescription comRes_2) {
+        return comRes_1.getComputeResourceId().equals(comRes_2.getComputeResourceId()) &&
+                comRes_1.getHostName().equals(comRes_2.getHostName());
+    }
+
+    public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue, MonitorID monitorID) {
+        Iterator<MonitorID> iterator = queue.iterator();
+        while (iterator.hasNext()) {
+            MonitorID next = iterator.next();
+            if (monitorID.getUserName().equals(next.getUserName()) &&
+                    CommonUtils.isEqual(monitorID.getComputeResourceDescription(), next.getComputeResourceDescription())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * This method doesn't have to be synchronized because it is invoked by HPCPullMonitor, which is already synchronized.
+     * @param userMonitorData - the per-user monitoring data to remove the job from
+     * @param monitorID - the job to remove
+     * @throws AiravataMonitorException
+     */
+    public static void removeMonitorFromQueue(UserMonitorData userMonitorData, MonitorID monitorID) throws AiravataMonitorException {
+        if (userMonitorData.getUserName().equals(monitorID.getUserName())) {
+            // then this is the right place to update
+            List<HostMonitorData> hostMonitorData = userMonitorData.getHostMonitorData();
+            Iterator<HostMonitorData> iterator1 = hostMonitorData.iterator();
+            while (iterator1.hasNext()) {
+                HostMonitorData iHostMonitorID = iterator1.next();
+                if (isEqual(iHostMonitorID.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
+                    Iterator<MonitorID> iterator2 = iHostMonitorID.getMonitorIDs().iterator();
+                    while (iterator2.hasNext()) {
+                        MonitorID iMonitorID = iterator2.next();
+                        if (iMonitorID.getJobID().equals(monitorID.getJobID())
+                                || iMonitorID.getJobName().equals(monitorID.getJobName())) {
+                            // we found the object; we cannot use list.remove(object) because the states of the
+                            // two objects could differ, which is why we compare by jobID
+                            iterator2.remove();
+                            logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring, last " +
+                                    "status: {}", monitorID.getJobID(), monitorID.getJobName(), monitorID.getStatus().toString());
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+        logger.info("Cannot find the given MonitorID in the queue with userName " +
+                monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
+        logger.info("This might not be an error because someone else removed this job from the queue");
+    }
+
+
+    public static void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerClassName.getProperties());
+            } catch (ClassNotFoundException e) {
+                logger.error(e.getMessage());
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                logger.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                logger.error(e.getMessage());
+                throw new GFacException("Cannot access handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (Exception e) {
+                // TODO: Better error reporting.
+                throw new GFacException("Error executing an OutFlow handler", e);
+            }
+        }
+    }
+
+    /**
+     * Update the job count for a given set of paths.
+     * @param curatorClient - CuratorFramework instance
+     * @param changeCountMap - map of znode path to the job count delta for that path
+     * @param isAdd - whether to add the given count to, or subtract it from, the existing job count
+     */
+    public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) {
+        StringBuilder changeZNodePaths = new StringBuilder();
+        try {
+            for (String path : changeCountMap.keySet()) {
+                if (isAdd) {
+                    CommonUtils.checkAndCreateZNode(curatorClient, path);
+                }
+                byte[] byteData = curatorClient.getData().forPath(path);
+                String nodeData;
+                if (byteData == null) {
+                    if (isAdd) {
+                        curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(changeCountMap.get(path)).getBytes());
+                    } else {
+                        // This shouldn't be possible, but we handle it in case a ZooKeeper communication failure returns null data
+                        logger.warn("Couldn't reduce the job count in " + path + " as it returned null data. Resetting the job count to 0");
+                        curatorClient.setData().withVersion(-1).forPath(path, "0".getBytes());
+                    }
+                } else {
+                    nodeData = new String(byteData);
+                    if (isAdd) {
+                        curatorClient.setData().withVersion(-1).forPath(path,
+                                String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes());
+                    } else {
+                        int previousCount = Integer.parseInt(nodeData);
+                        int removeCount = changeCountMap.get(path);
+                        if (previousCount >= removeCount) {
+                            curatorClient.setData().withVersion(-1).forPath(path,
+                                    String.valueOf(previousCount - removeCount).getBytes());
+                        } else {
+                            // This shouldn't be possible; do we need to reset the job count to 0?
+                            logger.error("Requested remove job count is " + removeCount +
+                                    " which is higher than the existing job count " + previousCount
+                                    + " in  " + path + " path.");
+                        }
+                    }
+                }
+                changeZNodePaths.append(path).append(":");
+            }
+
+            // update stat node to trigger orchestrator watchers
+            if (changeCountMap.size() > 0) {
+                changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
+                curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT, changeZNodePaths.toString().getBytes());
+            }
+        } catch (Exception e) {
+            logger.error("Error while writing job count to zookeeper", e);
+        }
+
+    }
+
+    /**
+     * Increase the job count by one and update ZooKeeper.
+     * @param monitorID - Job monitorId
+     */
+    public static void increaseZkJobCount(MonitorID monitorID) {
+        Map<String, Integer> addMap = new HashMap<String, Integer>();
+        addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1);
+        updateZkWithJobCount(monitorID.getJobExecutionContext().getCuratorClient(), addMap, true);
+    }
+
+    /**
+     * Construct and return the job count path for a given MonitorID, e.g. /stat/{username}/{resourceName}/job
+     * @param monitorID - Job monitorId
+     * @return the znode path used to track the job count for this user and compute resource
+     */
+    public static String getJobCountUpdatePath(MonitorID monitorID){
+        return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName())
+                .append("/").append(monitorID.getComputeResourceDescription().getHostName()).append("/").append(Constants.JOB).toString();
+    }
+
+    /**
+     * Check whether a znode exists at the given path; if not, create it (and any missing parent znodes).
+     * @param curatorClient - CuratorFramework instance
+     * @param path - znode path to check
+     * @throws Exception if the znode cannot be read or created
+     */
+    private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception {
+        if (curatorClient.checkExists().forPath(path) == null) { // if znode doesn't exist
+            if (path.lastIndexOf("/") > 1) {  // recursively make sure the parent znode exists
+                checkAndCreateZNode(curatorClient, (path.substring(0, path.lastIndexOf("/"))));
+            }
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
+        }
+    }
+}
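
For reference, a minimal sketch of how a caller might drive the helpers above. The ZooKeeper connect string and the user/host values below are placeholders rather than values from this commit; everything else uses only methods defined in CommonUtils or the standard Curator client API.

    import org.apache.airavata.gfac.monitor.util.CommonUtils;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    import java.util.Collections;

    public class JobCountSketch {
        public static void main(String[] args) {
            // channel IDs and routing keys are derived from the user name and host address
            String channelId = CommonUtils.getChannelID("airavata", "gw98.iu.xsede.org");   // airavata-gw98.iu.xsede.org
            String routingKey = CommonUtils.getRoutingKey("airavata", "gw98.iu.xsede.org"); // *.airavata.gw98.iu.xsede.org
            System.out.println(channelId + " / " + routingKey);

            // job counts live under /stat/{username}/{resourceName}/job; adding one to the
            // count is exactly what increaseZkJobCount(monitorID) does internally
            CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3)); // placeholder connect string
            curatorClient.start();
            CommonUtils.updateZkWithJobCount(curatorClient,
                    Collections.singletonMap("/stat/airavata/gw98.iu.xsede.org/job", 1), true);
            curatorClient.close();
        }
    }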

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
new file mode 100644
index 0000000..08c3f67
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+
+import java.io.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.security.spec.InvalidKeySpecException;
+
+public class X509Helper {
+
+    static {
+        // parsing of RSA key fails without this
+        java.security.Security.addProvider(new BouncyCastleProvider());
+    }
+
+    public static KeyStore keyStoreFromPEM(String proxyFile,
+                                           String keyPassPhrase) throws IOException,
+            CertificateException,
+            NoSuchAlgorithmException,
+            InvalidKeySpecException,
+            KeyStoreException {
+        return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase);
+    }
+
+    public static KeyStore keyStoreFromPEM(String certFile,
+                                           String keyFile,
+                                           String keyPassPhrase) throws IOException,
+                                                                        CertificateException,
+                                                                        NoSuchAlgorithmException,
+                                                                        InvalidKeySpecException,
+                                                                        KeyStoreException {
+        CertificateFactory cf = CertificateFactory.getInstance("X.509");
+        X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile));
+        //System.out.println(cert.toString());
+
+        // this works for proxy files, too, since it skips over the certificate
+        BufferedReader reader = new BufferedReader(new FileReader(keyFile));
+        String line = null;
+        StringBuilder builder = new StringBuilder();
+        boolean inKey = false;
+        while((line=reader.readLine()) != null) {
+            if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) {
+                inKey = true;
+            }
+            if (inKey) {
+                builder.append(line);
+                builder.append(System.getProperty("line.separator"));
+            }
+            if (line.contains("-----END RSA PRIVATE KEY-----")) {
+                inKey = false;
+            }
+        }
+        reader.close();
+        String privKeyPEM = builder.toString();
+        //System.out.println(privKeyPEM);
+
+        // using BouncyCastle
+//        PEMReader pemParser = new PEMReader(new StringReader(privKeyPEM));
+//        Object object = pemParser.readObject();
+//
+//        PrivateKey privKey = null;
+//        if(object instanceof KeyPair){
+//            privKey = ((KeyPair)object).getPrivate();
+//        }
+        // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency
+        /*
+        // Base64 decode the data
+        byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM);
+
+        // PKCS8 decode the encoded RSA private key
+        java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
+        KeyFactory kf = KeyFactory.getInstance("RSA");
+        PrivateKey privKey = kf.generatePrivate(keySpec);
+        //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec);
+        */
+        //System.out.println(privKey.toString());
+
+//        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+//        keyStore.load(null,null);
+//
+//        KeyStore.PrivateKeyEntry entry =
+//            new KeyStore.PrivateKeyEntry(privKey,
+//                                         new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert});
+//        KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray());
+//        keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot);
+
+//        return keyStore;
+        // TODO: not implemented because of a conflict with the BouncyCastle version used in gsissh
+        throw new CertificateException("Method not implemented");
+
+    }
+
+
+    public static KeyStore trustKeyStoreFromCertDir() throws IOException,
+                                                             KeyStoreException,
+                                                             CertificateException,
+                                                             NoSuchAlgorithmException, ApplicationSettingsException {
+        return trustKeyStoreFromCertDir(ServerSettings.getSetting("trusted.cert.location"));
+    }
+
+    public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException,
+                                                                           KeyStoreException,
+                                                                           CertificateException,
+                                                                           NoSuchAlgorithmException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        ks.load(null,null);
+
+        File dir = new File(certDir);
+        File[] files = dir.listFiles();
+        if (files == null) { // directory is missing or unreadable
+            return ks;
+        }
+        for (File file : files) {
+            if (!file.isFile()) {
+                continue;
+            }
+            if (!file.getName().endsWith(".0")) {
+                continue;
+            }
+
+            try {
+                //System.out.println("reading file "+file.getName());
+                CertificateFactory cf = CertificateFactory.getInstance("X.509");
+                X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file));
+                //System.out.println(cert.toString());
+
+                KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert);
+
+                ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null);
+            } catch (KeyStoreException e) {
+                // ignore certificates that cannot be added to the keystore
+            } catch (CertificateParsingException e) {
+                // skip files that are not parseable X.509 certificates
+                continue;
+            }
+
+        }
+
+        return ks;
+    }
+}
+
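
Since keyStoreFromPEM above currently throws, only the trust-store path is usable. The following is a minimal sketch of wiring its result into the standard JSSE API; the certificate directory is the conventional Globus location and is used here only as a placeholder.

    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManagerFactory;
    import java.security.KeyStore;

    public class TrustStoreSketch {
        public static void main(String[] args) throws Exception {
            // load every *.0 CA certificate from the directory into an in-memory JKS store
            KeyStore trustStore = X509Helper.trustKeyStoreFromCertDir("/etc/grid-security/certificates");

            // hand the trust store to a TrustManagerFactory and build an SSLContext from it
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, tmf.getTrustManagers(), null);
            System.out.println("trusted entries: " + trustStore.size());
        }
    }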

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
new file mode 100644
index 0000000..74642dc
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/context/SSHAuthWrapper.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.context;
+
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+
+public class SSHAuthWrapper {
+    private ServerInfo serverInfo;
+
+    private AuthenticationInfo authenticationInfo;
+
+    private String key;
+
+    public SSHAuthWrapper(ServerInfo serverInfo, AuthenticationInfo authenticationInfo, String key) {
+        this.serverInfo = serverInfo;
+        this.authenticationInfo = authenticationInfo;
+        this.key = key;
+    }
+
+    public ServerInfo getServerInfo() {
+        return serverInfo;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public String getKey() {
+        return key;
+    }
+}
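
A short sketch of how the wrapper might be assembled. The wrap helper below is hypothetical, but the key convention (user name + host name + port) mirrors the one the SCP handlers in this commit use when registering security contexts.

    // hypothetical factory method; the ServerInfo and AuthenticationInfo instances are
    // assumed to come from the existing gfac-ssh APIs
    public static SSHAuthWrapper wrap(ServerInfo serverInfo, AuthenticationInfo authInfo,
                                      String userName, String hostName, int port) {
        String key = userName + hostName + port; // same convention as AdvancedSCPInputHandler
        return new SSHAuthWrapper(serverInfo, authInfo, key);
    }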

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
new file mode 100644
index 0000000..9481188
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This handler copies input data from the gateway machine to the machine where
+ * Airavata is installed; handlers that run later can copy the input files on to the compute resource.
+ * Configure it in gfac-config.xml as follows:
+ * <Handler class="AdvancedSCPInputHandler">
+ *     <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ *     <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ *     <property name="userName" value="airavata"/>
+ *     <property name="hostName" value="gw98.iu.xsede.org"/>
+ *     <property name="inputPath" value="/home/airavata/outputData"/>
+ * </Handler>
+ */
+public class AdvancedSCPInputHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
+    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
+    public static final int DEFAULT_SSH_PORT = 22;
+
+    private String password = null;
+
+    private String publicKeyPath;
+
+    private String passPhrase;
+
+    private String privateKeyPath;
+
+    private String userName;
+
+    private String hostName;
+
+    private String inputPath;
+
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        password = (String) properties.get("password");
+        passPhrase = (String) properties.get("passPhrase");
+        privateKeyPath = (String) properties.get("privateKeyPath");
+        publicKeyPath = (String) properties.get("publicKeyPath");
+        userName = (String) properties.get("userName");
+        hostName = (String) properties.get("hostName");
+        inputPath = (String) properties.get("inputPath");
+    }
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        int index = 0;
+        int oldIndex = 0;
+        List<String> oldFiles = new ArrayList<String>();
+        MessageContext inputNew = new MessageContext();
+        StringBuffer data = new StringBuffer("|");
+        Cluster pbsCluster = null;
+
+        try {
+            String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
+            if (pluginData != null) {
+                try {
+                    oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim());
+                    oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(","));
+                    if (oldIndex == oldFiles.size()) {
+                        log.info("Previously stored handler data is consistent, resuming the transfer");
+                    } else {
+                        oldIndex = 0;
+                        oldFiles.clear();
+                    }
+                } catch (NumberFormatException e) {
+                    log.error("Previously stored data " + pluginData + " is malformed, so we continue without resuming");
+                }
+            }
+
+            AuthenticationInfo authenticationInfo = null;
+            if (password != null) {
+                authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+            } else {
+                authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+                        this.passPhrase);
+            }
+
+            // Server info
+            String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID();
+            if (index < oldIndex) {
+                parentPath = oldFiles.get(index);
+                data.append(oldFiles.get(index++)).append(","); // reuse the already transferred file and increment the index
+            } else {
+                (new File(parentPath)).mkdirs();
+                StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString());
+                GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+            }
+            DataTransferDetails detail = new DataTransferDetails();
+            TransferStatus status = new TransferStatus();
+            // the job manager doesn't matter here because we are only doing file handling,
+            // not monitoring or job submission
+
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
+                String paramValue = inputParamType.getValue();
+                // TODO: Review this with type
+                if (inputParamType.getType() == DataType.URI) {
+                    try {
+                        URL file = new URL(paramValue);
+                        String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT;
+                        GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT);
+                        pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+                        paramValue = file.getPath();
+                    } catch (MalformedURLException e) {
+                        String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
+                        GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
+                        pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+                        log.error(e.getLocalizedMessage(), e);
+                    }
+
+                    if (index < oldIndex) {
+                        log.info("Input File: " + paramValue + " is already transferred, so we skip this operation");
+                        inputParamType.setValue(oldFiles.get(index));
+                        data.append(oldFiles.get(index++)).append(","); // reuse the already transferred file and increment the index
+                    } else {
+                        String stageInputFile = stageInputFiles(pbsCluster, paramValue, parentPath);
+                        inputParamType.setValue(stageInputFile);
+                        StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+                        status.setTransferState(TransferState.UPLOAD);
+                        detail.setTransferStatus(status);
+                        detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+                        GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+                    }
+                }
+                // FIXME: what is the thrift model DataType equivalent for URIArray type?
+//                else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+//                    List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+//                    List<String> newFiles = new ArrayList<String>();
+//                    for (String paramValueEach : split) {
+//                        try {
+//                            URL file = new URL(paramValue);
+//                            this.userName = file.getUserInfo();
+//                            this.hostName = file.getHost();
+//                            paramValueEach = file.getPath();
+//                        } catch (MalformedURLException e) {
+//                            log.error(e.getLocalizedMessage(), e);
+//                        }
+//                        if (index < oldIndex) {
+//                            log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+//                            newFiles.add(oldFiles.get(index));
+//                            data.append(oldFiles.get(index++)).append(",");
+//                        } else {
+//                            String stageInputFiles = stageInputFiles(pbsCluster, paramValueEach, parentPath);
+//                            StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+//                            GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+//                            newFiles.add(stageInputFiles);
+//                        }
+//                    }
+//                    ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+//                }
+                inputNew.getParameters().put(paramName, inputParamType);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while staging input files", e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(inputNew);
+    }
+
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        this.invoke(jobExecutionContext);
+    }
+
+    private String stageInputFiles(Cluster cluster, String paramValue, String parentPath) throws GFacException {
+        try {
+            cluster.scpFrom(paramValue, parentPath);
+            return "file://" + parentPath + File.separator + (new File(paramValue)).getName();
+        } catch (SSHApiException e) {
+            log.error("Error transferring remote file to local file, remote path: " + paramValue);
+            throw new GFacException(e);
+        }
+    }
+}
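
The handler above checkpoints its progress through GFacUtils.saveHandlerData as a string of the form "index|file1,file2,...". Below is a self-contained sketch of that parse-and-resume format; parseCheckpoint is a hypothetical helper, not a method from this commit.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CheckpointSketch {
        // returns how many files were already staged and fills filesOut with their paths
        static int parseCheckpoint(String pluginData, List<String> filesOut) {
            if (pluginData == null) {
                return 0;
            }
            String[] parts = pluginData.split("\\|");
            int oldIndex = Integer.parseInt(parts[0].trim());
            List<String> oldFiles = Arrays.asList(parts[1].split(","));
            if (oldIndex != oldFiles.size()) {
                return 0; // inconsistent checkpoint: restart the transfers from scratch
            }
            filesOut.addAll(oldFiles);
            return oldIndex;
        }

        public static void main(String[] args) {
            List<String> files = new ArrayList<>();
            int staged = parseCheckpoint("2|file:///tmp/exp1/in1.dat,file:///tmp/exp1/in2.dat", files);
            System.out.println(staged + " file(s) already staged: " + files);
        }
    }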

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
new file mode 100644
index 0000000..320f236
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * This handler copies outputs from the local directory of the Airavata installation
+ * to a remote location; SCPOutputHandler should be invoked before this handler.
+ * Add the following configuration to gfac-config.xml and configure the keys properly:
+ * <Handler class="AdvancedSCPOutputHandler">
+ *     <property name="privateKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa"/>
+ *     <property name="publicKeyPath" value="/Users/lahirugunathilake/.ssh/id_dsa.pub"/>
+ *     <property name="userName" value="airavata"/>
+ *     <property name="hostName" value="gw98.iu.xsede.org"/>
+ *     <property name="outputPath" value="/home/airavata/outputData"/>
+ *     <property name="passPhrase" value="/home/airavata/outputData"/>
+ *     <property name="password" value="/home/airavata/outputData"/>
+ * </Handler>
+ */
+public class AdvancedSCPOutputHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
+
+    public static final int DEFAULT_SSH_PORT = 22;
+
+    private String password = null;
+
+    private String publicKeyPath;
+
+    private String passPhrase;
+
+    private String privateKeyPath;
+
+    private String userName;
+
+    private String hostName;
+
+    private String outputPath;
+
+
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        password = (String)properties.get("password");
+        passPhrase = (String)properties.get("passPhrase");
+        privateKeyPath = (String)properties.get("privateKeyPath");
+        publicKeyPath = (String)properties.get("publicKeyPath");
+        userName = (String)properties.get("userName");
+        hostName = (String)properties.get("hostName");
+        outputPath = (String)properties.get("outputPath");
+    }
+
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        Cluster pbsCluster = null;
+        AuthenticationInfo authenticationInfo = null;
+        if (password != null) {
+            authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password);
+        } else {
+            authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath,
+                    this.passPhrase);
+        }
+        try {
+            String hostName = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostName) == null) {
+                try {
+                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    try {
+                        StringWriter errors = new StringWriter();
+                        e.printStackTrace(new PrintWriter(errors));
+                        GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    } catch (GFacException e1) {
+                        log.error(e1.getLocalizedMessage());
+                    }
+                    throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+                }
+            }
+            String standardError = jobExecutionContext.getStandardError();
+            String standardOutput = jobExecutionContext.getStandardOutput();
+            super.invoke(jobExecutionContext);
+            // Server info
+            if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null){
+                try{
+                    URL outputPathURL = new URL(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir());
+                    this.userName = outputPathURL.getUserInfo();
+                    this.hostName = outputPathURL.getHost();
+                    outputPath = outputPathURL.getPath();
+                } catch (MalformedURLException e) {
+                    log.error(e.getLocalizedMessage(),e);
+                }
+            }
+            String key = GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT);
+            pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getPbsCluster();
+            if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && !jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()) {
+                outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID()
+                        + File.separator;
+                pbsCluster.makeDirectory(outputPath);
+            }
+            pbsCluster.scpTo(outputPath, standardError);
+            pbsCluster.scpTo(outputPath, standardOutput);
+            List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+            Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+            Set<String> keys = output.keySet();
+            for (String paramName : keys) {
+                OutputDataObjectType outputDataObjectType = (OutputDataObjectType) output.get(paramName);
+                if (outputDataObjectType.getType() == DataType.URI) {
+                    // for failed jobs outputs are not generated. So we should not download outputs
+                    if (GFacUtils.isFailedJob(jobExecutionContext)){
+                        continue;
+                    }
+                    String downloadFile = outputDataObjectType.getValue();
+                    if (downloadFile == null || !(new File(downloadFile).isFile())) {
+                        GFacUtils.saveErrorDetails(jobExecutionContext, "Empty output returned from the application", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                        throw new GFacHandlerException("Empty output returned from the application");
+                    }
+                    pbsCluster.scpTo(outputPath, downloadFile);
+                    String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, downloadFile.length());
+                    OutputDataObjectType dataObjectType = new OutputDataObjectType();
+                    dataObjectType.setValue(outputPath + File.separatorChar + fileName);
+                    dataObjectType.setName(paramName);
+                    dataObjectType.setType(DataType.URI);
+                    dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+                    dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+                    dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+                    dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+                    outputArray.add(dataObjectType);
+                } else if (outputDataObjectType.getType() == DataType.STDOUT) {
+                    pbsCluster.scpTo(outputPath, standardOutput);
+                    String fileName = standardOutput.substring(standardOutput.lastIndexOf(File.separatorChar)+1, standardOutput.length());
+                    OutputDataObjectType dataObjectType = new OutputDataObjectType();
+                    dataObjectType.setValue(outputPath + File.separatorChar + fileName);
+                    dataObjectType.setName(paramName);
+                    dataObjectType.setType(DataType.STDOUT);
+                    dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+                    dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+                    dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+                    dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+                    outputArray.add(dataObjectType);
+                } else if (outputDataObjectType.getType() == DataType.STDERR) {
+                    pbsCluster.scpTo(outputPath, standardError);
+                    String fileName = standardError.substring(standardError.lastIndexOf(File.separatorChar)+1, standardError.length());
+                    OutputDataObjectType dataObjectType = new OutputDataObjectType();
+                    dataObjectType.setValue(outputPath + File.separatorChar + fileName);
+                    dataObjectType.setName(paramName);
+                    dataObjectType.setType(DataType.STDERR);
+                    dataObjectType.setIsRequired(outputDataObjectType.isIsRequired());
+                    dataObjectType.setRequiredToAddedToCommandLine(outputDataObjectType.isRequiredToAddedToCommandLine());
+                    dataObjectType.setApplicationArgument(outputDataObjectType.getApplicationArgument());
+                    dataObjectType.setSearchQuery(outputDataObjectType.getSearchQuery());
+                    outputArray.add(dataObjectType);
+                }
+            }
+            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+        } catch (SSHApiException e) {
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
+            log.error("Error transferring files to remote host: " + hostName + " with user: " + userName);
+            log.error(e.getMessage());
+            throw new GFacHandlerException(e);
+        } catch (Exception e) {
+            try {
+                // e.getCause() may be null, so fall back to the exception itself
+                Throwable cause = e.getCause() != null ? e.getCause() : e;
+                GFacUtils.saveErrorDetails(jobExecutionContext, cause.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException(e);
+        }
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+
+}
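
The three DataType branches in invoke above duplicate the same field-by-field copy of an OutputDataObjectType. A hedged refactoring sketch follows, using only getters and setters that already appear in the handler; copyOutput is our name, not the commit's.

    // drop-in private helper for AdvancedSCPOutputHandler
    private static OutputDataObjectType copyOutput(OutputDataObjectType source, String value,
                                                   DataType type, String paramName) {
        OutputDataObjectType dataObjectType = new OutputDataObjectType();
        dataObjectType.setValue(value);
        dataObjectType.setName(paramName);
        dataObjectType.setType(type);
        dataObjectType.setIsRequired(source.isIsRequired());
        dataObjectType.setRequiredToAddedToCommandLine(source.isRequiredToAddedToCommandLine());
        dataObjectType.setApplicationArgument(source.getApplicationArgument());
        dataObjectType.setSearchQuery(source.getSearchQuery());
        return dataObjectType;
    }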

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
new file mode 100644
index 0000000..61a1805
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
@@ -0,0 +1,78 @@
+package org.apache.airavata.gfac.ssh.handler;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.util.HandleOutputs;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NewSSHOutputHandler extends AbstractHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(NewSSHOutputHandler.class);
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        String hostAddress = jobExecutionContext.getHostName();
+        Cluster cluster = null;
+        // Security context and connection
+        try {
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                GFACSSHUtils.addSecurityContext(jobExecutionContext);
+            }
+            cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        }
+
+        super.invoke(jobExecutionContext);
+        List<OutputDataObjectType> outputArray = HandleOutputs.handleOutputs(jobExecutionContext, cluster);
+        try {
+            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+        } catch (RegistryException e) {
+            throw new GFacHandlerException(e);
+        }
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+        // TODO Auto-generated method stub
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
new file mode 100644
index 0000000..fb86dd3
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Properties;
+
+public class SSHDirectorySetupHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(SSHDirectorySetupHandler.class);
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                GFACSSHUtils.addSecurityContext(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        }
+
+        log.info("Setup SSH job directorties");
+        super.invoke(jobExecutionContext);
+        makeDirectory(jobExecutionContext);
+
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        Cluster cluster = null;
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacHandlerException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
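+            // Create the working directory, plus separate input and output directories
+            // when they differ from it.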
+            String workingDirectory = jobExecutionContext.getWorkingDir();
+            cluster.makeDirectory(workingDirectory);
+            if (!jobExecutionContext.getInputDir().equals(workingDirectory)) {
+                cluster.makeDirectory(jobExecutionContext.getInputDir());
+            }
+            if (!jobExecutionContext.getOutputDir().equals(workingDirectory)) {
+                cluster.makeDirectory(jobExecutionContext.getOutputDir());
+            }
+
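+            // Record a DIRECTORY_SETUP transfer-status entry against the task so the
+            // experiment timeline captures this staging step.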
+            DataTransferDetails detail = new DataTransferDetails();
+            TransferStatus status = new TransferStatus();
+            status.setTransferState(TransferState.DIRECTORY_SETUP);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("Working directory = " + workingDirectory);
+
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+        } catch (Exception e) {
+			DataTransferDetails detail = new DataTransferDetails();
+            TransferStatus status = new TransferStatus();
+            status.setTransferState(TransferState.FAILED);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir());
+            try {
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e);
+        }
+    }
+
+    public void initProperties(Properties properties) throws GFacHandlerException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
new file mode 100644
index 0000000..277ff0e
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class SSHInputHandler extends AbstractHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(SSHInputHandler.class);
+
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        DataTransferDetails detail = new DataTransferDetails();
+        detail.setTransferDescription("Input Data Staging");
+        TransferStatus status = new TransferStatus();
+        int index = 0;
+        int oldIndex = 0;
+        List<String> oldFiles = new ArrayList<String>();
+        StringBuffer data = new StringBuffer("|");
+        MessageContext inputNew = new MessageContext();
+        Cluster cluster = null;
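+        // index/oldIndex/oldFiles support resuming a partially completed staging run;
+        // nothing populates oldFiles in this version, so the "already transferred"
+        // branch below is effectively dormant.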
+        
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                try {
+                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    try {
+                        StringWriter errors = new StringWriter();
+                        e.printStackTrace(new PrintWriter(errors));
+                        GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    } catch (GFacException e1) {
+                        log.error(e1.getLocalizedMessage());
+                    }
+                    throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+                }
+            }
+
+            cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+            log.info("Invoking SCPInputHandler");
+            super.invoke(jobExecutionContext);
+
+
+            MessageContext input = jobExecutionContext.getInMessageContext();
+            Set<String> parameters = input.getParameters().keySet();
+            for (String paramName : parameters) {
+                InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName);
+                String paramValue = inputParamType.getValue();
+                //TODO: Review this with type
+                if (inputParamType.getType() == DataType.URI) {
+                    if (index < oldIndex) {
+                        log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+                        inputParamType.setValue(oldFiles.get(index));
+                        data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+                    } else {
+                        String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue);
+                        inputParamType.setValue(stageInputFile);
+                        StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString());
+                        status.setTransferState(TransferState.UPLOAD);
+                        detail.setTransferStatus(status);
+                        detail.setTransferDescription("Input Data Staged: " + stageInputFile);
+                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
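+                        // Persists progress as "<count>|<file1>,<file2>,..." (format implied
+                        // by the StringBuffer construction above) so a re-run can skip
+                        // already-staged inputs.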
+                        GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+                    }
+                }// FIXME: what is the thrift model DataType equivalent for URIArray type?
+//                else if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+//                	if (index < oldIndex) {
+//                        log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!");
+//                        ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index));
+//                        data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index
+//                    }else{
+//                	List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue));
+//                    List<String> newFiles = new ArrayList<String>();
+//                    for (String paramValueEach : split) {
+//                        String stageInputFiles = stageInputFiles(cluster,jobExecutionContext, paramValueEach);
+//                        status.setTransferState(TransferState.UPLOAD);
+//                        detail.setTransferStatus(status);
+//                        detail.setTransferDescription("Input Data Staged: " + stageInputFiles);
+//                        registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+//                        newFiles.add(stageInputFiles);
+//                        StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString());
+//                        GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName());
+//                    }
+//                    ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()]));
+//                    }
+//                }
+                inputNew.getParameters().put(paramName, inputParamType);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            status.setTransferState(TransferState.FAILED);
+            detail.setTransferStatus(status);
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+        }
+        jobExecutionContext.setInMessageContext(inputNew);
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
+        int i = paramValue.lastIndexOf(File.separator);
+        String substring = paramValue.substring(i + 1);
+        try {
+            String targetFile = jobExecutionContext.getInputDir() + File.separator + substring;
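+            // "scp:" values are copied cluster-to-cluster via third-party SCP; "file:"
+            // (or bare) paths are uploaded from the local machine, retrying up to three
+            // times with a 2 s pause between attempts.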
+            if(paramValue.startsWith("scp:")){
+            	paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+            	cluster.scpThirdParty(paramValue, targetFile);
+            }else{
+            if(paramValue.startsWith("file")){
+                paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length());
+            }
+            boolean success = false;
+            int j = 1;
+            while(!success){
+            try {
+				cluster.scpTo(targetFile, paramValue);
+				success = true;
+			} catch (Exception e) {
+				log.info(e.getLocalizedMessage());
+				Thread.sleep(2000);
+				 if(j==3) {
+					throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+				 }
+            }
+            j++;
+            }
+            }
+            return targetFile;
+        } catch (Exception e) {
+            throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage());
+        }
+    }
+
+    public void initProperties(Properties properties) throws GFacHandlerException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
new file mode 100644
index 0000000..7c5538a
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.ssh.handler;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.impl.OutputUtils;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TransferState;
+import org.apache.airavata.model.workspace.experiment.TransferStatus;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public class SSHOutputHandler extends AbstractHandler {
+    private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        String hostAddress = jobExecutionContext.getHostName();
+        try {
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                GFACSSHUtils.addSecurityContext(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            try {
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (GFacException e1) {
+                log.error(e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        }
+
+        super.invoke(jobExecutionContext);
+        DataTransferDetails detail = new DataTransferDetails();
+        detail.setTransferDescription("Output data staging");
+        TransferStatus status = new TransferStatus();
+
+        Cluster cluster = null;
+        try {
+            cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+
+            // Get the Stdouts and StdErrs
+            String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID());
+
+            TaskDetails taskData = jobExecutionContext.getTaskData();
+            String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp");
+            File localStdOutFile;
+            File localStdErrFile;
+            //FIXME: AdvancedOutputDataHandling points to a remote location; third-party transfer support is needed for this to work
+//            if (taskData.getAdvancedOutputDataHandling() != null) {
+//                outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+//            }
+            if (outputDataDir == null) {
+                outputDataDir = File.separator + "tmp";
+            }
+            outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
+            (new File(outputDataDir)).mkdirs();
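+            // Outputs are staged locally under <outputDataDir>/<experimentId>-<taskId>.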
+
+
+            localStdOutFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stdout");
+            localStdErrFile = new File(outputDataDir + File.separator + timeStampedExperimentID + "stderr");
+//            cluster.makeDirectory(outputDataDir);
+            int i = 0;
+            String stdOutStr = "";
+            while (stdOutStr.isEmpty()) {
+                try {
+                    cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath());
+                    stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+                } catch (Exception e) {
+                    log.error(e.getLocalizedMessage());
+                    Thread.sleep(2000);
+                }
+                i++;
+                if (i == 3) break;
+            }
+            Thread.sleep(1000);
+            cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath());
+            Thread.sleep(1000);
+
+            String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+            status.setTransferState(TransferState.STDOUT_DOWNLOAD);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath());
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+            status.setTransferState(TransferState.STDERROR_DOWNLOAD);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath());
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+
+            List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+            Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+            Set<String> keys = output.keySet();
+            for (String paramName : keys) {
+                OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(paramName);
+                if (DataType.URI == actualParameter.getType()) {
+                    List<String> outputList = null;
+                    int retry = 3;
+                    while (retry > 0) {
+                        outputList = cluster.listDirectory(jobExecutionContext.getOutputDir());
+                        if (outputList.size() > 0) {
+                            break;
+                        }
+                        retry--;
+                        Thread.sleep(2000);
+                    }
+
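+                    // Zero entries, a single empty entry, or multiple entries: fall back to
+                    // parsing stdout/stderr for output values; exactly one non-empty entry is
+                    // treated as the single output file and copied back directly.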
+                    if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 1) {
+                        OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+                        Set<String> strings = output.keySet();
+                        outputArray.clear();
+                        for (String key : strings) {
+                            OutputDataObjectType actualParameter1 = (OutputDataObjectType) output.get(key);
+                            if (DataType.URI == actualParameter1.getType()) {
+                                String downloadFile = actualParameter1.getValue();
+                                cluster.scpFrom(downloadFile, outputDataDir);
+                                String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length());
+                                String localFile = outputDataDir + File.separator + fileName;
+                                jobExecutionContext.addOutputFile(localFile);
+                                actualParameter1.setValue(localFile);
+                                OutputDataObjectType dataObjectType = new OutputDataObjectType();
+                                dataObjectType.setValue(localFile);
+                                dataObjectType.setName(key);
+                                dataObjectType.setType(DataType.URI);
+                                outputArray.add(dataObjectType);
+                            } else if (DataType.STDOUT == actualParameter1.getType()) {
+                                String fileName = localStdOutFile.getName();
+                                String localFile = outputDataDir + File.separator + fileName;
+                                jobExecutionContext.addOutputFile(localFile);
+                                actualParameter1.setValue(localFile);
+                                OutputDataObjectType dataObjectType = new OutputDataObjectType();
+                                dataObjectType.setValue(localFile);
+                                dataObjectType.setName(key);
+                                dataObjectType.setType(DataType.STDOUT);
+                                outputArray.add(dataObjectType);
+                            } else if (DataType.STDERR == actualParameter1.getType()) {
+                                String fileName = localStdErrFile.getName();
+                                String localFile = outputDataDir + File.separator + fileName;
+                                jobExecutionContext.addOutputFile(localFile);
+                                actualParameter1.setValue(localFile);
+                                OutputDataObjectType dataObjectType = new OutputDataObjectType();
+                                dataObjectType.setValue(localFile);
+                                dataObjectType.setName(key);
+                                dataObjectType.setType(DataType.STDERR);
+                                outputArray.add(dataObjectType);
+                            }
+                        }
+                        break;
+                    } else if (outputList.size() == 1) {//FIXME: Ultrascan case
+                        String valueList = outputList.get(0);
+                        cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir);
+                        String outputPath = outputDataDir + File.separator + valueList;
+                        jobExecutionContext.addOutputFile(outputPath);
+                        actualParameter.setValue(outputPath);
+                        OutputDataObjectType dataObjectType = new OutputDataObjectType();
+                        dataObjectType.setValue(outputPath);
+                        dataObjectType.setName(paramName);
+                        dataObjectType.setType(DataType.URI);
+                        outputArray.add(dataObjectType);
+                    }
+                } else {
+                    OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+                }
+            }
+            if (outputArray == null || outputArray.isEmpty()) {
+                log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names");
+                if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) {
+                    throw new GFacHandlerException(
+                            "Empty Output returned from the Application, Double check the application"
+                                    + "and ApplicationDescriptor output Parameter Names");
+                }
+            }
+            jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath());
+            jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath());
+            jobExecutionContext.setOutputDir(outputDataDir);
+            status.setTransferState(TransferState.DOWNLOAD);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription(outputDataDir);
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+
+        } catch (Exception e) {
+            try {
+                status.setTransferState(TransferState.FAILED);
+                detail.setTransferStatus(status);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+                StringWriter errors = new StringWriter();
+                e.printStackTrace(new PrintWriter(errors));
+                GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error in retrieving results", e);
+        }
+
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        // TODO: Auto generated method body.
+    }
+
+    public void initProperties(Properties properties) throws GFacHandlerException {
+
+    }
+}