Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/04 22:15:37 UTC

[24/81] [abbrv] airavata git commit: Refactored gfac sub modules: merged the gfac-ssh, gfac-gsissh, gfac-local, gfac-monitor and gsissh modules into a new gfac-impl module, and moved the implementation from gfac-core to gfac-impl

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
new file mode 100644
index 0000000..9f369b1
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.provider.impl;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.core.provider.AbstractProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
+import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.SSHApiException;
+import org.apache.airavata.gfac.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Map;
+
+//import org.apache.airavata.schemas.gfac.GsisshHostType;
+
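+/**
+ * GFac provider that runs jobs on clusters reachable over GSI-SSH (for example PBS clusters):
+ * it builds the job descriptor, submits the batch job, records the resulting job status,
+ * hands the job over to monitoring, and supports cancellation and recovery.
+ */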
+public class GSISSHProvider extends AbstractProvider {
+    private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
+
+    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+    }
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        super.initialize(jobExecutionContext);
+        try {
+            String hostAddress = jobExecutionContext.getHostName();
+            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
+                GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+            }
+        } catch (ApplicationSettingsException e) {
+            log.error(e.getMessage());
+            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        } catch (GFacException e) {
+            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+        }
+    }
+
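+    /**
+     * Submits the job described by the execution context: publishes a start event, obtains the
+     * GSI security context and cluster, creates and submits the job descriptor, saves the job
+     * status (SUBMITTED or FAILED) and passes the job to the monitor. The job description and id
+     * are also saved as handler data so that a failed GFac run can be recovered later.
+     */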
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        log.info("Invoking GSISSHProvider execute ...");
+        StringBuffer data = new StringBuffer();
+        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+        ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
+                .getComputeResourceDescription();
+        ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext()
+                .getApplicationDeploymentDescription();
+        JobDetails jobDetails = new JobDetails();
+        Cluster cluster = null;
+
+        try {
+            if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) {
+                cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName())).getPbsCluster();
+            }
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+            // The installed path is a mandatory field, because it can change based on the computing resource
+            JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster);
+            jobDetails.setJobName(jobDescriptor.getJobName());
+
+            log.info(jobDescriptor.toXML());
+            data.append("jobDesc=").append(jobDescriptor.toXML());
+            jobDetails.setJobDescription(jobDescriptor.toXML());
+            String jobID = cluster.submitBatchJob(jobDescriptor);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if (jobID == null) {
+                jobDetails.setJobID("none");
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            } else {
+                jobDetails.setJobID(jobID.split("\\.")[0]);
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+            }
+            data.append(",jobId=").append(jobDetails.getJobID());
+
+            // Now that the job has been submitted to the resource, it is up to the provider to pass the information
+            // to the daemon handlers that perform monitoring; daemon handlers can be accessed from anywhere
+            monitor(jobExecutionContext);
+            // we know this host is type GsiSSHHostType
+        } catch (Exception e) {
+		    String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
+            log.error(error);
+            jobDetails.setJobID("none");
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            StringWriter errors = new StringWriter();
+            e.printStackTrace(new PrintWriter(errors));
+            GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        } finally {
+            log.info("Saving data for future recovery: ");
+            log.info(data.toString());
+            GFacUtils.saveHandlerData(jobExecutionContext, data, this.getClass().getName());
+        } 
+          
+    }
+
+    public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException {
+/*        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+                    jobExecutionContext.setProperty("cancel","true");
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support pull and push monitoring, and monitorMode should be PULL" +
+                            " to be handled by the GridPullMonitorHandler");
+                }
+            } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pushMonitorHandler = threadedHandler;
+                if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support pull and push monitoring, and monitorMode should be PUSH" +
+                            " to be handled by the GridPushMonitorHandler");
+                }
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not be invoked" +
+                    "; execution is configured as asynchronous, so the out handler will not be invoked");
+        }*/
+    }
+
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        // Nothing to clean up for this provider.
+    }
+
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+        log.info("Cancelling the job in GSISSHProvider");
+        JobDetails jobDetails = jobExecutionContext.getJobDetails();
+        String hostName = jobExecutionContext.getHostName();
+        try {
+            Cluster cluster = null;
+            if (jobExecutionContext.getSecurityContext(hostName) == null) {
+                GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+            }
+            cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostName)).getPbsCluster();
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+            if (jobDetails == null) {
+                log.error("There are no JobDetails, so the cancel operation cannot be performed!");
+                return false;
+            }
+            if (jobDetails.getJobID() != null) {
+                // if this operation succeeds without an exception, we can assume the cancel operation succeeded.
+                cluster.cancelJob(jobDetails.getJobID());
+            } else {
+                log.error("No job id is set, so the cancel operation cannot be performed!");
+                return false;
+            }
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
+            return true;
+            // we know this host is type GsiSSHHostType
+        } catch (SSHApiException e) {
+            String error = "Error cancelling the job on host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
+            log.error(error);
+            jobDetails.setJobID("none");
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        } catch (Exception e) {
+            String error = "Error cancelling the job on host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
+            log.error(error);
+            jobDetails.setJobID("none");
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        }
+    }
+
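+    /**
+     * Recovers a job after a GFac failure using the handler data saved by execute(): if no job id
+     * was recorded, the job is simply re-executed; otherwise the saved job description and id are
+     * restored and the job is handed back to the monitor.
+     */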
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+        log.info("Invoking recovery for the experiment: " + jobExecutionContext.getExperimentID());
+        ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext()
+                .getComputeResourceDescription();
+        String hostName = jobExecutionContext.getHostName();
+        String jobId = "";
+        String jobDesc = "";
+        try {
+            String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName());
+            String[] split = pluginData.split(",");
+            if (split.length < 2) {
+                try {
+                    this.execute(jobExecutionContext);
+                } catch (GFacException e) {
+                    log.error("Error while recovering provider", e);
+                    throw new GFacProviderException("Error recovering provider", e);
+                }
+                return;
+            }
+            jobDesc = split[0].substring(7);
+            jobId = split[1].substring(6);
+
+            log.info("The following data have been recovered:");
+            log.info("Job Description: " + jobDesc);
+            log.info("Job Id: " + jobId);
+            if (jobId == null || "none".equals(jobId) ||
+                    "".equals(jobId)) {
+                try {
+                    this.execute(jobExecutionContext);
+                } catch (GFacException e) {
+                    log.error("Error while recovering provider", e);
+                    throw new GFacProviderException("Error recovering provider", e);
+                }
+                return;
+            }
+        } catch (Exception e) {
+            log.error("Error while recovering provider", e);
+        }
+        try {
+            // Now we have enough data to recover the job
+            JobDetails jobDetails = new JobDetails();
+            jobDetails.setJobDescription(jobDesc);
+            jobDetails.setJobID(jobId);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if (jobExecutionContext.getSecurityContext(hostName) == null) {
+                try {
+                    GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+                }
+            }
+            monitor(jobExecutionContext);
+        } catch (Exception e) {
+            log.error("Error while recovering the job", e);
+            throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
+        }
+    }
+
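+    /**
+     * Registers the job for monitoring. When the compute resource's SSH job submission uses
+     * JOB_EMAIL_NOTIFICATION_MONITOR, the job is added to the email-based monitor; the older
+     * pull/push daemon-handler monitoring below is currently commented out.
+     */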
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        String jobSubmissionInterfaceId = jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId();
+        SSHJobSubmission sshJobSubmission = null;
+        try {
+            sshJobSubmission = jobExecutionContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+        } catch (AppCatalogException e) {
+            throw new GFacException("Error while reading compute resource", e);
+        }
+        if (jobExecutionContext.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+            if (monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) {
+                try {
+                    EmailBasedMonitor emailBasedMonitor = EmailMonitorFactory.getEmailBasedMonitor(
+                            sshJobSubmission.getResourceJobManager().getResourceJobManagerType());
+                    emailBasedMonitor.addToJobMonitorMap(jobExecutionContext);
+                } catch (AiravataException e) {
+                    throw new GFacHandlerException("Error while activating email job monitoring ", e);
+                }
+                return;
+            }
+        }
+/*
+        // if email monitoring is not activated or not configured we use pull or push monitoring
+        List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+        String jobID = jobExecutionContext.getJobDetails().getJobID();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) {
+                    log.info("Job is launched successfully, now passing it to monitoring in pull mode, JobID returned: " + jobID);
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support pull and push monitoring, and monitorMode should be PULL" +
+                            " to be handled by the GridPullMonitorHandler");
+                }
+            } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pushMonitorHandler = threadedHandler;
+                if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) {
+                    log.info("Job is launched successfully, now passing it to monitoring in push mode, JobID returned: " + jobID);
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support pull and push monitoring, and monitorMode should be PUSH" +
+                            " to be handled by the GridPushMonitorHandler");
+                }
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not be invoked" +
+                    "; execution is configured as asynchronous, so the out handler will not be invoked");
+
+        }*/
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
new file mode 100644
index 0000000..85e9e29
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/GSISecurityContext.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.security;
+
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.AbstractSecurityContext;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles GRID related security.
+ */
+public class GSISecurityContext extends AbstractSecurityContext {
+
+    protected static final Logger log = LoggerFactory.getLogger(GSISecurityContext.class);
+
+    private Cluster pbsCluster = null;
+
+
+    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData, Cluster pbsCluster) {
+        super(credentialReader, requestData);
+        this.pbsCluster = pbsCluster;
+    }
+
+
+    public GSISecurityContext(CredentialReader credentialReader, RequestData requestData) {
+        super(credentialReader, requestData);
+    }
+
+
+    public GSISecurityContext(Cluster pbsCluster) {
+        this.setPbsCluster(pbsCluster);
+    }
+
+
+
+    public Cluster getPbsCluster() {
+        return pbsCluster;
+    }
+
+    public void setPbsCluster(Cluster pbsCluster) {
+        this.pbsCluster = pbsCluster;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
new file mode 100644
index 0000000..a3e0241
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/security/TokenizedMyProxyAuthInfo.java
@@ -0,0 +1,304 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.security;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo;
+import org.globus.gsi.X509Credential;
+import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
+import org.globus.gsi.provider.GlobusProvider;
+import org.globus.myproxy.GetParams;
+import org.globus.myproxy.MyProxy;
+import org.globus.myproxy.MyProxyException;
+import org.gridforum.jgss.ExtendedGSSCredential;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+
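+/**
+ * GSIAuthenticationInfo backed by a credential-store token: credentials are first looked up in the
+ * credential store and, failing that, retrieved from MyProxy; credentials close to expiry are
+ * renewed either as a trusted renewer or with the MyProxy user name and password.
+ */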
+public class TokenizedMyProxyAuthInfo extends GSIAuthenticationInfo {
+    protected static final Logger log = LoggerFactory.getLogger(TokenizedMyProxyAuthInfo.class);
+
+    public static int CREDENTIAL_RENEWING_THRESH_HOLD = 10 * 90;
+
+    private GSSCredential gssCredentials = null;
+
+
+    private CredentialReader credentialReader;
+
+    private RequestData requestData;
+
+    public static final String X509_CERT_DIR = "X509_CERT_DIR";
+
+
+    static {
+        Security.addProvider(new GlobusProvider());
+        try {
+            setUpTrustedCertificatePath();
+        } catch (ApplicationSettingsException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+    }
+
+    public static void setUpTrustedCertificatePath(String trustedCertificatePath) {
+
+        File file = new File(trustedCertificatePath);
+
+        if (!file.exists() || !file.canRead()) {
+            File f = new File(".");
+            log.info("Current directory " + f.getAbsolutePath());
+            throw new RuntimeException("Cannot read trusted certificate path " + trustedCertificatePath);
+        } else {
+            System.setProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY, file.getAbsolutePath());
+        }
+    }
+
+    private static void setUpTrustedCertificatePath() throws ApplicationSettingsException {
+
+        String trustedCertificatePath = ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION);
+
+        setUpTrustedCertificatePath(trustedCertificatePath);
+    }
+
+    public TokenizedMyProxyAuthInfo(CredentialReader credentialReader, RequestData requestData) {
+        this.credentialReader = credentialReader;
+        this.requestData = requestData;
+        try {
+            properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+        } catch (ApplicationSettingsException e) {
+            log.error("Error while reading server properties", e);
+        }
+    }
+
+    public TokenizedMyProxyAuthInfo(RequestData requestData) {
+           this.requestData = requestData;
+           try {
+               properties.setProperty(X509_CERT_DIR, ServerSettings.getSetting(Constants.TRUSTED_CERT_LOCATION));
+           } catch (ApplicationSettingsException e) {
+               log.error("Error while reading server properties", e);
+           }
+       }
+
+    public GSSCredential getCredentials() throws SecurityException {
+
+        if (gssCredentials == null) {
+
+            try {
+                gssCredentials = getCredentialsFromStore();
+            } catch (Exception e) {
+                log.error("An exception occurred while retrieving credentials from the credential store. " +
+                        "Will continue with my proxy user name and password. Provided TokenId:" + requestData.getTokenId(), e);
+            }
+
+            if (gssCredentials == null) {
+                System.out.println("Authenticating with provided token failed, so falling back to authenticate with defaultCredentials");
+                try {
+                    gssCredentials = getDefaultCredentials();
+                } catch (Exception e) {
+                    throw new SecurityException("Error retrieving my proxy using username password");
+                }
+            }
+            // if still null, throw an exception
+            if (gssCredentials == null) {
+                throw new SecurityException("Unable to retrieve my proxy credentials to continue operation.");
+            }
+        } else {
+            try {
+                if (gssCredentials.getRemainingLifetime() < CREDENTIAL_RENEWING_THRESH_HOLD) {
+                    try {
+                        return renewCredentials();
+                    } catch (Exception e) {
+                        throw new SecurityException("Error renewing credentials", e);
+                    }
+                }
+            } catch (GSSException e) {
+                throw new SecurityException("Unable to retrieve remaining life time from credentials.", e);
+            }
+        }
+
+        return gssCredentials;
+    }
+
+
+    /**
+     * Reads the credentials from credential store.
+     *
+     * @return If token is found in the credential store, will return a valid credential. Else returns null.
+     * @throws Exception If an error occurred while retrieving credentials.
+     */
+    public GSSCredential getCredentialsFromStore() throws Exception {
+
+        if (getCredentialReader() == null) {
+        	credentialReader = GFacUtils.getCredentialReader();
+        	if(credentialReader == null){
+        		return null;
+        	}
+        }
+
+        Credential credential = getCredentialReader().getCredential(getRequestData().getGatewayId(),
+                getRequestData().getTokenId());
+
+        if (credential != null) {
+            if (credential instanceof CertificateCredential) {
+
+                log.info("Successfully found credentials for token id - " + getRequestData().getTokenId() +
+                        " gateway id - " + getRequestData().getGatewayId());
+
+                CertificateCredential certificateCredential = (CertificateCredential) credential;
+
+                X509Certificate[] certificates = certificateCredential.getCertificates();
+                X509Credential newCredential = new X509Credential(certificateCredential.getPrivateKey(), certificates);
+
+                GlobusGSSCredentialImpl cred = new GlobusGSSCredentialImpl(newCredential, GSSCredential.INITIATE_AND_ACCEPT);
+                System.out.print(cred.export(ExtendedGSSCredential.IMPEXP_OPAQUE));
+                return cred;
+                //return new GlobusGSSCredentialImpl(newCredential,
+                //        GSSCredential.INITIATE_AND_ACCEPT);
+            } else {
+                log.info("Credential type is not CertificateCredential. Cannot create the corresponding Globus credentials. " +
+                        "Credential type - " + credential.getClass().getName());
+            }
+        } else {
+            log.info("Could not find credentials for token - " + getRequestData().getTokenId() + " and "
+                    + "gateway id - " + getRequestData().getGatewayId());
+        }
+
+        return null;
+    }
+
+    /**
+     * Renew GSSCredentials.
+     * Before executing, we need to add the current host as a trusted renewer. Note that to renew credentials
+     * we don't need a user name and password.
+     * To do that, execute the following command:
+     * > myproxy-logon -t <LIFETIME> -s <MY PROXY SERVER> -l <USER NAME>
+     * E.g :- > myproxy-logon -t 264 -s myproxy.teragrid.org -l us3
+     * Enter MyProxy pass phrase:
+     * A credential has been received for user us3 in /tmp/x509up_u501.
+     * > myproxy-init -A --cert /tmp/x509up_u501 --key /tmp/x509up_u501 -l ogce -s myproxy.teragrid.org
+     *
+     * @return Renewed credentials.
+     * @throws org.apache.airavata.gfac.GFacException                            If an error occurred while renewing credentials.
+     * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential renewCredentialsAsATrustedHost() throws GFacException, ApplicationSettingsException {
+        MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+        GetParams getParams = new GetParams();
+        getParams.setAuthzCreds(gssCredentials);
+        getParams.setUserName(getRequestData().getMyProxyUserName());
+        getParams.setLifetime(getRequestData().getMyProxyLifeTime());
+        try {
+            return myproxy.get(gssCredentials, getParams);
+        } catch (MyProxyException e) {
+            throw new GFacException("An error occurred while renewing security credentials.", e);
+        }
+    }
+
+
+    /**
+     * Gets the default proxy certificate.
+     *
+     * @return Default my proxy credentials.
+     * @throws org.apache.airavata.gfac.GFacException                            If an error occurred while retrieving credentials.
+     * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException {
+        MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+        try {
+            return myproxy.get(getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
+                    getRequestData().getMyProxyLifeTime());
+        } catch (MyProxyException e) {
+            throw new GFacException("An error occurred while retrieving default security credentials.", e);
+        }
+    }
+
+
+    /**
+     * Renews credentials. First try to renew credentials as a trusted renewer. If that failed
+     * use user name and password to renew credentials.
+     *
+     * @return Renewed credentials.
+     * @throws org.apache.airavata.gfac.GFacException                            If an error occurred while renewing credentials.
+     * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential renewCredentials() throws GFacException, ApplicationSettingsException {
+
+        // First try to renew credentials as a trusted renewer
+        try {
+            gssCredentials = renewCredentialsAsATrustedHost();
+        } catch (Exception e) {
+            log.warn("Renewing credentials as a trusted renewer failed", e);
+            gssCredentials = getDefaultCredentials();
+        }
+
+        return gssCredentials;
+    }
+
+    /**
+     * Gets a new proxy certificate given current credentials.
+     *
+     * @return The short lived GSSCredentials
+     * @throws org.apache.airavata.gfac.GFacException                            If an error occurred while retrieving credentials.
+     * @throws org.apache.airavata.common.exception.ApplicationSettingsException
+     */
+    public GSSCredential getProxyCredentials() throws GFacException, ApplicationSettingsException {
+
+        MyProxy myproxy = new MyProxy(getRequestData().getMyProxyServerUrl(), getRequestData().getMyProxyPort());
+        try {
+            return myproxy.get(gssCredentials, getRequestData().getMyProxyUserName(), getRequestData().getMyProxyPassword(),
+                    getRequestData().getMyProxyLifeTime());
+        } catch (MyProxyException e) {
+            throw new GFacException("An error occurred while renewing security credentials using user/password.", e);
+        }
+    }
+
+    public void setGssCredentials(GSSCredential gssCredentials) {
+        this.gssCredentials = gssCredentials;
+    }
+
+    public CredentialReader getCredentialReader() {
+        return credentialReader;
+    }
+
+    public void setCredentialReader(CredentialReader credentialReader) {
+        this.credentialReader = credentialReader;
+    }
+
+    public RequestData getRequestData() {
+        return requestData;
+    }
+
+    public void setRequestData(RequestData requestData) {
+        this.requestData = requestData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
new file mode 100644
index 0000000..c3978b1
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -0,0 +1,367 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.util;
+
+import org.airavata.appcatalog.cpi.AppCatalog;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.gsissh.security.TokenizedMyProxyAuthInfo;
+import org.apache.airavata.gfac.ssh.api.Cluster;
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gfac.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gfac.ssh.impl.GSISSHAbstractCluster;
+import org.apache.airavata.gfac.ssh.impl.PBSCluster;
+import org.apache.airavata.gfac.ssh.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+
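+/**
+ * Helper methods for the GSI-SSH provider: creates and caches GSISecurityContext/PBSCluster
+ * connections per MyProxy user, host and SSH port, and translates a JobExecutionContext into a
+ * JobDescriptor (job name, directories, inputs, scheduling and pre/post job commands).
+ */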
+public class GFACGSISSHUtils {
+    private final static Logger logger = LoggerFactory.getLogger(GFACGSISSHUtils.class);
+
+    public static final String PBS_JOB_MANAGER = "pbs";
+    public static final String SLURM_JOB_MANAGER = "slurm";
+    public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE";
+    public static final String LSF_JOB_MANAGER = "lsf";
+
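+    // Cache of cluster connections keyed by MyProxy user name + host name + SSH port; up to
+    // maxClusterCount connections are kept per key and reused across job submissions.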
+    public static int maxClusterCount = 5;
+    public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>();
+
+    public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException {
+        JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
+        JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol();
+        try {
+            AppCatalog appCatalog = jobExecutionContext.getAppCatalog();
+            SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+            if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE
+                    || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) {
+                logger.error("This is the wrong method to invoke for non-SSH host types; please check your gfac-config.xml");
+            } else if (jobProtocol == JobSubmissionProtocol.SSH && sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) {
+                String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework
+                RequestData requestData = new RequestData(jobExecutionContext.getGatewayID());
+                requestData.setTokenId(credentialStoreToken);
+                PBSCluster pbsCluster = null;
+                GSISecurityContext context = null;
+
+                TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData);
+                CredentialReader credentialReader = GFacUtils.getCredentialReader();
+                if (credentialReader != null) {
+                    CertificateCredential credential = null;
+                    try {
+                        credential = (CertificateCredential) credentialReader.getCredential(jobExecutionContext.getGatewayID(), credentialStoreToken);
+                        requestData.setMyProxyUserName(credential.getCommunityUser().getUserName());
+                    } catch (Exception e) {
+                        logger.error(e.getLocalizedMessage());
+                    }
+                }
+
+                String key = requestData.getMyProxyUserName() + jobExecutionContext.getHostName()+
+                        sshJobSubmission.getSshPort();
+                boolean recreate = false;
+                synchronized (clusters) {
+                    if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) {
+                        recreate = true;
+                    } else if (clusters.containsKey(key)) {
+                        int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount;
+                        if (clusters.get(key).get(i).getSession().isConnected()) {
+                            pbsCluster = (PBSCluster) clusters.get(key).get(i);
+                        } else {
+                            clusters.get(key).remove(i);
+                            recreate = true;
+                        }
+                        if (!recreate) {
+                            try {
+                                pbsCluster.listDirectory("~/"); // it's hard to trust the isConnected method, so we try to connect; if it works we are good, else we recreate
+                            } catch (Exception e) {
+                                clusters.get(key).remove(i);
+                                logger.info("Connection found in the connection map has expired, so we create it from scratch");
+                                maxClusterCount++;
+                                recreate = true; // recreate the pbsCluster if there is any exception during connection
+                            }
+                            logger.info("Reusing the existing connection for the connection string: " + key);
+                            context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, pbsCluster);
+                        }
+                    } else {
+                        recreate = true;
+                    }
+
+                    if (recreate) {
+                        ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(),
+                                sshJobSubmission.getSshPort());
+
+                        JobManagerConfiguration jConfig = null;
+                        String installedParentPath = sshJobSubmission.getResourceJobManager().getJobManagerBinPath();
+                        String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString();
+                        if (jobManager == null) {
+                            logger.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+                            jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                        } else {
+                            if (PBS_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = CommonUtils.getPBSJobManager(installedParentPath);
+                            } else if (SLURM_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = CommonUtils.getSLURMJobManager(installedParentPath);
+                            } else if (SUN_GRID_ENGINE_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = CommonUtils.getUGEJobManager(installedParentPath);
+                            }else if(LSF_JOB_MANAGER.equalsIgnoreCase(jobManager)) {
+                                jConfig = CommonUtils.getLSFJobManager(installedParentPath);
+                            }
+                        }
+                        pbsCluster = new PBSCluster(serverInfo, tokenizedMyProxyAuthInfo, jConfig);
+                        context = new GSISecurityContext(tokenizedMyProxyAuthInfo.getCredentialReader(), requestData, pbsCluster);
+                        List<Cluster> pbsClusters = null;
+                        if (!(clusters.containsKey(key))) {
+                            pbsClusters = new ArrayList<Cluster>();
+                        } else {
+                            pbsClusters = clusters.get(key);
+                        }
+                        pbsClusters.add(pbsCluster);
+                        clusters.put(key, pbsClusters);
+                    }
+                }
+
+                jobExecutionContext.addSecurityContext(jobExecutionContext.getHostName(), context);
+            }
+        } catch (Exception e) {
+            throw new GFacException("An error occurred while creating GSI security context", e);
+        }
+    }
+
+    public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) {
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        TaskDetails taskData = jobExecutionContext.getTaskData();
+        ResourceJobManager resourceJobManager = jobExecutionContext.getResourceJobManager();
+        try {
+			if(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")){
+				jobDescriptor.setMailOptions(ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS));
+				String emailids = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+
+				if(jobExecutionContext.getTaskData().isSetEmailAddresses()){
+					List<String> emailList = jobExecutionContext.getTaskData().getEmailAddresses();
+					String elist = GFacUtils.listToCsv(emailList, ',');
+					if(emailids != null && !emailids.isEmpty()){
+						emailids = emailids +"," + elist;
+					}else{
+						emailids = elist;
+					}
+				}
+				if(emailids != null && !emailids.isEmpty()){
+					logger.info("Email list: "+ emailids);
+					jobDescriptor.setMailAddress(emailids);
+				}
+			}
+		} catch (ApplicationSettingsException e) {
+			 logger.error("ApplicationSettingsException : " +e.getLocalizedMessage());
+		}
+        // this is common for any application descriptor
+        jobDescriptor.setCallBackIp(ServerSettings.getIp());
+        jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950"));
+        jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir());
+        jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir());
+        jobDescriptor.setExecutablePath(jobExecutionContext.getExecutablePath());
+        jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput());
+        jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError());
+        String computationalProjectAccount = taskData.getTaskScheduling().getComputationalProjectAccount();
+        taskData.getEmailAddresses();
+        if (computationalProjectAccount == null){
+            ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference();
+            if (computeResourcePreference != null) {
+                computationalProjectAccount = computeResourcePreference.getAllocationProjectNumber();
+            }
+        }
+        if (computationalProjectAccount != null) {
+            jobDescriptor.setAcountString(computationalProjectAccount);
+        }
+
+        Random random = new Random();
+        int i = random.nextInt(Integer.MAX_VALUE); // We always set the job name
+        jobDescriptor.setJobName("A" + String.valueOf(i+99999999));
+        jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir());
+
+        List<String> inputValues = new ArrayList<String>();
+        MessageContext input = jobExecutionContext.getInMessageContext();
+        // sort the inputs first and then build the command List
+        Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+                return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+            }
+        };
+        Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+        for (Object object : input.getParameters().values()) {
+            if (object instanceof InputDataObjectType) {
+                InputDataObjectType inputDOT = (InputDataObjectType) object;
+                sortedInputSet.add(inputDOT);
+            }
+        }
+        for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+            if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+                continue;
+            }
+            if (inputDataObjectType.getApplicationArgument() != null
+                    && !inputDataObjectType.getApplicationArgument().equals("")) {
+                inputValues.add(inputDataObjectType.getApplicationArgument());
+            }
+
+            if (inputDataObjectType.getValue() != null
+                    && !inputDataObjectType.getValue().equals("")) {
+                if (inputDataObjectType.getType() == DataType.URI) {
+                    // set only the relative path
+                    String filePath = inputDataObjectType.getValue();
+                    filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                    inputValues.add(filePath);
+                }else {
+                    inputValues.add(inputDataObjectType.getValue());
+                }
+
+            }
+        }
+
+        Map<String, Object> outputParams = jobExecutionContext.getOutMessageContext().getParameters();
+        for (Object outputParam : outputParams.values()) {
+            if (outputParam instanceof OutputDataObjectType) {
+                OutputDataObjectType output = (OutputDataObjectType) outputParam;
+                if (output.getApplicationArgument() != null
+                        && !output.getApplicationArgument().equals("")) {
+                    inputValues.add(output.getApplicationArgument());
+                }
+                if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+                    if (output.getType() == DataType.URI){
+                        String filePath = output.getValue();
+                        filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+                        inputValues.add(filePath);
+                    }
+                }
+            }
+        }
+        jobDescriptor.setInputValues(inputValues);
+
+        jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName());
+        jobDescriptor.setShellName("/bin/bash");
+        jobDescriptor.setAllEnvExport(true);
+        jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
+
+        ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling();
+        if (taskScheduling != null) {
+            int totalNodeCount = taskScheduling.getNodeCount();
+            int totalCPUCount = taskScheduling.getTotalCPUCount();
+
+//        jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
+            if (taskScheduling.getComputationalProjectAccount() != null) {
+                jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
+            }
+            if (taskScheduling.getQueueName() != null) {
+                jobDescriptor.setQueueName(taskScheduling.getQueueName());
+            }
+
+            if (totalNodeCount > 0) {
+                jobDescriptor.setNodes(totalNodeCount);
+            }
+            if (taskScheduling.getComputationalProjectAccount() != null) {
+                jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount());
+            }
+            if (taskScheduling.getQueueName() != null) {
+                jobDescriptor.setQueueName(taskScheduling.getQueueName());
+            }
+            if (totalCPUCount > 0) {
+                int ppn = totalCPUCount / totalNodeCount;
+                jobDescriptor.setProcessesPerNode(ppn);
+                jobDescriptor.setCPUCount(totalCPUCount);
+            }
+            if (taskScheduling.getWallTimeLimit() > 0) {
+                jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit()));
+                if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
+                    jobDescriptor.setMaxWallTimeForLSF(String.valueOf(taskScheduling.getWallTimeLimit()));
+                }
+            }
+
+            if (taskScheduling.getTotalPhysicalMemory() > 0) {
+                jobDescriptor.setUsedMemory(taskScheduling.getTotalPhysicalMemory() + "");
+            }
+        } else {
+            logger.error("Task scheduling cannot be null at this point.");
+        }
+
+        ApplicationDeploymentDescription appDepDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+        List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
+        if (moduleCmds != null) {
+            for (String moduleCmd : moduleCmds) {
+                jobDescriptor.addModuleLoadCommands(moduleCmd);
+            }
+        }
+        List<String> preJobCommands = appDepDescription.getPreJobCommands();
+        if (preJobCommands != null) {
+            for (String preJobCommand : preJobCommands) {
+                jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, jobExecutionContext));
+            }
+        }
+
+        List<String> postJobCommands = appDepDescription.getPostJobCommands();
+        if (postJobCommands != null) {
+            for (String postJobCommand : postJobCommands) {
+                jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, jobExecutionContext));
+            }
+        }
+
+        ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+        if (parallelism != null){
+            if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){
+                Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+                if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
+                    for (JobManagerCommand command : jobManagerCommands.keySet()) {
+                        if (command == JobManagerCommand.SUBMISSION) {
+                            String commandVal = jobManagerCommands.get(command);
+                            jobDescriptor.setJobSubmitter(commandVal);
+                        }
+                    }
+                }
+            }
+        }
+        return jobDescriptor;
+    }
+
+    private static String parseCommand(String value, JobExecutionContext jobExecutionContext) {
+        String parsedValue = value.replaceAll("\\$workingDir", jobExecutionContext.getWorkingDir());
+        parsedValue = parsedValue.replaceAll("\\$inputDir", jobExecutionContext.getInputDir());
+        parsedValue = parsedValue.replaceAll("\\$outputDir", jobExecutionContext.getOutputDir());
+        return parsedValue;
+    }
+}
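
For reference, a minimal, self-contained sketch of the variable substitution that parseCommand performs on pre/post job commands. The command string and directory values below are hypothetical; only String.replaceAll from the JDK is used, so the snippet runs without any Airavata classes.

    // Hypothetical sketch: mirrors parseCommand's $workingDir/$inputDir/$outputDir expansion.
    public class ParseCommandSketch {
        public static void main(String[] args) {
            String command    = "cp $inputDir/input.dat $workingDir && mkdir -p $outputDir"; // assumed pre-job command
            String workingDir = "/scratch/user/job-123";        // assumed working directory
            String inputDir   = "/scratch/user/job-123/input";  // assumed input directory
            String outputDir  = "/scratch/user/job-123/output"; // assumed output directory

            String parsed = command.replaceAll("\\$workingDir", workingDir)
                                   .replaceAll("\\$inputDir", inputDir)
                                   .replaceAll("\\$outputDir", outputDir);
            System.out.println(parsed);
            // cp /scratch/user/job-123/input/input.dat /scratch/user/job-123 && mkdir -p /scratch/user/job-123/output
        }
    }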

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
new file mode 100644
index 0000000..e7c6572
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataJobStatusUpdator.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataJobStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
+    private Registry airavataRegistry;
+
+    private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
+
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+
+    @Subscribe
+    public void updateRegistry(JobStatusChangeRequestEvent jobStatus) throws Exception{
+        /* Parse the job status change request, persist the new state to the registry,
+           and then re-publish it for downstream listeners. */
+        JobState state = jobStatus.getState();
+        if (state != null) {
+            try {
+                String taskID = jobStatus.getJobIdentity().getTaskId();
+                String jobID = jobStatus.getJobIdentity().getJobId();
+                String expId = jobStatus.getJobIdentity().getExperimentId();
+                updateJobStatus(expId,taskID, jobID, state);
+    			logger.debug("expId - {}: Publishing job status for " + jobStatus.getJobIdentity().getJobId() + ":"
+                        + state.toString(),jobStatus.getJobIdentity().getExperimentId());
+                JobStatusChangeEvent event = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
+                monitorPublisher.publish(event);
+                String messageId = AiravataUtils.getId("JOB");
+                MessageContext msgCntxt = new MessageContext(event, MessageType.JOB, messageId, jobStatus.getJobIdentity().getGatewayId());
+                msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                publisher.publish(msgCntxt);
+            } catch (Exception e) {
+                logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting job status: "
+                        + e.getLocalizedMessage(), e);
+                throw new Exception("Error persisting job status", e);
+            }
+        }
+    }
+
+    public  void updateJobStatus(String expId, String taskId, String jobID, JobState state) throws Exception {
+        logger.info("expId - {}: Updating job status for " + jobID + ":" + state.toString(), expId);
+        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
+        JobDetails details = (JobDetails) airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids);
+        if (details == null) {
+            details = new JobDetails();
+        }
+        org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+        org.apache.airavata.model.workspace.experiment.JobStatus currentStatus = details.getJobStatus();
+        // Never overwrite a cancel that is already recorded; otherwise apply the newly reported state.
+        // The null check also covers a freshly created JobDetails, which has no status yet.
+        if (currentStatus != null && (JobState.CANCELED.equals(currentStatus.getJobState())
+                || JobState.CANCELING.equals(currentStatus.getJobState()))) {
+            status.setJobState(currentStatus.getJobState());
+        } else {
+            status.setJobState(state);
+        }
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setJobStatus(status);
+        details.setJobID(jobID);
+        logger.debug("expId - {}: Updated job status for " + jobID + ":" + details.getJobStatus().toString(), expId);
+        airavataRegistry.update(RegistryModelType.JOB_STATUS, status, ids);
+    }
+
+	@SuppressWarnings("unchecked")
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+		}
+	}
+}
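
The @Subscribe methods above are driven by a Guava-style event bus. A minimal sketch of that pattern, with a hypothetical event and listener standing in for the Airavata model and registry classes (MonitorPublisher plays a comparable role inside GFac):

    import com.google.common.eventbus.EventBus;
    import com.google.common.eventbus.Subscribe;

    public class StatusBusSketch {
        // Hypothetical stand-in for JobStatusChangeRequestEvent.
        static class StatusChanged {
            final String jobId;
            final String state;
            StatusChanged(String jobId, String state) { this.jobId = jobId; this.state = state; }
        }

        // Hypothetical stand-in for AiravataJobStatusUpdator: reacts to posted events.
        static class LoggingListener {
            @Subscribe
            public void onStatusChanged(StatusChanged event) {
                System.out.println("job " + event.jobId + " -> " + event.state);
            }
        }

        public static void main(String[] args) {
            EventBus bus = new EventBus();          // the bus that dispatches @Subscribe methods
            bus.register(new LoggingListener());    // comparable to registering the updator
            bus.post(new StatusChanged("job-42", "ACTIVE"));
        }
    }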

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
new file mode 100644
index 0000000..94029be
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataTaskStatusUpdator.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataTaskStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
+    private Registry airavataRegistry;
+    private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
+    
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    @Subscribe
+    public void setupTaskStatus(TaskStatusChangeRequestEvent taskStatus) throws Exception{
+    	try {
+			updateTaskStatus(taskStatus.getTaskIdentity().getTaskId(), taskStatus.getState());
+            logger.debug("expId - {}: Publishing task status for " + taskStatus.getTaskIdentity().getTaskId() + ":"
+                    + taskStatus.getState().toString(), taskStatus.getTaskIdentity().getExperimentId());
+            TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity());
+            monitorPublisher.publish(event);
+            String messageId = AiravataUtils.getId("TASK");
+            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId, taskStatus.getTaskIdentity().getGatewayId());
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            publisher.publish(msgCntxt);
+		} catch (Exception e) {
+            String msg = "Error persisting task status to the registry: ";
+            logger.error(msg + e.getLocalizedMessage(), e);
+            throw new Exception(msg, e);
+		}
+    }
+
+    @Subscribe
+    public void setupTaskStatus(JobStatusChangeEvent jobStatus) throws Exception{
+    	TaskState state=TaskState.UNKNOWN;
+    	switch(jobStatus.getState()){
+    	case ACTIVE:
+    		state=TaskState.EXECUTING; break;
+    	case CANCELED:
+    		state=TaskState.CANCELED; break;
+    	case COMPLETE: case FAILED:
+    		state=TaskState.POST_PROCESSING; break;
+    	case HELD: case SUSPENDED: case QUEUED:
+    		state=TaskState.WAITING; break;
+    	case SETUP:
+    		state=TaskState.PRE_PROCESSING; break;
+    	case SUBMITTED:
+    		state=TaskState.STARTED; break;
+    	case UN_SUBMITTED:
+    		state=TaskState.CANCELED; break;
+    	case CANCELING:
+    		state=TaskState.CANCELING; break;
+		default:
+			return;
+    	}
+    	try {
+			updateTaskStatus(jobStatus.getJobIdentity().getTaskId(), state);
+            logger.debug("expId - {}: Publishing task status for " + jobStatus.getJobIdentity().getTaskId() + ":"
+                    + state.toString(), jobStatus.getJobIdentity().getExperimentId());
+            TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(),
+                                                         jobStatus.getJobIdentity().getWorkflowNodeId(),
+                                                         jobStatus.getJobIdentity().getExperimentId(),
+                                                         jobStatus.getJobIdentity().getGatewayId());
+            TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity);
+            monitorPublisher.publish(event);
+            String messageId = AiravataUtils.getId("TASK");
+            MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId,jobStatus.getJobIdentity().getGatewayId());
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            publisher.publish(msgCntxt);
+
+        }  catch (Exception e) {
+            logger.error("expId - " + jobStatus.getJobIdentity().getExperimentId() + ": Error persisting task status: " + e.getLocalizedMessage(), e);
+            throw new Exception("Error persisting task status", e);
+		}
+    }
+    
+    public  TaskState updateTaskStatus(String taskId, TaskState state) throws Exception {
+    	TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId);
+        if(details == null) {
+            logger.error("Task details cannot be null at this point");
+            throw new Exception("Task details cannot be null at this point");
+        }
+        org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus();
+        org.apache.airavata.model.workspace.experiment.TaskStatus currentStatus = details.getTaskStatus();
+        // Never overwrite a cancel that is already in progress; otherwise apply the newly derived state.
+        if (currentStatus == null || (!TaskState.CANCELED.equals(currentStatus.getExecutionState())
+                && !TaskState.CANCELING.equals(currentStatus.getExecutionState()))) {
+            status.setExecutionState(state);
+        } else {
+            status.setExecutionState(currentStatus.getExecutionState());
+        }
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setTaskStatus(status);
+        logger.debug("Updating task status for "+taskId+":"+details.getTaskStatus().toString());
+
+        airavataRegistry.update(RegistryModelType.TASK_STATUS, status, taskId);
+        return status.getExecutionState();
+    }
+
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+        }
+	}
+
+
+    @Subscribe
+    public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent) throws AiravataException {
+        String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+        logger.debug("Task Output changed event received for workflow node : " +
+                taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+        // TODO - do we need to update the output to the registry? , we do it in the workflowInterpreter too.
+        MessageContext messageContext = new MessageContext(taskOutputEvent, MessageType.TASKOUTPUT, taskOutputEvent.getTaskIdentity().getTaskId(), taskOutputEvent.getTaskIdentity().getGatewayId());
+        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        publisher.publish(messageContext);
+    }
+}
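
Both updateJobStatus and updateTaskStatus apply the same rule: once a cancel has been recorded, later status events must not overwrite it. A small, hypothetical sketch of that rule in isolation, with a trimmed-down enum in place of the Airavata model types:

    // Hypothetical sketch of the cancel-preservation rule used by the updators above.
    public class CancelGuardSketch {
        enum TaskState { EXECUTING, CANCELING, CANCELED, COMPLETED }

        // Keep CANCELED/CANCELING once set; otherwise accept the newly reported state.
        static TaskState nextState(TaskState current, TaskState reported) {
            if (current == TaskState.CANCELED || current == TaskState.CANCELING) {
                return current;
            }
            return reported;
        }

        public static void main(String[] args) {
            System.out.println(nextState(TaskState.EXECUTING, TaskState.COMPLETED)); // COMPLETED
            System.out.println(nextState(TaskState.CANCELING, TaskState.COMPLETED)); // CANCELING
        }
    }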

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..092774b
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.impl;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.WorkflowIdentifier;
+import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+    private Registry airavataRegistry;
+    private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    @Subscribe
+    public void setupWorkflowNodeStatus(TaskStatusChangeEvent taskStatus) throws Exception{
+    	WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
+    	switch(taskStatus.getState()){
+    	case CANCELED:
+    		state=WorkflowNodeState.CANCELED; break;
+    	case COMPLETED:
+    		state=WorkflowNodeState.COMPLETED; break;
+    	case CONFIGURING_WORKSPACE:
+    		state=WorkflowNodeState.INVOKED; break;
+    	case FAILED:
+    		state=WorkflowNodeState.FAILED; break;
+    	case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
+    		state=WorkflowNodeState.EXECUTING; break;
+    	case STARTED:
+    		state=WorkflowNodeState.INVOKED; break;
+    	case CANCELING:
+    		state=WorkflowNodeState.CANCELING; break;
+		default:
+			return;
+    	}
+    	try {
+            String expId = taskStatus.getTaskIdentity().getExperimentId();
+			updateWorkflowNodeStatus(expId, taskStatus.getTaskIdentity().getWorkflowNodeId(), state);
+            logger.debug("expId - {}: Publishing workflow node status for " + taskStatus.getTaskIdentity().getWorkflowNodeId()
+                    + ":" + state.toString(), taskStatus.getTaskIdentity().getExperimentId());
+            WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(taskStatus.getTaskIdentity().getWorkflowNodeId(),
+                                                                         taskStatus.getTaskIdentity().getExperimentId(),
+                                                                         taskStatus.getTaskIdentity().getGatewayId());
+            WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
+            monitorPublisher.publish(event);
+            String messageId = AiravataUtils.getId("WFNODE");
+            MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId, taskStatus.getTaskIdentity().getGatewayId());
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+
+            publisher.publish(msgCntxt);
+		} catch (Exception e) {
+            logger.error("expId - " + taskStatus.getTaskIdentity().getExperimentId() + ": Error persisting workflow node status: "
+                    + e.getLocalizedMessage(), e);
+            throw new Exception("Error persisting workflow node status", e);
+        }
+    }
+
+    public  void updateWorkflowNodeStatus(String experimentId, String workflowNodeId, WorkflowNodeState state) throws Exception {
+		logger.info("expId - {}: Updating workflow node status for "+workflowNodeId+":"+state.toString(), experimentId);
+    	WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+        if(details == null) {
+            details = new WorkflowNodeDetails();
+            details.setNodeInstanceId(workflowNodeId);
+        }
+        WorkflowNodeStatus status = new WorkflowNodeStatus();
+        status.setWorkflowNodeState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setWorkflowNodeStatus(status);
+        airavataRegistry.update(RegistryModelType.WORKFLOW_NODE_STATUS, status, workflowNodeId);
+    }
+
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			}  else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+        }
+	}
+}
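
Taken together, the three listeners translate a single job-state change into task and workflow-node updates. A simplified, hypothetical sketch of that fan-out, using plain enums trimmed to a few values in place of the Airavata model types:

    // Hypothetical sketch: job state -> task state -> workflow node state, as the
    // three updators above derive them.
    public class StatusPropagationSketch {
        enum JobState  { ACTIVE, COMPLETE, CANCELED }
        enum TaskState { EXECUTING, POST_PROCESSING, CANCELED }
        enum NodeState { EXECUTING, CANCELED }

        static TaskState toTaskState(JobState job) {
            switch (job) {
                case ACTIVE:   return TaskState.EXECUTING;
                case COMPLETE: return TaskState.POST_PROCESSING;
                default:       return TaskState.CANCELED;
            }
        }

        static NodeState toNodeState(TaskState task) {
            return task == TaskState.CANCELED ? NodeState.CANCELED : NodeState.EXECUTING;
        }

        public static void main(String[] args) {
            JobState job = JobState.ACTIVE;
            TaskState task = toTaskState(job);   // EXECUTING
            NodeState node = toNodeState(task);  // EXECUTING
            System.out.println(job + " -> " + task + " -> " + node);
        }
    }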