You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/05 17:07:14 UTC

[1/3] fixing more packaing issue

Repository: airavata
Updated Branches:
  refs/heads/master 49b6987f3 -> 6209ee096


http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
new file mode 100644
index 0000000..b0fd9eb
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -0,0 +1,310 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.ssh.provider.impl;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.CommandExecutor;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
+import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * Execute application using remote SSH
+ */
+public class SSHProvider extends AbstractProvider {
+    private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
+    private Cluster cluster;
+    private String jobID = null;
+    private String taskID = null;
+    // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
+    private boolean hpcType = false;
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+    	super.initialize(jobExecutionContext);
+    	 if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
+            try {
+                GFACSSHUtils.addSecurityContext(jobExecutionContext);
+            } catch (ApplicationSettingsException e) {
+                log.error(e.getMessage());
+                throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+            }
+        }
+        taskID = jobExecutionContext.getTaskData().getTaskID();
+		if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+            jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
+            cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+            
+            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+            String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+            details.setJobID(taskID);
+            details.setJobDescription(remoteFile);
+            jobExecutionContext.setJobDetails(details);
+            JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
+            details.setJobDescription(jobDescriptor.toXML());
+         
+            GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
+            log.info(remoteFile);
+            try {
+            	File runscript = createShellScript(jobExecutionContext);
+                cluster.scpTo(remoteFile, runscript.getAbsolutePath());
+            } catch (Exception e) {
+                throw new GFacProviderException(e.getLocalizedMessage(), e);
+            }
+        }else{
+           hpcType = true;
+        }
+    }
+
+
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        if (!hpcType) {
+            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+            try {
+                /*
+                 * Execute
+                 */
+                String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+                details.setJobDescription(execuable);
+
+//                GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
+                RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
+
+                StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
+                
+                CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
+                String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
+
+                log.info("stdout=" + stdOutputString);
+             
+//                GFacUtils.updateJobStatus(details, JobState.COMPLETE);
+            } catch (Exception e) {
+                throw new GFacProviderException(e.getMessage(), e);
+            } finally {
+                if (cluster != null) {
+                	try {
+						cluster.disconnect();
+					} catch (SSHApiException e) {
+						throw new GFacProviderException(e.getMessage(), e);
+					}
+                }
+            }
+        } else {
+            try {
+                jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+                HostDescriptionType host = jobExecutionContext.getApplicationContext().
+                        getHostDescription().getType();
+                HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
+                        getApplicationDeploymentDescription().getType();
+                JobDetails jobDetails = new JobDetails();
+                String taskID = jobExecutionContext.getTaskData().getTaskID();
+                try {
+                    Cluster cluster = null;
+                    if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
+                        cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+                    }
+                    if (cluster == null) {
+                        throw new GFacProviderException("Security context is not set properly");
+                    } else {
+                        log.info("Successfully retrieved the Security Context");
+                    }
+                    // This installed path is a mandetory field, because this could change based on the computing resource
+                    JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+
+                    log.info(jobDescriptor.toXML());
+
+                    jobDetails.setJobDescription(jobDescriptor.toXML());
+
+                    String jobID = cluster.submitBatchJob(jobDescriptor);
+                    jobExecutionContext.setJobDetails(jobDetails);
+                    if (jobID == null) {
+                        jobDetails.setJobID("none");
+                        GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+                    } else {
+                        jobDetails.setJobID(jobID);
+                        GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+                    }
+
+                } catch (SSHApiException e) {
+                    String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+                    log.error(error);
+                    jobDetails.setJobID("none");
+                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    throw new GFacProviderException(error, e);
+                } catch (Exception e) {
+                    String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+                    log.error(error);
+                    jobDetails.setJobID("none");
+                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    throw new GFacProviderException(error, e);
+                }
+            } catch (GFacException e) {
+                throw new GFacProviderException(e.getMessage(), e);
+            }
+        }
+    }
+
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+
+    }
+
+
+    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+        throw new NotImplementedException();
+    }
+
+
+    private File createShellScript(JobExecutionContext context) throws IOException {
+        ApplicationDeploymentDescriptionType app = context.getApplicationContext()
+                .getApplicationDeploymentDescription().getType();
+        String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
+                + new Random().nextLong();
+
+        File shellScript = File.createTempFile(uniqueDir, "sh");
+        OutputStream out = new FileOutputStream(shellScript);
+
+        out.write("#!/bin/bash\n".getBytes());
+        out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
+        out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
+        out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
+                .getBytes());
+        // get the env of the host and the application
+        NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+        Map<String, String> nv = new HashMap<String, String>();
+        if (env != null) {
+            for (int i = 0; i < env.length; i++) {
+                String key = env[i].getName();
+                String value = env[i].getValue();
+                nv.put(key, value);
+            }
+        }
+        for (Entry<String, String> entry : nv.entrySet()) {
+            log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
+            out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+
+        }
+
+        // prepare the command
+        final String SPACE = " ";
+        StringBuffer cmd = new StringBuffer();
+        cmd.append(app.getExecutableLocation());
+        cmd.append(SPACE);
+
+        MessageContext input = context.getInMessageContext();
+        ;
+        Map<String, Object> inputs = input.getParameters();
+        Set<String> keys = inputs.keySet();
+        for (String paramName : keys) {
+            ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+            if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+                for (String value : values) {
+                    cmd.append(value);
+                    cmd.append(SPACE);
+                }
+            } else {
+                String paramValue = MappingFactory.toString(actualParameter);
+                cmd.append(paramValue);
+                cmd.append(SPACE);
+            }
+        }
+        // We redirect the error and stdout to remote files, they will be read
+        // in later
+        cmd.append(SPACE);
+        cmd.append("1>");
+        cmd.append(SPACE);
+        cmd.append(app.getStandardOutput());
+        cmd.append(SPACE);
+        cmd.append("2>");
+        cmd.append(SPACE);
+        cmd.append(app.getStandardError());
+
+        String cmdStr = cmd.toString();
+        log.info("Command = " + cmdStr);
+        out.write((cmdStr + "\n").getBytes());
+        String message = "\"execuationSuceeded\"";
+        out.write(("echo " + message + "\n").getBytes());
+        out.close();
+
+        return shellScript;
+    }
+
+    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+    }
+    /**
+     * This method will read standard output and if there's any it will be parsed
+     * @param jobIDReaderCommandOutput
+     * @param errorMsg
+     * @return
+     * @throws SSHApiException
+     */
+    private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
+        String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
+        String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
+
+        if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
+            log.error("Standard Error output : " + stdErrorString);
+            throw new SSHApiException(errorMsg + stdErrorString);
+        }
+        return stdOutputString;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/integration-tests/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/integration-tests/src/test/resources/gfac-config.xml b/modules/integration-tests/src/test/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/integration-tests/src/test/resources/gfac-config.xml
+++ b/modules/integration-tests/src/test/resources/gfac-config.xml
@@ -24,7 +24,7 @@
         </InHandlers>
         <OutHandlers></OutHandlers>
     </GlobalHandlers>
-    <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+    <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
         <InHandlers>
             <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
         </InHandlers>

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
@@ -24,7 +24,7 @@
         </InHandlers>
         <OutHandlers></OutHandlers>
     </GlobalHandlers>
-    <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+    <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
         <InHandlers>
             <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
         </InHandlers>

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
index 06afc6e..9ebee37 100644
--- a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
+++ b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml
@@ -24,7 +24,7 @@
         </InHandlers>
         <OutHandlers></OutHandlers>
     </GlobalHandlers>
-    <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+    <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
         <InHandlers>
             <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
         </InHandlers>


[2/3] fixing more packaing issue

Posted by la...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
new file mode 100644
index 0000000..03c3fee
--- /dev/null
+++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java
@@ -0,0 +1,527 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gram.provider.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.JobSubmissionFault;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.gram.security.GSISecurityContext;
+import org.apache.airavata.gfac.gram.util.GramJobSubmissionListener;
+import org.apache.airavata.gfac.gram.util.GramProviderUtils;
+import org.apache.airavata.gfac.notification.events.JobIDEvent;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.globus.gram.GramException;
+import org.globus.gram.GramJob;
+import org.globus.gram.WaitingForCommitException;
+import org.globus.gram.internal.GRAMConstants;
+import org.globus.gram.internal.GRAMProtocolErrorConstants;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GramProvider extends AbstractProvider {
+    private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
+
+    private GramJob job;
+    private GramJobSubmissionListener listener;
+    private boolean twoPhase = true;
+
+    /**
+     * If normal job submission fail due to an authorisation failure or script failure we
+     * will re-attempt to submit the job. In-order to avoid any recursive loop during a continuous
+     * failure we track whether failure paths are tried or not. Following booleans keeps track whether
+     * we already tried a failure path or not.
+     */
+    /**
+     * To track job submissions during a authorisation failure while requesting job.
+     */
+    private boolean renewCredentialsAttempt = false;
+    /**
+     * To track job submission during a script error situation.
+     */
+    private boolean reSubmissionInProgress = false;
+    /**
+     * To track authorisation failures during status monitoring.
+     */
+    private boolean authorisationFailedAttempt = false;
+
+    private static final Map<String, GramJob> currentlyExecutingJobCache
+            = new ConcurrentHashMap<String, GramJob>();
+
+    private static Properties resources;
+
+    static {
+        try {
+
+            String propFileName = "errors.properties";
+            resources = new Properties();
+            InputStream inputStream = GramProvider.class.getClassLoader()
+                    .getResourceAsStream(propFileName);
+
+            if (inputStream == null) {
+                throw new FileNotFoundException("property file '" + propFileName
+                        + "' not found in the classpath");
+            }
+
+            resources.load(inputStream);
+
+        } catch (FileNotFoundException mre) {
+            log.error("errors.properties not found", mre);
+        } catch (IOException e) {
+            log.error("Error reading errors.properties file", e);
+        }
+    }
+
+
+    // This method prepare the environment before the application invocation.
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+
+        try {
+        	super.initialize(jobExecutionContext);
+            String strTwoPhase = ServerSettings.getSetting("TwoPhase");
+            if (strTwoPhase != null) {
+                twoPhase = Boolean.parseBoolean(strTwoPhase);
+                log.info("Two phase commit is set to " + twoPhase);
+            }
+        } catch (ApplicationSettingsException e) {
+            log.warn("Error reading TwoPhase property from configurations.", e);
+        }
+
+        job = GramProviderUtils.setupEnvironment(jobExecutionContext, twoPhase);
+        listener = new GramJobSubmissionListener(job, jobExecutionContext);
+        job.addListener(listener);
+    }
+
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException{
+        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+        GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().
+                getHostDescription().getType();
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
+                getApplicationDeploymentDescription().getType();
+
+        StringBuilder stringBuilder = new StringBuilder();
+        try {
+
+            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+                    getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+            job.setCredentials(gssCred);
+            // We do not support multiple gatekeepers in XBaya GUI, so we simply pick the 0th element in the array
+            String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
+            log.info("Request to contact:" + gateKeeper);
+
+            stringBuilder.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
+                    .append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
+                    .append(" temp directory = ").append(app.getScratchWorkingDirectory())
+                    .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
+
+            log.info(stringBuilder.toString());
+
+            submitJobs(gateKeeper, jobExecutionContext, host);
+
+        } catch (ApplicationSettingsException e) {
+        	throw new GFacException(e.getMessage(), e);
+		} finally {
+            if (job != null) {
+                try {
+                	 /*
+                     * Remove listener
+                     */
+                    job.removeListener(listener);
+                } catch (Exception e) {
+                	 log.error(e.getMessage());
+                }
+            }
+        }
+    }
+
+    private void submitJobs(String gateKeeper,
+                            JobExecutionContext jobExecutionContext,
+                            GlobusHostType globusHostType) throws GFacException, GFacProviderException {
+    	boolean applicationSaved=false;
+    	String taskID = jobExecutionContext.getTaskData().getTaskID();
+			
+    	if (twoPhase) {
+            try {
+                /*
+                * The first boolean is to force communication through SSLv3
+                * The second boolean is to specify the job is a batch job - use true for interactive and false for
+                 * batch.
+                * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
+                */
+                job.request(true, gateKeeper, false, false);
+
+                // Single boolean to track all authentication failures, therefore we need to re-initialize
+                // this here
+                renewCredentialsAttempt = false;
+
+            } catch (WaitingForCommitException e) {
+            	String jobID = job.getIDAsString();
+				
+            	details.setJobID(jobID);
+            	details.setJobDescription(job.getRSL());
+                jobExecutionContext.setJobDetails(details);
+                GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.UN_SUBMITTED);
+                
+                applicationSaved=true;
+                String jobStatusMessage = "Un-submitted JobID= " + jobID;
+                log.info(jobStatusMessage);
+                jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+                log.info("Two phase commit: sending COMMIT_REQUEST signal; Job id - " + jobID);
+
+                try {
+                    job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
+
+                } catch (GramException gramException) {
+                    throw new GFacException("Error while sending commit request. Job Id - "
+                            + job.getIDAsString(), gramException);
+                } catch (GSSException gssException) {
+
+                    // User credentials are invalid
+                    log.error("Error while submitting commit request - Credentials provided are invalid. Job Id - "
+                            + job.getIDAsString(), e);
+                    log.info("Attempting to renew credentials and re-submit commit signal...");
+                	GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    renewCredentials(jobExecutionContext);
+
+                    try {
+                        job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
+                    } catch (GramException e1) {
+                     	GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    	throw new GFacException("Error while sending commit request. Job Id - "
+                                + job.getIDAsString(), e1);
+                    } catch (GSSException e1) {
+                     	GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                        throw new GFacException("Error while sending commit request. Job Id - "
+                                + job.getIDAsString() + ". Credentials provided invalid", e1);
+                    }
+                }
+                GFacUtils.updateJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
+                jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
+                log.info(jobStatusMessage);
+                jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+            } catch (GSSException e) {
+                // Renew credentials and re-submit
+             	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                
+                reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
+
+            } catch (GramException e) {
+             	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                
+            	throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
+            }
+        } else {
+
+            /*
+            * The first boolean is to force communication through SSLv3
+            * The second boolean is to specify the job is a batch job - use true for interactive and false for
+             * batch.
+            * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
+            */
+            try {
+
+                job.request(true, gateKeeper, false, false);
+                renewCredentialsAttempt = false;
+
+            } catch (GramException e) {
+            	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
+            } catch (GSSException e) {
+            	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                // Renew credentials and re-submit
+                reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
+            }
+
+            String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
+            log.info(jobStatusMessage);
+            jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
+
+        }
+
+        currentlyExecutingJobCache.put(job.getIDAsString(), job);
+        /*
+        * Wait until job is done
+        */
+        listener.waitFor();
+
+        checkJobStatus(jobExecutionContext, globusHostType, gateKeeper);
+
+    }
+
+    private void renewCredentials(JobExecutionContext jobExecutionContext) throws GFacException {
+
+        renewCredentials(this.job, jobExecutionContext);
+    }
+
+    private void renewCredentials(GramJob gramJob, JobExecutionContext jobExecutionContext) throws GFacException {
+
+        try {
+        	GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
+                    getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).renewCredentials();
+            gramJob.renew(gssCred);
+        } catch (GramException e1) {
+            throw new GFacException("Unable to renew credentials. Job Id - "
+                    + gramJob.getIDAsString(), e1);
+        } catch (GSSException e1) {
+            throw new GFacException("Unable to renew credentials. Job Id - "
+                    + gramJob.getIDAsString(), e1);
+        } catch (ApplicationSettingsException e) {
+        	throw new GFacException(e.getLocalizedMessage(), e);
+		}
+    }
+
+    private void reSubmitJob(String gateKeeper,
+                             JobExecutionContext jobExecutionContext,
+                             GlobusHostType globusHostType, Exception e) throws GFacException, GFacProviderException {
+
+        if (!renewCredentialsAttempt) {
+
+            renewCredentialsAttempt = true;
+
+            // User credentials are invalid
+            log.error("Error while submitting job - Credentials provided are invalid. Job Id - "
+                    + job.getIDAsString(), e);
+            log.info("Attempting to renew credentials and re-submit jobs...");
+
+            // Remove existing listener and register a new listener
+            job.removeListener(listener);
+            listener = new GramJobSubmissionListener(job, jobExecutionContext);
+
+            job.addListener(listener);
+
+            renewCredentials(jobExecutionContext);
+
+            submitJobs(gateKeeper, jobExecutionContext, globusHostType);
+
+        } else {
+            throw new GFacException("Error while submitting job - Credentials provided are invalid. Job Id - "
+                    + job.getIDAsString(), e);
+        }
+
+    }
+
+    private void reSubmitJob(String gateKeeper,
+                             JobExecutionContext jobExecutionContext,
+                             GlobusHostType globusHostType) throws GFacException, GFacProviderException {
+
+        // User credentials are invalid
+        log.info("Attempting to renew credentials and re-submit jobs...");
+
+        // Remove existing listener and register a new listener
+        job.removeListener(listener);
+        listener = new GramJobSubmissionListener(job, jobExecutionContext);
+
+        job.addListener(listener);
+
+        renewCredentials(jobExecutionContext);
+
+        submitJobs(gateKeeper, jobExecutionContext, globusHostType);
+
+    }
+
+	
+	
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+    }
+
+    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+        cancelSingleJob(jobId, jobExecutionContext);
+    }
+
+
+    private void cancelSingleJob(String jobId, JobExecutionContext context) throws GFacException {
+        // First check whether job id is in the cache
+        if (currentlyExecutingJobCache.containsKey(jobId)) {
+
+            synchronized (this) {
+                GramJob gramJob = currentlyExecutingJobCache.get(jobId);
+
+                // Even though we check using containsKey, at this point job could be null
+                if (gramJob != null && (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
+                        gramJob.getStatus() != GRAMConstants.STATUS_FAILED)) {
+                    cancelJob(gramJob, context);
+                }
+            }
+
+        } else {
+
+            try {
+				GSSCredential gssCred = ((GSISecurityContext)context.
+				        getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
+
+				GramJob gramJob = new GramJob(null);
+				try {
+				    gramJob.setID(jobId);
+				} catch (MalformedURLException e) {
+				    throw new GFacException("Invalid job id - " + jobId, e);
+				}
+				gramJob.setCredentials(gssCred);
+
+				synchronized (this) {
+				    if (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
+				            gramJob.getStatus() != GRAMConstants.STATUS_FAILED) {
+				        cancelJob(gramJob, context);
+				    }
+				}
+			} catch (ApplicationSettingsException e) {
+				throw new GFacException(e);
+			}
+        }
+    }
+
+    private void cancelJob(GramJob gramJob, JobExecutionContext context) throws GFacException{
+
+        try {
+            gramJob.cancel();
+        } catch (GramException e) {
+            throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e);
+        } catch (GSSException e) {
+
+            log.warn("Credentials invalid to cancel job. Attempting to renew credentials and re-try. " +
+                    "Job id - " + gramJob.getIDAsString());
+            renewCredentials(gramJob, context);
+
+            try {
+                gramJob.cancel();
+                gramJob.signal(GramJob.SIGNAL_COMMIT_END);
+            } catch (GramException e1) {
+                throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e1);
+            } catch (GSSException e1) {
+                throw new GFacException("Error cancelling job, invalid credentials. Job id - "
+                        + gramJob.getIDAsString(), e);
+            }
+        }
+
+    }
+
+    public void initProperties(Map<String, String> properties) throws GFacException {
+
+    }
+
+    private void checkJobStatus(JobExecutionContext jobExecutionContext, GlobusHostType host, String gateKeeper)
+            throws GFacProviderException {
+        int jobStatus = listener.getCurrentStatus();
+
+        if (jobStatus == GramJob.STATUS_FAILED) {
+            
+            String errorMsg = "Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " Job Exit Code = "
+                    + listener.getError() + " Error Description = " + getGramErrorString(listener.getError());
+
+            if (listener.getError() == GRAMProtocolErrorConstants.INVALID_SCRIPT_REPLY) {
+
+                // re-submitting without renewing
+                // TODO verify why we re-submit jobs when we get a invalid script reply
+                if (!reSubmissionInProgress) {
+                    reSubmissionInProgress = true;
+
+                    log.info("Invalid script reply received. Re-submitting job, id - " + job.getIDAsString());
+                    try {
+                        reSubmitJob(gateKeeper, jobExecutionContext, host);
+                    } catch (GFacException e) {
+                        throw new GFacProviderException
+                                ("Error during re-submission. Original job submission data - " + errorMsg,  e);
+                    }
+                    return;
+                }
+
+            } else if (listener.getError() == GRAMProtocolErrorConstants.ERROR_AUTHORIZATION) {
+
+                // re-submit with renewed credentials
+                if (!authorisationFailedAttempt) {
+                    authorisationFailedAttempt = true;
+                    log.info("Authorisation error contacting provider. Re-submitting job with renewed credentials.");
+
+                    try {
+                        renewCredentials(jobExecutionContext);
+                        reSubmitJob(gateKeeper, jobExecutionContext, host);
+                    } catch (GFacException e) {
+                        throw new GFacProviderException
+                                ("Error during re-submission. Original job submission data - " + errorMsg,  e);
+                    }
+
+                    return;
+                }
+
+            } else if (listener.getError() == GRAMProtocolErrorConstants.USER_CANCELLED) {
+
+                log.info("User successfully cancelled job id " + job.getIDAsString());
+                return;
+            }
+
+
+
+            log.error(errorMsg);
+
+            synchronized (this) {
+                currentlyExecutingJobCache.remove(job.getIDAsString());
+            }
+
+            throw new JobSubmissionFault(new Exception(errorMsg), host.getHostAddress(), gateKeeper,
+                            job.getRSL(), jobExecutionContext, getGramErrorString(listener.getError()),
+                    listener.getError());
+
+        } else if (jobStatus == GramJob.STATUS_DONE) {
+            log.info("Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " is successfully executed.");
+
+            synchronized (this) {
+                currentlyExecutingJobCache.remove(job.getIDAsString());
+            }
+        }
+    }
+
+    public String getGramErrorString(int errorCode) {
+
+        if (resources != null) {
+            try {
+                return resources.getProperty(String.valueOf(errorCode));
+            } catch (MissingResourceException mre) {
+                log.warn("Error reading globus error descriptions.", mre);
+                return "Error code: " + errorCode;
+            }
+        } else {
+            return "Error code: " + errorCode;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
deleted file mode 100644
index 5ba5ebf..0000000
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/impl/GSISSHProvider.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gsissh.impl;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.cpi.GFacImpl;
-import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
-import org.apache.airavata.gfac.handler.ThreadedHandler;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
-import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class GSISSHProvider extends AbstractProvider {
-    private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
-
-    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
-    }
-
-    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-    	super.initialize(jobExecutionContext);
-    }
-
-    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-        log.info("Invoking GSISSH Provider Invoke ...");
-        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
-        HostDescriptionType host = jobExecutionContext.getApplicationContext().
-                getHostDescription().getType();
-        HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
-                getApplicationDeploymentDescription().getType();
-        JobDetails jobDetails = new JobDetails();
-     	String taskID = jobExecutionContext.getTaskData().getTaskID();
-        try {
-            Cluster cluster = null;
-            if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
-                cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
-            }
-            if (cluster == null) {
-                throw new GFacProviderException("Security context is not set properly");
-            } else {
-                log.info("Successfully retrieved the Security Context");
-            }
-            // This installed path is a mandetory field, because this could change based on the computing resource
-            JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
-            log.info(jobDescriptor.toXML());
-            
-            jobDetails.setJobDescription(jobDescriptor.toXML());
-            
-            String jobID = cluster.submitBatchJob(jobDescriptor);
-            jobExecutionContext.setJobDetails(jobDetails);
-            if(jobID == null){
-                jobDetails.setJobID("none");
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
-            }else{
-                jobDetails.setJobID(jobID);
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
-            }
-
-
-            // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
-            // to perform monitoring, daemon handlers can be accessed from anywhere
-            List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
-            ThreadedHandler pullMonitorHandler = null;
-            for(ThreadedHandler threadedHandler:daemonHandlers){
-                if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
-                    pullMonitorHandler = threadedHandler;
-                }
-            }
-            // we know this hos is type GsiSSHHostType
-            String monitorMode = ((GsisshHostType) host).getMonitorMode();
-            if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
-                log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
-                pullMonitorHandler.invoke(jobExecutionContext);
-            }else{
-                log.error("Currently we only support Pull monitoring");
-            }
-        } catch (SSHApiException e) {
-            String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
-            log.error(error);
-            jobDetails.setJobID("none");
-        	GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
-         	GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            throw new GFacProviderException(error, e);
-        } catch (Exception e) {
-        	String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
-         	log.error(error);
-            jobDetails.setJobID("none");
-        	GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
-         	GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            throw new GFacProviderException(error, e);
-        }
-    }
-
-    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
new file mode 100644
index 0000000..abca9d9
--- /dev/null
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.gsissh.provider.impl;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
+import org.apache.airavata.gfac.handler.ThreadedHandler;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class GSISSHProvider extends AbstractProvider {
+    private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
+
+    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+    }
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+    	super.initialize(jobExecutionContext);
+    }
+
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        log.info("Invoking GSISSH Provider Invoke ...");
+        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+        HostDescriptionType host = jobExecutionContext.getApplicationContext().
+                getHostDescription().getType();
+        HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
+                getApplicationDeploymentDescription().getType();
+        JobDetails jobDetails = new JobDetails();
+     	String taskID = jobExecutionContext.getTaskData().getTaskID();
+        try {
+            Cluster cluster = null;
+            if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+                cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+            }
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+            // This installed path is a mandetory field, because this could change based on the computing resource
+            JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
+
+            log.info(jobDescriptor.toXML());
+            
+            jobDetails.setJobDescription(jobDescriptor.toXML());
+            
+            String jobID = cluster.submitBatchJob(jobDescriptor);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if(jobID == null){
+                jobDetails.setJobID("none");
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            }else{
+                jobDetails.setJobID(jobID);
+                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+            }
+
+
+            // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
+            // to perform monitoring, daemon handlers can be accessed from anywhere
+            List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+            ThreadedHandler pullMonitorHandler = null;
+            for(ThreadedHandler threadedHandler:daemonHandlers){
+                if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
+                    pullMonitorHandler = threadedHandler;
+                }
+            }
+            // we know this hos is type GsiSSHHostType
+            String monitorMode = ((GsisshHostType) host).getMonitorMode();
+            if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
+                log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
+                pullMonitorHandler.invoke(jobExecutionContext);
+            }else{
+                log.error("Currently we only support Pull monitoring");
+            }
+        } catch (SSHApiException e) {
+            String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+            log.error(error);
+            jobDetails.setJobID("none");
+        	GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
+         	GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        } catch (Exception e) {
+        	String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+         	log.error(error);
+            jobDetails.setJobID("none");
+        	GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
+         	GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            throw new GFacProviderException(error, e);
+        }
+    }
+
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
deleted file mode 100644
index f0a0bf9..0000000
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/impl/LocalProvider.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.local.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
-import org.apache.airavata.gfac.local.utils.InputUtils;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.provider.utils.ProviderUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.utils.OutputUtils;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.NameValuePairType;
-import org.apache.xmlbeans.XmlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-public class LocalProvider extends AbstractProvider {
-    private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
-    private ProcessBuilder builder;
-    private List<String> cmdList;
-    private String jobId;
-    
-    public static class LocalProviderJobData{
-    	private String applicationName;
-    	private List<String> inputParameters;
-    	private String workingDir;
-    	private String inputDir;
-    	private String outputDir;
-		public String getApplicationName() {
-			return applicationName;
-		}
-		public void setApplicationName(String applicationName) {
-			this.applicationName = applicationName;
-		}
-		public List<String> getInputParameters() {
-			return inputParameters;
-		}
-		public void setInputParameters(List<String> inputParameters) {
-			this.inputParameters = inputParameters;
-		}
-		public String getWorkingDir() {
-			return workingDir;
-		}
-		public void setWorkingDir(String workingDir) {
-			this.workingDir = workingDir;
-		}
-		public String getInputDir() {
-			return inputDir;
-		}
-		public void setInputDir(String inputDir) {
-			this.inputDir = inputDir;
-		}
-		public String getOutputDir() {
-			return outputDir;
-		}
-		public void setOutputDir(String outputDir) {
-			this.outputDir = outputDir;
-		}
-    }
-    public LocalProvider(){
-        cmdList = new ArrayList<String>();
-    }
-
-    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
-    	super.initialize(jobExecutionContext);
-        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
-                getApplicationDeploymentDescription().getType();
-
-        buildCommand(app.getExecutableLocation(), ProviderUtils.getInputParameters(jobExecutionContext));
-        initProcessBuilder(app);
-
-        // extra environment variables
-        builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory());
-        builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory());
-
-        // set working directory
-        builder.directory(new File(app.getStaticWorkingDirectory()));
-
-        // log info
-        log.info("Command = " + InputUtils.buildCommand(cmdList));
-        log.info("Working dir = " + builder.directory());
-        for (String key : builder.environment().keySet()) {
-            log.info("Env[" + key + "] = " + builder.environment().get(key));
-        }
-    }
-
-    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
-         ApplicationDeploymentDescriptionType app = jobExecutionContext.
-                 getApplicationContext().getApplicationDeploymentDescription().getType();
-        JobDetails jobDetails = new JobDetails();
-        try {
-        	jobId = jobExecutionContext.getTaskData().getTaskID();
-            jobDetails.setJobID(jobId);
-            jobDetails.setJobDescription(app.toString());
-            jobExecutionContext.setJobDetails(jobDetails);
-            jobDetails.setJobDescription(app.toString());
-            GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
-        	// running cmd
-            Process process = builder.start();
-
-            Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
-            Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
-
-            // start output threads
-            standardOutWriter.setDaemon(true);
-            standardErrorWriter.setDaemon(true);
-            standardOutWriter.start();
-            standardErrorWriter.start();
-
-            int returnValue = process.waitFor();
-
-            // make sure other two threads are done
-            standardOutWriter.join();
-            standardErrorWriter.join();
-
-            /*
-             * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
-             * just provide warning in the log messages
-             */
-            if (returnValue != 0) {
-                log.error("Process finished with non zero return value. Process may have failed");
-            } else {
-                log.info("Process finished with return value of zero.");
-            }
-
-            StringBuffer buf = new StringBuffer();
-            buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
-                    .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory())
-                    .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
-                    .append(String.valueOf(returnValue));
-            log.info(buf.toString());
-        } catch (IOException io) {
-            throw new GFacProviderException(io.getMessage(), io);
-        } catch (InterruptedException e) {
-            throw new GFacProviderException(e.getMessage(), e);
-        }catch (GFacException e) {
-            throw new GFacProviderException(e.getMessage(), e);
-        }
-    }
-
-//	private void saveApplicationJob(JobExecutionContext jobExecutionContext)
-//			throws GFacProviderException {
-//		ApplicationDeploymentDescriptionType app = jobExecutionContext.
-//                getApplicationContext().getApplicationDeploymentDescription().getType();
-//		ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
-//		appJob.setJobId(jobId);
-//		LocalProviderJobData data = new LocalProviderJobData();
-//		data.setApplicationName(app.getExecutableLocation());
-//		data.setInputDir(app.getInputDataDirectory());
-//		data.setOutputDir(app.getOutputDataDirectory());
-//		data.setWorkingDir(builder.directory().toString());
-//		data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
-//		ByteArrayOutputStream stream = new ByteArrayOutputStream();
-//		JAXB.marshal(data, stream);
-//		appJob.setJobData(stream.toString());
-//		appJob.setSubmittedTime(Calendar.getInstance().getTime());
-//		appJob.setStatus(ApplicationJobStatus.SUBMITTED);
-//		appJob.setStatusUpdateTime(appJob.getSubmittedTime());
-//		GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
-//	}
-
-    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-
-        try {
-            String stdOutStr = GFacUtils.readFileToString(app.getStandardOutput());
-            String stdErrStr = GFacUtils.readFileToString(app.getStandardError());
-			Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
-            OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
-        } catch (XmlException e) {
-            throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
-        } catch (IOException io) {
-            throw new GFacProviderException(io.getMessage(), io);
-        } catch (Exception e){
-        	throw new GFacProviderException("Error in retrieving results",e);
-        }
-    }
-
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
-        throw new NotImplementedException();
-    }
-
-
-    private void buildCommand(String executable, List<String> inputParameterList){
-        cmdList.add(executable);
-        cmdList.addAll(inputParameterList);
-    }
-
-    private void initProcessBuilder(ApplicationDeploymentDescriptionType app){
-        builder = new ProcessBuilder(cmdList);
-
-        NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
-        if(env != null && env.length > 0){
-            Map<String,String> builderEnv = builder.environment();
-            for (NameValuePairType entry : env) {
-                builderEnv.put(entry.getName(), entry.getValue());
-            }
-        }
-    }
-
-    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
new file mode 100644
index 0000000..e4d55b8
--- /dev/null
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.provider.utils.ProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+public class LocalProvider extends AbstractProvider {
+    private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
+    private ProcessBuilder builder;
+    private List<String> cmdList;
+    private String jobId;
+    
+    public static class LocalProviderJobData{
+    	private String applicationName;
+    	private List<String> inputParameters;
+    	private String workingDir;
+    	private String inputDir;
+    	private String outputDir;
+		public String getApplicationName() {
+			return applicationName;
+		}
+		public void setApplicationName(String applicationName) {
+			this.applicationName = applicationName;
+		}
+		public List<String> getInputParameters() {
+			return inputParameters;
+		}
+		public void setInputParameters(List<String> inputParameters) {
+			this.inputParameters = inputParameters;
+		}
+		public String getWorkingDir() {
+			return workingDir;
+		}
+		public void setWorkingDir(String workingDir) {
+			this.workingDir = workingDir;
+		}
+		public String getInputDir() {
+			return inputDir;
+		}
+		public void setInputDir(String inputDir) {
+			this.inputDir = inputDir;
+		}
+		public String getOutputDir() {
+			return outputDir;
+		}
+		public void setOutputDir(String outputDir) {
+			this.outputDir = outputDir;
+		}
+    }
+    public LocalProvider(){
+        cmdList = new ArrayList<String>();
+    }
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+    	super.initialize(jobExecutionContext);
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
+                getApplicationDeploymentDescription().getType();
+
+        buildCommand(app.getExecutableLocation(), ProviderUtils.getInputParameters(jobExecutionContext));
+        initProcessBuilder(app);
+
+        // extra environment variables
+        builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory());
+        builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory());
+
+        // set working directory
+        builder.directory(new File(app.getStaticWorkingDirectory()));
+
+        // log info
+        log.info("Command = " + InputUtils.buildCommand(cmdList));
+        log.info("Working dir = " + builder.directory());
+        for (String key : builder.environment().keySet()) {
+            log.info("Env[" + key + "] = " + builder.environment().get(key));
+        }
+    }
+
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+         ApplicationDeploymentDescriptionType app = jobExecutionContext.
+                 getApplicationContext().getApplicationDeploymentDescription().getType();
+        JobDetails jobDetails = new JobDetails();
+        try {
+        	jobId = jobExecutionContext.getTaskData().getTaskID();
+            jobDetails.setJobID(jobId);
+            jobDetails.setJobDescription(app.toString());
+            jobExecutionContext.setJobDetails(jobDetails);
+            jobDetails.setJobDescription(app.toString());
+            GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
+        	// running cmd
+            Process process = builder.start();
+
+            Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
+            Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
+
+            // start output threads
+            standardOutWriter.setDaemon(true);
+            standardErrorWriter.setDaemon(true);
+            standardOutWriter.start();
+            standardErrorWriter.start();
+
+            int returnValue = process.waitFor();
+
+            // make sure other two threads are done
+            standardOutWriter.join();
+            standardErrorWriter.join();
+
+            /*
+             * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+             * just provide warning in the log messages
+             */
+            if (returnValue != 0) {
+                log.error("Process finished with non zero return value. Process may have failed");
+            } else {
+                log.info("Process finished with return value of zero.");
+            }
+
+            StringBuffer buf = new StringBuffer();
+            buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+                    .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory())
+                    .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ")
+                    .append(String.valueOf(returnValue));
+            log.info(buf.toString());
+        } catch (IOException io) {
+            throw new GFacProviderException(io.getMessage(), io);
+        } catch (InterruptedException e) {
+            throw new GFacProviderException(e.getMessage(), e);
+        }catch (GFacException e) {
+            throw new GFacProviderException(e.getMessage(), e);
+        }
+    }
+
+//	private void saveApplicationJob(JobExecutionContext jobExecutionContext)
+//			throws GFacProviderException {
+//		ApplicationDeploymentDescriptionType app = jobExecutionContext.
+//                getApplicationContext().getApplicationDeploymentDescription().getType();
+//		ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+//		appJob.setJobId(jobId);
+//		LocalProviderJobData data = new LocalProviderJobData();
+//		data.setApplicationName(app.getExecutableLocation());
+//		data.setInputDir(app.getInputDataDirectory());
+//		data.setOutputDir(app.getOutputDataDirectory());
+//		data.setWorkingDir(builder.directory().toString());
+//		data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+//		ByteArrayOutputStream stream = new ByteArrayOutputStream();
+//		JAXB.marshal(data, stream);
+//		appJob.setJobData(stream.toString());
+//		appJob.setSubmittedTime(Calendar.getInstance().getTime());
+//		appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+//		appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+//		GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+//	}
+
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+
+        try {
+            String stdOutStr = GFacUtils.readFileToString(app.getStandardOutput());
+            String stdErrStr = GFacUtils.readFileToString(app.getStandardError());
+			Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+            OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+        } catch (XmlException e) {
+            throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
+        } catch (IOException io) {
+            throw new GFacProviderException(io.getMessage(), io);
+        } catch (Exception e){
+        	throw new GFacProviderException("Error in retrieving results",e);
+        }
+    }
+
+    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
+        throw new NotImplementedException();
+    }
+
+
+    private void buildCommand(String executable, List<String> inputParameterList){
+        cmdList.add(executable);
+        cmdList.addAll(inputParameterList);
+    }
+
+    private void initProcessBuilder(ApplicationDeploymentDescriptionType app){
+        builder = new ProcessBuilder(cmdList);
+
+        NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+        if(env != null && env.length > 0){
+            Map<String,String> builderEnv = builder.environment();
+            for (NameValuePairType entry : env) {
+                builderEnv.put(entry.getName(), entry.getValue());
+            }
+        }
+    }
+
+    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index 902e424..afb101c 100644
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -27,7 +27,7 @@ import org.apache.airavata.gfac.context.ApplicationContext;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.MessageContext;
 import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
-import org.apache.airavata.gfac.local.impl.LocalProvider;
+import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
index b9432da..8147853 100644
--- a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
+++ b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
@@ -19,7 +19,7 @@
         </InHandlers>
         <OutHandlers></OutHandlers>
     </GlobalHandlers>
-    <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+    <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
         <InHandlers>
             <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
         </InHandlers>

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
deleted file mode 100644
index ee3dcd2..0000000
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/impl/SSHProvider.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.ssh.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.handler.GFacHandlerException;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gsi.ssh.api.Cluster;
-import org.apache.airavata.gsi.ssh.api.CommandExecutor;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
-import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-/**
- * Execute application using remote SSH
- */
-public class SSHProvider extends AbstractProvider {
-    private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
-    private Cluster cluster;
-    private String jobID = null;
-    private String taskID = null;
-    // we keep gsisshprovider to support qsub submission incase of hpc scenario with ssh
-    private boolean hpcType = false;
-
-    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-    	super.initialize(jobExecutionContext);
-    	 if(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null){
-            try {
-                GFACSSHUtils.addSecurityContext(jobExecutionContext);
-            } catch (ApplicationSettingsException e) {
-                log.error(e.getMessage());
-                throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
-            }
-        }
-        taskID = jobExecutionContext.getTaskData().getTaskID();
-		if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
-            jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
-            cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
-            
-            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-            String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
-            details.setJobID(taskID);
-            details.setJobDescription(remoteFile);
-            jobExecutionContext.setJobDetails(details);
-            JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
-            details.setJobDescription(jobDescriptor.toXML());
-         
-            GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
-            log.info(remoteFile);
-            try {
-            	File runscript = createShellScript(jobExecutionContext);
-                cluster.scpTo(remoteFile, runscript.getAbsolutePath());
-            } catch (Exception e) {
-                throw new GFacProviderException(e.getLocalizedMessage(), e);
-            }
-        }else{
-           hpcType = true;
-        }
-    }
-
-
-    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-        if (!hpcType) {
-            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-            try {
-                /*
-                 * Execute
-                 */
-                String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
-                details.setJobDescription(execuable);
-
-//                GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
-                RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable);
-
-                StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
-                
-                CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
-                String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
-
-                log.info("stdout=" + stdOutputString);
-             
-//                GFacUtils.updateJobStatus(details, JobState.COMPLETE);
-            } catch (Exception e) {
-                throw new GFacProviderException(e.getMessage(), e);
-            } finally {
-                if (cluster != null) {
-                	try {
-						cluster.disconnect();
-					} catch (SSHApiException e) {
-						throw new GFacProviderException(e.getMessage(), e);
-					}
-                }
-            }
-        } else {
-            try {
-                jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
-                HostDescriptionType host = jobExecutionContext.getApplicationContext().
-                        getHostDescription().getType();
-                HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
-                        getApplicationDeploymentDescription().getType();
-                JobDetails jobDetails = new JobDetails();
-                String taskID = jobExecutionContext.getTaskData().getTaskID();
-                try {
-                    Cluster cluster = null;
-                    if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
-                        cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
-                    }
-                    if (cluster == null) {
-                        throw new GFacProviderException("Security context is not set properly");
-                    } else {
-                        log.info("Successfully retrieved the Security Context");
-                    }
-                    // This installed path is a mandetory field, because this could change based on the computing resource
-                    JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
-
-                    log.info(jobDescriptor.toXML());
-
-                    jobDetails.setJobDescription(jobDescriptor.toXML());
-
-                    String jobID = cluster.submitBatchJob(jobDescriptor);
-                    jobExecutionContext.setJobDetails(jobDetails);
-                    if (jobID == null) {
-                        jobDetails.setJobID("none");
-                        GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
-                    } else {
-                        jobDetails.setJobID(jobID);
-                        GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
-                    }
-
-                } catch (SSHApiException e) {
-                    String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
-                    log.error(error);
-                    jobDetails.setJobID("none");
-                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
-                    GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                    throw new GFacProviderException(error, e);
-                } catch (Exception e) {
-                    String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
-                    log.error(error);
-                    jobDetails.setJobID("none");
-                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
-                    GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                    throw new GFacProviderException(error, e);
-                }
-            } catch (GFacException e) {
-                throw new GFacProviderException(e.getMessage(), e);
-            }
-        }
-    }
-
-    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-
-    }
-
-
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
-        throw new NotImplementedException();
-    }
-
-
-    private File createShellScript(JobExecutionContext context) throws IOException {
-        ApplicationDeploymentDescriptionType app = context.getApplicationContext()
-                .getApplicationDeploymentDescription().getType();
-        String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
-                + new Random().nextLong();
-
-        File shellScript = File.createTempFile(uniqueDir, "sh");
-        OutputStream out = new FileOutputStream(shellScript);
-
-        out.write("#!/bin/bash\n".getBytes());
-        out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
-        out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
-        out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
-                .getBytes());
-        // get the env of the host and the application
-        NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
-        Map<String, String> nv = new HashMap<String, String>();
-        if (env != null) {
-            for (int i = 0; i < env.length; i++) {
-                String key = env[i].getName();
-                String value = env[i].getValue();
-                nv.put(key, value);
-            }
-        }
-        for (Entry<String, String> entry : nv.entrySet()) {
-            log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
-            out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-
-        }
-
-        // prepare the command
-        final String SPACE = " ";
-        StringBuffer cmd = new StringBuffer();
-        cmd.append(app.getExecutableLocation());
-        cmd.append(SPACE);
-
-        MessageContext input = context.getInMessageContext();
-        ;
-        Map<String, Object> inputs = input.getParameters();
-        Set<String> keys = inputs.keySet();
-        for (String paramName : keys) {
-            ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
-            if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-                String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
-                for (String value : values) {
-                    cmd.append(value);
-                    cmd.append(SPACE);
-                }
-            } else {
-                String paramValue = MappingFactory.toString(actualParameter);
-                cmd.append(paramValue);
-                cmd.append(SPACE);
-            }
-        }
-        // We redirect the error and stdout to remote files, they will be read
-        // in later
-        cmd.append(SPACE);
-        cmd.append("1>");
-        cmd.append(SPACE);
-        cmd.append(app.getStandardOutput());
-        cmd.append(SPACE);
-        cmd.append("2>");
-        cmd.append(SPACE);
-        cmd.append(app.getStandardError());
-
-        String cmdStr = cmd.toString();
-        log.info("Command = " + cmdStr);
-        out.write((cmdStr + "\n").getBytes());
-        String message = "\"execuationSuceeded\"";
-        out.write(("echo " + message + "\n").getBytes());
-        out.close();
-
-        return shellScript;
-    }
-
-    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
-    }
-    /**
-     * This method will read standard output and if there's any it will be parsed
-     * @param jobIDReaderCommandOutput
-     * @param errorMsg
-     * @return
-     * @throws SSHApiException
-     */
-    private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
-        String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
-        String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();
-
-        if(stdOutputString == null || stdOutputString.isEmpty() || (stdErrorString != null && !stdErrorString.isEmpty())){
-            log.error("Standard Error output : " + stdErrorString);
-            throw new SSHApiException(errorMsg + stdErrorString);
-        }
-        return stdOutputString;
-    }
-
-
-}


[3/3] git commit: fixing more packaing issue

Posted by la...@apache.org.
fixing more packaing issue


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6209ee09
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6209ee09
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6209ee09

Branch: refs/heads/master
Commit: 6209ee096e2dea80863cabaaad6acce6f2910ee9
Parents: 49b6987
Author: lahiru <la...@apache.org>
Authored: Mon May 5 11:06:59 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon May 5 11:06:59 2014 -0400

----------------------------------------------------------------------
 .../server/src/main/resources/gfac-config.xml   |   2 +-
 .../airavata/gfac/bes/impl/BESProvider.java     | 568 -------------------
 .../gfac/bes/provider/impl/BESProvider.java     | 568 +++++++++++++++++++
 .../airavata/gfac/gram/impl/GramProvider.java   | 527 -----------------
 .../gfac/gram/provider/impl/GramProvider.java   | 527 +++++++++++++++++
 .../gfac/gsissh/impl/GSISSHProvider.java        | 138 -----
 .../gsissh/provider/impl/GSISSHProvider.java    | 138 +++++
 .../airavata/gfac/local/impl/LocalProvider.java | 239 --------
 .../gfac/local/provider/impl/LocalProvider.java | 239 ++++++++
 .../gfac/services/impl/LocalProviderTest.java   |   2 +-
 .../src/test/resources/gfac-config.xml          |   2 +-
 .../airavata/gfac/ssh/impl/SSHProvider.java     | 310 ----------
 .../gfac/ssh/provider/impl/SSHProvider.java     | 310 ++++++++++
 .../src/test/resources/gfac-config.xml          |   2 +-
 .../src/main/resources/gfac-config.xml          |   2 +-
 .../src/test/resources/gfac-config.xml          |   2 +-
 16 files changed, 1788 insertions(+), 1788 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/configuration/server/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml
index 0e95bc4..4cddfda 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.xml
+++ b/modules/configuration/server/src/main/resources/gfac-config.xml
@@ -24,7 +24,7 @@
         </InHandlers>
         <OutHandlers></OutHandlers>
     </GlobalHandlers>
-    <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+    <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
         <InHandlers>
             <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/>
         </InHandlers>

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
deleted file mode 100644
index c41632f..0000000
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/impl/BESProvider.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.bes.impl;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigInteger;
-import java.security.InvalidKeyException;
-import java.security.KeyPair;
-import java.security.KeyPairGenerator;
-import java.security.PrivateKey;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-
-import javax.security.auth.x500.X500Principal;
-
-import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.bes.security.GSISecurityContext;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
-import org.apache.airavata.gfac.notification.events.UnicoreJobIDEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.bes.utils.DataTransferrer;
-import org.apache.airavata.gfac.bes.utils.JSDLGenerator;
-import org.apache.airavata.gfac.bes.utils.StorageCreator;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.registry.api.workflow.ApplicationJob;
-import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
-import org.apache.airavata.schemas.gfac.UnicoreHostType;
-import org.apache.xmlbeans.XmlCursor;
-import org.bouncycastle.asn1.ASN1InputStream;
-import org.bouncycastle.asn1.x500.X500Name;
-import org.bouncycastle.asn1.x500.style.BCStyle;
-import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
-import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
-import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
-import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration.Enum;
-import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
-import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityDocument;
-import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityResponseDocument;
-import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesDocument;
-import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesResponseDocument;
-import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
-import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3.x2005.x08.addressing.EndpointReferenceType;
-
-import de.fzj.unicore.bes.client.FactoryClient;
-import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
-import de.fzj.unicore.uas.client.StorageClient;
-import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
-import eu.emi.security.authn.x509.helpers.CertificateHelpers;
-import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
-import eu.emi.security.authn.x509.impl.CertificateUtils;
-import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
-import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
-import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
-import eu.emi.security.authn.x509.impl.X500NameUtils;
-import eu.unicore.util.httpclient.DefaultClientConfiguration;
-
-
-
-public class BESProvider extends AbstractProvider {
-    protected final Logger log = LoggerFactory.getLogger(this.getClass());
-
-    private DefaultClientConfiguration secProperties;
-
-    private String jobId;
-    
-    
-        
-	public void initialize(JobExecutionContext jobExecutionContext)
-			throws GFacProviderException, GFacException {
-		log.info("Initializing UNICORE Provider");
-		super.initialize(jobExecutionContext);
-    	initSecurityProperties(jobExecutionContext);
-    	log.debug("initialized security properties");
-    }
-
-
-	public void execute(JobExecutionContext jobExecutionContext)
-			throws GFacProviderException {
-        UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
-                .getType();
-
-        String factoryUrl = host.getUnicoreBESEndPointArray()[0];
-
-        EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
-        eprt.addNewAddress().setStringValue(factoryUrl);
-
-        String userDN = getUserName(jobExecutionContext);
-
-        if (userDN == null || userDN.equalsIgnoreCase("admin")) {
-            userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
-        }
-
-        String xlogin = getCNFromUserDN(userDN);
-        // create storage
-        StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, xlogin);
-
-        StorageClient sc = null;
-        try {
-            try {
-                sc = storageCreator.createStorage();
-            } catch (Exception e2) {
-                log.error("Cannot create storage..");
-                throw new GFacProviderException("Cannot create storage..", e2);
-            }
-
-            CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
-            JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory.newInstance();
-
-            JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition();
-            try {
-                jobDefinition = JSDLGenerator.buildJSDLInstance(jobExecutionContext, sc.getUrl()).getJobDefinition();
-                cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
-
-                log.info("JSDL" + jobDefDoc.toString());
-            } catch (Exception e1) {
-                throw new GFacProviderException("Cannot generate JSDL instance from the JobExecutionContext.", e1);
-            }
-
-            // upload files if any
-            DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
-            dt.uploadLocalFiles();
-
-            FactoryClient factory = null;
-            try {
-                factory = new FactoryClient(eprt, secProperties);
-            } catch (Exception e) {
-                throw new GFacProviderException(e.getLocalizedMessage(), e);
-            }
-
-            CreateActivityResponseDocument response = null;
-            try {
-                log.info(String.format("Activity Submitting to %s ... \n", factoryUrl));
-                response = factory.createActivity(cad);
-                log.info(String.format("Activity Submitted to %s \n", factoryUrl));
-            } catch (Exception e) {
-                throw new GFacProviderException("Cannot create activity.", e);
-            }
-            EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
-
-            log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
-
-            // factory.waitWhileActivityIsDone(activityEpr, 1000);
-            jobId = WSUtilities.extractResourceID(activityEpr);
-            if (jobId == null) {
-                jobId = new Long(Calendar.getInstance().getTimeInMillis()).toString();
-            }
-            log.info("JobID: " + jobId);
-            jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
-            saveApplicationJob(jobExecutionContext, jobDefinition, activityEpr.toString());
-
-            factory.getActivityStatus(activityEpr);
-            log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(),
-                    factory.getActivityStatus(activityEpr).toString()));
-
-            // TODO publish the status messages to the message bus
-            while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
-                    && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
-                    && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
-
-                ActivityStatusType activityStatus = null;
-                try {
-                    activityStatus = getStatus(factory, activityEpr);
-                    JobState jobStatus = getApplicationJobStatus(activityStatus);
-                    String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
-                    jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
-                    details.setJobID(jobId);
-                    GFacUtils.updateJobStatus(jobExecutionContext, details, jobStatus);
-                } catch (UnknownActivityIdentifierFault e) {
-                    throw new GFacProviderException(e.getMessage(), e.getCause());
-                }catch (GFacException e) {
-                    throw new GFacProviderException(e.getMessage(), e.getCause());
-                }
-
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                }
-                continue;
-            }
-
-            ActivityStatusType activityStatus = null;
-            try {
-                activityStatus = getStatus(factory, activityEpr);
-            } catch (UnknownActivityIdentifierFault e) {
-                throw new GFacProviderException(e.getMessage(), e.getCause());
-            }
-
-            log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState()
-                    .toString()));
-
-            if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
-                String error = activityStatus.getFault().getFaultcode().getLocalPart() + "\n"
-                        + activityStatus.getFault().getFaultstring() + "\n EXITCODE: " + activityStatus.getExitCode();
-                log.info(error);
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                }
-                dt.downloadStdOuts();
-            } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
-                String experimentID = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
-                JobState jobStatus = JobState.CANCELED;
-                String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
-                jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
-                details.setJobID(jobId);
-                try {
-					GFacUtils.saveJobStatus(jobExecutionContext,details, jobStatus);
-				} catch (GFacException e) {
-					 throw new GFacProviderException(e.getLocalizedMessage(),e);
-				}
-                throw new GFacProviderException(experimentID + "Job Canceled");
-            }
-
-            else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                }
-                if (activityStatus.getExitCode() == 0) {
-                    dt.downloadRemoteFiles();
-                } else {
-                    dt.downloadStdOuts();
-                }
-            }
-
-        } catch (UnknownActivityIdentifierFault e1) {
-            throw new GFacProviderException(e1.getLocalizedMessage(), e1);
-        } finally {
-            // destroy sms instance
-            try {
-                if (sc != null) {
-                    sc.destroy();
-                }
-            } catch (Exception e) {
-                log.warn("Cannot destroy temporary SMS instance:" + sc.getUrl(), e);
-            }
-        }
-    }
-
-	private JobState getApplicationJobStatus(ActivityStatusType activityStatus){
-        if (activityStatus == null) {
-            return JobState.UNKNOWN;
-        }
-        Enum state = activityStatus.getState();
-        String status = null;
-        XmlCursor acursor = activityStatus.newCursor();
-        try {
-            if (acursor.toFirstChild()) {
-                if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
-                    status = acursor.getName().getLocalPart();
-                }
-            }
-            if (status != null) {
-                if (status.equalsIgnoreCase("Queued") || status.equalsIgnoreCase("Starting")
-                        || status.equalsIgnoreCase("Ready")) {
-                    return JobState.QUEUED;
-                } else if (status.equalsIgnoreCase("Staging-In")) {
-                    return JobState.SUBMITTED;
-                } else if (status.equalsIgnoreCase("Staging-Out") || status.equalsIgnoreCase("FINISHED")) {
-                    return JobState.COMPLETE;
-                } else if (status.equalsIgnoreCase("Executing")) {
-                    return JobState.ACTIVE;
-                } else if (status.equalsIgnoreCase("FAILED")) {
-                    return JobState.FAILED;
-                } else if (status.equalsIgnoreCase("CANCELLED")) {
-                    return JobState.CANCELED;
-                }
-            } else {
-                if (ActivityStateEnumeration.CANCELLED.equals(state)) {
-                    return JobState.CANCELED;
-                } else if (ActivityStateEnumeration.FAILED.equals(state)) {
-                    return JobState.FAILED;
-                } else if (ActivityStateEnumeration.FINISHED.equals(state)) {
-                    return JobState.COMPLETE;
-                } else if (ActivityStateEnumeration.RUNNING.equals(state)) {
-                    return JobState.ACTIVE;
-                }
-            }
-        } finally {
-            if (acursor != null)
-                acursor.dispose();
-        }
-        return JobState.UNKNOWN;
-    }
-
-    private void saveApplicationJob(JobExecutionContext jobExecutionContext, JobDefinitionType jobDefinition,
-                                    String metadata) {
-        ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
-        appJob.setJobId(jobId);
-        appJob.setJobData(jobDefinition.toString());
-        appJob.setSubmittedTime(Calendar.getInstance().getTime());
-        appJob.setStatus(ApplicationJobStatus.SUBMITTED);
-        appJob.setStatusUpdateTime(appJob.getSubmittedTime());
-        appJob.setMetadata(metadata);
-        GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
-    }
-
-    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-        secProperties = null;
-    }
-
-    /**
-     * EndpointReference need to be saved to make cancel work.
-     *
-     * @param activityEpr
-     * @param jobExecutionContext
-     * @throws GFacProviderException
-     */
-    public void cancelJob(String activityEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
-        try {
-            initSecurityProperties(jobExecutionContext);
-            EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(activityEpr);
-            UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
-                    .getType();
-
-            String factoryUrl = host.getUnicoreBESEndPointArray()[0];
-            EndpointReferenceType epr = EndpointReferenceType.Factory.newInstance();
-            epr.addNewAddress().setStringValue(factoryUrl);
-
-            FactoryClient factory = new FactoryClient(epr, secProperties);
-            factory.terminateActivity(eprt);
-        } catch (Exception e) {
-            throw new GFacProviderException(e.getLocalizedMessage(),e);
-        }
-
-    }
-
-    protected void downloadOffline(String smsEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
-        try {
-            initSecurityProperties(jobExecutionContext);
-            EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(smsEpr);
-            StorageClient sms = new StorageClient(eprt, secProperties);
-            DataTransferrer dt = new DataTransferrer(jobExecutionContext, sms);
-            // there must be output files there
-            // this is also possible if client is re-connected, the jobs are
-            // still
-            // running and no output is produced
-            dt.downloadRemoteFiles();
-
-            // may be use the below method before downloading for checking
-            // the number of entries
-            // sms.listDirectory(".");
-
-        } catch (Exception e) {
-            throw new GFacProviderException(e.getLocalizedMessage(), e);
-        }
-    }
-
-    protected void initSecurityProperties(JobExecutionContext jobExecutionContext) throws GFacProviderException,
-            GFacException {
-
-        if (secProperties != null)
-            return;
-
-        GSISecurityContext gssContext = (GSISecurityContext) jobExecutionContext
-                .getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT);
-
-        try {
-            String certLocation = gssContext.getTrustedCertificatePath();
-            List<String> trustedCert = new ArrayList<String>();
-            trustedCert.add(certLocation + "/*.0");
-            trustedCert.add(certLocation + "/*.pem");
-
-            DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1,
-                    60000, null);
-
-            String userID = getUserName(jobExecutionContext);
-
-            if ( userID == null || "".equals(userID) || userID.equalsIgnoreCase("admin") ) {
-                userID = "CN=zdv575, O=Ultrascan Gateway, C=DE";
-            }
-
-            String userDN = userID.replaceAll("^\"|\"$", "");
-
-            // TODO: should be changed to default airavata server locations
-            KeyAndCertCredential cred = generateShortLivedCertificate(userDN, certLocation
-                    + "/cacert.pem", certLocation
-                    + "/cakey.pem", "ultrascan3");
-            secProperties = new DefaultClientConfiguration(dcValidator, cred);
-
-            // secProperties.doSSLAuthn();
-            secProperties.getETDSettings().setExtendTrustDelegation(true);
-
-            secProperties.setDoSignMessage(true);
-
-            String[] outHandlers = secProperties.getOutHandlerClassNames();
-
-            Set<String> outHandlerLst = null;
-
-            // timeout in milliseconds
-            Properties p = secProperties.getExtraSettings();
-            p.setProperty("http.connection.timeout", "300000");
-            p.setProperty("http.socket.timeout", "300000");
-
-            if (outHandlers == null) {
-                outHandlerLst = new HashSet<String>();
-            } else {
-                outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
-            }
-
-            outHandlerLst.add("de.fzj.unicore.uas.security.ProxyCertOutHandler");
-
-            secProperties.setOutHandlerClassNames(outHandlerLst.toArray(new String[outHandlerLst.size()]));
-
-        } catch (Exception e) {
-            throw new GFacProviderException(e.getMessage(), e);
-        }
-    }
-
-    //FIXME: Get user details
-    private String getUserName(JobExecutionContext context) {
-//        if (context.getConfigurationData()!= null) {
-//            return context.getConfigurationData().getBasicMetadata().getUserName();
-//        } else {
-           return "";
-//        }
-    }
-
-    protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
-            throws UnknownActivityIdentifierFault {
-
-        GetActivityStatusesDocument stats = GetActivityStatusesDocument.Factory.newInstance();
-
-        stats.addNewGetActivityStatuses().setActivityIdentifierArray(new EndpointReferenceType[] { activityEpr });
-
-        GetActivityStatusesResponseDocument resDoc = fc.getActivityStatuses(stats);
-
-        ActivityStatusType activityStatus = resDoc.getGetActivityStatusesResponse().getResponseArray()[0]
-                .getActivityStatus();
-        return activityStatus;
-    }
-
-    protected String formatStatusMessage(String activityUrl, String status) {
-        return String.format("Activity %s is %s.\n", activityUrl, status);
-    }
-
-    protected String subStatusAsString(ActivityStatusType statusType) {
-
-        StringBuffer sb = new StringBuffer();
-
-        sb.append(statusType.getState().toString());
-
-        XmlCursor acursor = statusType.newCursor();
-        if (acursor.toFirstChild()) {
-            do {
-                if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
-                    sb.append(":");
-                    sb.append(acursor.getName().getLocalPart());
-                }
-            } while (acursor.toNextSibling());
-            acursor.dispose();
-            return sb.toString();
-        } else {
-            acursor.dispose();
-            return sb.toString();
-        }
-
-    }
-
-    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
-
-    }
-
-    protected KeyAndCertCredential generateShortLivedCertificate(String userDN, String caCertPath, String caKeyPath,
-                                                                 String caPwd) throws Exception {
-        final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes
-        // ago
-
-        final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset;
-        final long endTime = startTime + 30 * 3600 * 1000;
-
-        String keyLengthProp = "1024";
-        int keyLength = Integer.parseInt(keyLengthProp);
-        String signatureAlgorithm = "SHA1withRSA";
-
-        KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
-
-        KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
-        kpg.initialize(keyLength);
-        KeyPair pair = kpg.generateKeyPair();
-
-        X500Principal subjectDN = new X500Principal(userDN);
-        Random rand = new Random();
-
-        SubjectPublicKeyInfo publicKeyInfo;
-        try {
-            publicKeyInfo = SubjectPublicKeyInfo.getInstance(new ASN1InputStream(pair.getPublic().getEncoded())
-                    .readObject());
-        } catch (IOException e) {
-            throw new InvalidKeyException("Can not parse the public key"
-                    + "being included in the short lived certificate", e);
-        }
-
-        X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
-
-        X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
-
-        X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, new BigInteger(20, rand),
-                new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo);
-
-        AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
-
-        X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null);
-
-        certificate.checkValidity(new Date());
-        certificate.verify(caCred.getCertificate().getPublicKey());
-        KeyAndCertCredential result = new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
-                caCred.getCertificate() });
-
-        return result;
-    }
-
-    private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception {
-        InputStream isKey = new FileInputStream(caKeyPath);
-        PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, password.toCharArray());
-
-        InputStream isCert = new FileInputStream(caCertPath);
-        X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
-
-        if (isKey != null)
-            isKey.close();
-        if (isCert != null)
-            isCert.close();
-
-        return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
-    }
-
-    private String getCNFromUserDN(String userDN) {
-        return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
new file mode 100644
index 0000000..cebb7f8
--- /dev/null
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -0,0 +1,568 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.bes.provider.impl;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.security.InvalidKeyException;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import javax.security.auth.x500.X500Principal;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.bes.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
+import org.apache.airavata.gfac.notification.events.UnicoreJobIDEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.bes.utils.DataTransferrer;
+import org.apache.airavata.gfac.bes.utils.JSDLGenerator;
+import org.apache.airavata.gfac.bes.utils.StorageCreator;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
+import org.apache.xmlbeans.XmlCursor;
+import org.bouncycastle.asn1.ASN1InputStream;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration.Enum;
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
+import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityResponseDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesDocument;
+import org.ggf.schemas.bes.x2006.x08.besFactory.GetActivityStatusesResponseDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import de.fzj.unicore.bes.client.FactoryClient;
+import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.emi.security.authn.x509.helpers.CertificateHelpers;
+import eu.emi.security.authn.x509.helpers.proxy.X509v3CertificateBuilder;
+import eu.emi.security.authn.x509.impl.CertificateUtils;
+import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
+import eu.emi.security.authn.x509.impl.DirectoryCertChainValidator;
+import eu.emi.security.authn.x509.impl.KeyAndCertCredential;
+import eu.emi.security.authn.x509.impl.X500NameUtils;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+
+
+
+public class BESProvider extends AbstractProvider {
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private DefaultClientConfiguration secProperties;
+
+    private String jobId;
+    
+    
+        
+	public void initialize(JobExecutionContext jobExecutionContext)
+			throws GFacProviderException, GFacException {
+		log.info("Initializing UNICORE Provider");
+		super.initialize(jobExecutionContext);
+    	initSecurityProperties(jobExecutionContext);
+    	log.debug("initialized security properties");
+    }
+
+
+	public void execute(JobExecutionContext jobExecutionContext)
+			throws GFacProviderException {
+        UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
+                .getType();
+
+        String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+
+        EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
+        eprt.addNewAddress().setStringValue(factoryUrl);
+
+        String userDN = getUserName(jobExecutionContext);
+
+        if (userDN == null || userDN.equalsIgnoreCase("admin")) {
+            userDN = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+        }
+
+        String xlogin = getCNFromUserDN(userDN);
+        // create storage
+        StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, xlogin);
+
+        StorageClient sc = null;
+        try {
+            try {
+                sc = storageCreator.createStorage();
+            } catch (Exception e2) {
+                log.error("Cannot create storage..");
+                throw new GFacProviderException("Cannot create storage..", e2);
+            }
+
+            CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
+            JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory.newInstance();
+
+            JobDefinitionType jobDefinition = jobDefDoc.addNewJobDefinition();
+            try {
+                jobDefinition = JSDLGenerator.buildJSDLInstance(jobExecutionContext, sc.getUrl()).getJobDefinition();
+                cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
+
+                log.info("JSDL" + jobDefDoc.toString());
+            } catch (Exception e1) {
+                throw new GFacProviderException("Cannot generate JSDL instance from the JobExecutionContext.", e1);
+            }
+
+            // upload files if any
+            DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
+            dt.uploadLocalFiles();
+
+            FactoryClient factory = null;
+            try {
+                factory = new FactoryClient(eprt, secProperties);
+            } catch (Exception e) {
+                throw new GFacProviderException(e.getLocalizedMessage(), e);
+            }
+
+            CreateActivityResponseDocument response = null;
+            try {
+                log.info(String.format("Activity Submitting to %s ... \n", factoryUrl));
+                response = factory.createActivity(cad);
+                log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+            } catch (Exception e) {
+                throw new GFacProviderException("Cannot create activity.", e);
+            }
+            EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+            log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+            // factory.waitWhileActivityIsDone(activityEpr, 1000);
+            jobId = WSUtilities.extractResourceID(activityEpr);
+            if (jobId == null) {
+                jobId = new Long(Calendar.getInstance().getTimeInMillis()).toString();
+            }
+            log.info("JobID: " + jobId);
+            jobExecutionContext.getNotifier().publish(new UnicoreJobIDEvent(jobId));
+            saveApplicationJob(jobExecutionContext, jobDefinition, activityEpr.toString());
+
+            factory.getActivityStatus(activityEpr);
+            log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(),
+                    factory.getActivityStatus(activityEpr).toString()));
+
+            // TODO publish the status messages to the message bus
+            while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
+                    && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
+                    && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) {
+
+                ActivityStatusType activityStatus = null;
+                try {
+                    activityStatus = getStatus(factory, activityEpr);
+                    JobState jobStatus = getApplicationJobStatus(activityStatus);
+                    String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
+                    jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
+                    details.setJobID(jobId);
+                    GFacUtils.updateJobStatus(jobExecutionContext, details, jobStatus);
+                } catch (UnknownActivityIdentifierFault e) {
+                    throw new GFacProviderException(e.getMessage(), e.getCause());
+                }catch (GFacException e) {
+                    throw new GFacProviderException(e.getMessage(), e.getCause());
+                }
+
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                }
+                continue;
+            }
+
+            ActivityStatusType activityStatus = null;
+            try {
+                activityStatus = getStatus(factory, activityEpr);
+            } catch (UnknownActivityIdentifierFault e) {
+                throw new GFacProviderException(e.getMessage(), e.getCause());
+            }
+
+            log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState()
+                    .toString()));
+
+            if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+                String error = activityStatus.getFault().getFaultcode().getLocalPart() + "\n"
+                        + activityStatus.getFault().getFaultstring() + "\n EXITCODE: " + activityStatus.getExitCode();
+                log.info(error);
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                }
+                dt.downloadStdOuts();
+            } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+                String experimentID = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+                JobState jobStatus = JobState.CANCELED;
+                String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
+                jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
+                details.setJobID(jobId);
+                try {
+					GFacUtils.saveJobStatus(jobExecutionContext,details, jobStatus);
+				} catch (GFacException e) {
+					 throw new GFacProviderException(e.getLocalizedMessage(),e);
+				}
+                throw new GFacProviderException(experimentID + "Job Canceled");
+            }
+
+            else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                }
+                if (activityStatus.getExitCode() == 0) {
+                    dt.downloadRemoteFiles();
+                } else {
+                    dt.downloadStdOuts();
+                }
+            }
+
+        } catch (UnknownActivityIdentifierFault e1) {
+            throw new GFacProviderException(e1.getLocalizedMessage(), e1);
+        } finally {
+            // destroy sms instance
+            try {
+                if (sc != null) {
+                    sc.destroy();
+                }
+            } catch (Exception e) {
+                log.warn("Cannot destroy temporary SMS instance:" + sc.getUrl(), e);
+            }
+        }
+    }
+
+	private JobState getApplicationJobStatus(ActivityStatusType activityStatus){
+        if (activityStatus == null) {
+            return JobState.UNKNOWN;
+        }
+        Enum state = activityStatus.getState();
+        String status = null;
+        XmlCursor acursor = activityStatus.newCursor();
+        try {
+            if (acursor.toFirstChild()) {
+                if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+                    status = acursor.getName().getLocalPart();
+                }
+            }
+            if (status != null) {
+                if (status.equalsIgnoreCase("Queued") || status.equalsIgnoreCase("Starting")
+                        || status.equalsIgnoreCase("Ready")) {
+                    return JobState.QUEUED;
+                } else if (status.equalsIgnoreCase("Staging-In")) {
+                    return JobState.SUBMITTED;
+                } else if (status.equalsIgnoreCase("Staging-Out") || status.equalsIgnoreCase("FINISHED")) {
+                    return JobState.COMPLETE;
+                } else if (status.equalsIgnoreCase("Executing")) {
+                    return JobState.ACTIVE;
+                } else if (status.equalsIgnoreCase("FAILED")) {
+                    return JobState.FAILED;
+                } else if (status.equalsIgnoreCase("CANCELLED")) {
+                    return JobState.CANCELED;
+                }
+            } else {
+                if (ActivityStateEnumeration.CANCELLED.equals(state)) {
+                    return JobState.CANCELED;
+                } else if (ActivityStateEnumeration.FAILED.equals(state)) {
+                    return JobState.FAILED;
+                } else if (ActivityStateEnumeration.FINISHED.equals(state)) {
+                    return JobState.COMPLETE;
+                } else if (ActivityStateEnumeration.RUNNING.equals(state)) {
+                    return JobState.ACTIVE;
+                }
+            }
+        } finally {
+            if (acursor != null)
+                acursor.dispose();
+        }
+        return JobState.UNKNOWN;
+    }
+
+    private void saveApplicationJob(JobExecutionContext jobExecutionContext, JobDefinitionType jobDefinition,
+                                    String metadata) {
+        ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+        appJob.setJobId(jobId);
+        appJob.setJobData(jobDefinition.toString());
+        appJob.setSubmittedTime(Calendar.getInstance().getTime());
+        appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+        appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+        appJob.setMetadata(metadata);
+        GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+    }
+
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        secProperties = null;
+    }
+
+    /**
+     * EndpointReference need to be saved to make cancel work.
+     *
+     * @param activityEpr
+     * @param jobExecutionContext
+     * @throws GFacProviderException
+     */
+    public void cancelJob(String activityEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        try {
+            initSecurityProperties(jobExecutionContext);
+            EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(activityEpr);
+            UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription()
+                    .getType();
+
+            String factoryUrl = host.getUnicoreBESEndPointArray()[0];
+            EndpointReferenceType epr = EndpointReferenceType.Factory.newInstance();
+            epr.addNewAddress().setStringValue(factoryUrl);
+
+            FactoryClient factory = new FactoryClient(epr, secProperties);
+            factory.terminateActivity(eprt);
+        } catch (Exception e) {
+            throw new GFacProviderException(e.getLocalizedMessage(),e);
+        }
+
+    }
+
+    protected void downloadOffline(String smsEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        try {
+            initSecurityProperties(jobExecutionContext);
+            EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(smsEpr);
+            StorageClient sms = new StorageClient(eprt, secProperties);
+            DataTransferrer dt = new DataTransferrer(jobExecutionContext, sms);
+            // there must be output files there
+            // this is also possible if client is re-connected, the jobs are
+            // still
+            // running and no output is produced
+            dt.downloadRemoteFiles();
+
+            // may be use the below method before downloading for checking
+            // the number of entries
+            // sms.listDirectory(".");
+
+        } catch (Exception e) {
+            throw new GFacProviderException(e.getLocalizedMessage(), e);
+        }
+    }
+
+    protected void initSecurityProperties(JobExecutionContext jobExecutionContext) throws GFacProviderException,
+            GFacException {
+
+        if (secProperties != null)
+            return;
+
+        GSISecurityContext gssContext = (GSISecurityContext) jobExecutionContext
+                .getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT);
+
+        try {
+            String certLocation = gssContext.getTrustedCertificatePath();
+            List<String> trustedCert = new ArrayList<String>();
+            trustedCert.add(certLocation + "/*.0");
+            trustedCert.add(certLocation + "/*.pem");
+
+            DirectoryCertChainValidator dcValidator = new DirectoryCertChainValidator(trustedCert, Encoding.PEM, -1,
+                    60000, null);
+
+            String userID = getUserName(jobExecutionContext);
+
+            if ( userID == null || "".equals(userID) || userID.equalsIgnoreCase("admin") ) {
+                userID = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+            }
+
+            String userDN = userID.replaceAll("^\"|\"$", "");
+
+            // TODO: should be changed to default airavata server locations
+            KeyAndCertCredential cred = generateShortLivedCertificate(userDN, certLocation
+                    + "/cacert.pem", certLocation
+                    + "/cakey.pem", "ultrascan3");
+            secProperties = new DefaultClientConfiguration(dcValidator, cred);
+
+            // secProperties.doSSLAuthn();
+            secProperties.getETDSettings().setExtendTrustDelegation(true);
+
+            secProperties.setDoSignMessage(true);
+
+            String[] outHandlers = secProperties.getOutHandlerClassNames();
+
+            Set<String> outHandlerLst = null;
+
+            // timeout in milliseconds
+            Properties p = secProperties.getExtraSettings();
+            p.setProperty("http.connection.timeout", "300000");
+            p.setProperty("http.socket.timeout", "300000");
+
+            if (outHandlers == null) {
+                outHandlerLst = new HashSet<String>();
+            } else {
+                outHandlerLst = new HashSet<String>(Arrays.asList(outHandlers));
+            }
+
+            outHandlerLst.add("de.fzj.unicore.uas.security.ProxyCertOutHandler");
+
+            secProperties.setOutHandlerClassNames(outHandlerLst.toArray(new String[outHandlerLst.size()]));
+
+        } catch (Exception e) {
+            throw new GFacProviderException(e.getMessage(), e);
+        }
+    }
+
+    //FIXME: Get user details
+    private String getUserName(JobExecutionContext context) {
+//        if (context.getConfigurationData()!= null) {
+//            return context.getConfigurationData().getBasicMetadata().getUserName();
+//        } else {
+           return "";
+//        }
+    }
+
+    protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
+            throws UnknownActivityIdentifierFault {
+
+        GetActivityStatusesDocument stats = GetActivityStatusesDocument.Factory.newInstance();
+
+        stats.addNewGetActivityStatuses().setActivityIdentifierArray(new EndpointReferenceType[] { activityEpr });
+
+        GetActivityStatusesResponseDocument resDoc = fc.getActivityStatuses(stats);
+
+        ActivityStatusType activityStatus = resDoc.getGetActivityStatusesResponse().getResponseArray()[0]
+                .getActivityStatus();
+        return activityStatus;
+    }
+
+    protected String formatStatusMessage(String activityUrl, String status) {
+        return String.format("Activity %s is %s.\n", activityUrl, status);
+    }
+
+    protected String subStatusAsString(ActivityStatusType statusType) {
+
+        StringBuffer sb = new StringBuffer();
+
+        sb.append(statusType.getState().toString());
+
+        XmlCursor acursor = statusType.newCursor();
+        if (acursor.toFirstChild()) {
+            do {
+                if (acursor.getName().getNamespaceURI().equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+                    sb.append(":");
+                    sb.append(acursor.getName().getLocalPart());
+                }
+            } while (acursor.toNextSibling());
+            acursor.dispose();
+            return sb.toString();
+        } else {
+            acursor.dispose();
+            return sb.toString();
+        }
+
+    }
+
+    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+    }
+
+    protected KeyAndCertCredential generateShortLivedCertificate(String userDN, String caCertPath, String caKeyPath,
+                                                                 String caPwd) throws Exception {
+        final long CredentialGoodFromOffset = 1000L * 60L * 15L; // 15 minutes
+        // ago
+
+        final long startTime = System.currentTimeMillis() - CredentialGoodFromOffset;
+        final long endTime = startTime + 30 * 3600 * 1000;
+
+        String keyLengthProp = "1024";
+        int keyLength = Integer.parseInt(keyLengthProp);
+        String signatureAlgorithm = "SHA1withRSA";
+
+        KeyAndCertCredential caCred = getCACredential(caCertPath, caKeyPath, caPwd);
+
+        KeyPairGenerator kpg = KeyPairGenerator.getInstance(caCred.getKey().getAlgorithm());
+        kpg.initialize(keyLength);
+        KeyPair pair = kpg.generateKeyPair();
+
+        X500Principal subjectDN = new X500Principal(userDN);
+        Random rand = new Random();
+
+        SubjectPublicKeyInfo publicKeyInfo;
+        try {
+            publicKeyInfo = SubjectPublicKeyInfo.getInstance(new ASN1InputStream(pair.getPublic().getEncoded())
+                    .readObject());
+        } catch (IOException e) {
+            throw new InvalidKeyException("Can not parse the public key"
+                    + "being included in the short lived certificate", e);
+        }
+
+        X500Name issuerX500Name = CertificateHelpers.toX500Name(caCred.getCertificate().getSubjectX500Principal());
+
+        X500Name subjectX500Name = CertificateHelpers.toX500Name(subjectDN);
+
+        X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(issuerX500Name, new BigInteger(20, rand),
+                new Date(startTime), new Date(endTime), subjectX500Name, publicKeyInfo);
+
+        AlgorithmIdentifier sigAlgId = X509v3CertificateBuilder.extractAlgorithmId(caCred.getCertificate());
+
+        X509Certificate certificate = certBuilder.build(caCred.getKey(), sigAlgId, signatureAlgorithm, null, null);
+
+        certificate.checkValidity(new Date());
+        certificate.verify(caCred.getCertificate().getPublicKey());
+        KeyAndCertCredential result = new KeyAndCertCredential(pair.getPrivate(), new X509Certificate[] { certificate,
+                caCred.getCertificate() });
+
+        return result;
+    }
+
+    private KeyAndCertCredential getCACredential(String caCertPath, String caKeyPath, String password) throws Exception {
+        InputStream isKey = new FileInputStream(caKeyPath);
+        PrivateKey pk = CertificateUtils.loadPrivateKey(isKey, Encoding.PEM, password.toCharArray());
+
+        InputStream isCert = new FileInputStream(caCertPath);
+        X509Certificate caCert = CertificateUtils.loadCertificate(isCert, Encoding.PEM);
+
+        if (isKey != null)
+            isKey.close();
+        if (isCert != null)
+            isCert.close();
+
+        return new KeyAndCertCredential(pk, new X509Certificate[] { caCert });
+    }
+
+    private String getCNFromUserDN(String userDN) {
+        return X500NameUtils.getAttributeValues(userDN, BCStyle.CN)[0];
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java
deleted file mode 100644
index b675c5e..0000000
--- a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/impl/GramProvider.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.gram.impl;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.util.Map;
-import java.util.MissingResourceException;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.JobSubmissionFault;
-import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.gram.security.GSISecurityContext;
-import org.apache.airavata.gfac.gram.util.GramJobSubmissionListener;
-import org.apache.airavata.gfac.gram.util.GramProviderUtils;
-import org.apache.airavata.gfac.notification.events.JobIDEvent;
-import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.AbstractProvider;
-import org.apache.airavata.gfac.provider.GFacProviderException;
-import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
-import org.apache.airavata.schemas.gfac.GlobusHostType;
-import org.globus.gram.GramException;
-import org.globus.gram.GramJob;
-import org.globus.gram.WaitingForCommitException;
-import org.globus.gram.internal.GRAMConstants;
-import org.globus.gram.internal.GRAMProtocolErrorConstants;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GramProvider extends AbstractProvider {
-    private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
-
-    private GramJob job;
-    private GramJobSubmissionListener listener;
-    private boolean twoPhase = true;
-
-    /**
-     * If normal job submission fail due to an authorisation failure or script failure we
-     * will re-attempt to submit the job. In-order to avoid any recursive loop during a continuous
-     * failure we track whether failure paths are tried or not. Following booleans keeps track whether
-     * we already tried a failure path or not.
-     */
-    /**
-     * To track job submissions during a authorisation failure while requesting job.
-     */
-    private boolean renewCredentialsAttempt = false;
-    /**
-     * To track job submission during a script error situation.
-     */
-    private boolean reSubmissionInProgress = false;
-    /**
-     * To track authorisation failures during status monitoring.
-     */
-    private boolean authorisationFailedAttempt = false;
-
-    private static final Map<String, GramJob> currentlyExecutingJobCache
-            = new ConcurrentHashMap<String, GramJob>();
-
-    private static Properties resources;
-
-    static {
-        try {
-
-            String propFileName = "errors.properties";
-            resources = new Properties();
-            InputStream inputStream = GramProvider.class.getClassLoader()
-                    .getResourceAsStream(propFileName);
-
-            if (inputStream == null) {
-                throw new FileNotFoundException("property file '" + propFileName
-                        + "' not found in the classpath");
-            }
-
-            resources.load(inputStream);
-
-        } catch (FileNotFoundException mre) {
-            log.error("errors.properties not found", mre);
-        } catch (IOException e) {
-            log.error("Error reading errors.properties file", e);
-        }
-    }
-
-
-    // This method prepare the environment before the application invocation.
-    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-
-        try {
-        	super.initialize(jobExecutionContext);
-            String strTwoPhase = ServerSettings.getSetting("TwoPhase");
-            if (strTwoPhase != null) {
-                twoPhase = Boolean.parseBoolean(strTwoPhase);
-                log.info("Two phase commit is set to " + twoPhase);
-            }
-        } catch (ApplicationSettingsException e) {
-            log.warn("Error reading TwoPhase property from configurations.", e);
-        }
-
-        job = GramProviderUtils.setupEnvironment(jobExecutionContext, twoPhase);
-        listener = new GramJobSubmissionListener(job, jobExecutionContext);
-        job.addListener(listener);
-    }
-
-    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException{
-        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
-        GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().
-                getHostDescription().getType();
-        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().
-                getApplicationDeploymentDescription().getType();
-
-        StringBuilder stringBuilder = new StringBuilder();
-        try {
-
-            GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
-                    getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
-            job.setCredentials(gssCred);
-            // We do not support multiple gatekeepers in XBaya GUI, so we simply pick the 0th element in the array
-            String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
-            log.info("Request to contact:" + gateKeeper);
-
-            stringBuilder.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
-                    .append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
-                    .append(" temp directory = ").append(app.getScratchWorkingDirectory())
-                    .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
-
-            log.info(stringBuilder.toString());
-
-            submitJobs(gateKeeper, jobExecutionContext, host);
-
-        } catch (ApplicationSettingsException e) {
-        	throw new GFacException(e.getMessage(), e);
-		} finally {
-            if (job != null) {
-                try {
-                	 /*
-                     * Remove listener
-                     */
-                    job.removeListener(listener);
-                } catch (Exception e) {
-                	 log.error(e.getMessage());
-                }
-            }
-        }
-    }
-
-    private void submitJobs(String gateKeeper,
-                            JobExecutionContext jobExecutionContext,
-                            GlobusHostType globusHostType) throws GFacException, GFacProviderException {
-    	boolean applicationSaved=false;
-    	String taskID = jobExecutionContext.getTaskData().getTaskID();
-			
-    	if (twoPhase) {
-            try {
-                /*
-                * The first boolean is to force communication through SSLv3
-                * The second boolean is to specify the job is a batch job - use true for interactive and false for
-                 * batch.
-                * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
-                */
-                job.request(true, gateKeeper, false, false);
-
-                // Single boolean to track all authentication failures, therefore we need to re-initialize
-                // this here
-                renewCredentialsAttempt = false;
-
-            } catch (WaitingForCommitException e) {
-            	String jobID = job.getIDAsString();
-				
-            	details.setJobID(jobID);
-            	details.setJobDescription(job.getRSL());
-                jobExecutionContext.setJobDetails(details);
-                GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.UN_SUBMITTED);
-                
-                applicationSaved=true;
-                String jobStatusMessage = "Un-submitted JobID= " + jobID;
-                log.info(jobStatusMessage);
-                jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
-
-                log.info("Two phase commit: sending COMMIT_REQUEST signal; Job id - " + jobID);
-
-                try {
-                    job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
-
-                } catch (GramException gramException) {
-                    throw new GFacException("Error while sending commit request. Job Id - "
-                            + job.getIDAsString(), gramException);
-                } catch (GSSException gssException) {
-
-                    // User credentials are invalid
-                    log.error("Error while submitting commit request - Credentials provided are invalid. Job Id - "
-                            + job.getIDAsString(), e);
-                    log.info("Attempting to renew credentials and re-submit commit signal...");
-                	GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                    renewCredentials(jobExecutionContext);
-
-                    try {
-                        job.signal(GramJob.SIGNAL_COMMIT_REQUEST);
-                    } catch (GramException e1) {
-                     	GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                    	throw new GFacException("Error while sending commit request. Job Id - "
-                                + job.getIDAsString(), e1);
-                    } catch (GSSException e1) {
-                     	GFacUtils.saveErrorDetails(jobExecutionContext, gssException.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                        throw new GFacException("Error while sending commit request. Job Id - "
-                                + job.getIDAsString() + ". Credentials provided invalid", e1);
-                    }
-                }
-                GFacUtils.updateJobStatus(jobExecutionContext, details, JobState.SUBMITTED);
-                jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
-                log.info(jobStatusMessage);
-                jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
-
-            } catch (GSSException e) {
-                // Renew credentials and re-submit
-             	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                
-                reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
-
-            } catch (GramException e) {
-             	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                
-            	throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
-            }
-        } else {
-
-            /*
-            * The first boolean is to force communication through SSLv3
-            * The second boolean is to specify the job is a batch job - use true for interactive and false for
-             * batch.
-            * The third boolean is to specify to use the full proxy and not delegate a limited proxy.
-            */
-            try {
-
-                job.request(true, gateKeeper, false, false);
-                renewCredentialsAttempt = false;
-
-            } catch (GramException e) {
-            	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                throw new GFacException("An error occurred while submitting a job, job id = " + job.getIDAsString(), e);
-            } catch (GSSException e) {
-            	GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.RETRY_SUBMISSION, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                // Renew credentials and re-submit
-                reSubmitJob(gateKeeper, jobExecutionContext, globusHostType, e);
-            }
-
-            String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
-            log.info(jobStatusMessage);
-            jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
-
-        }
-
-        currentlyExecutingJobCache.put(job.getIDAsString(), job);
-        /*
-        * Wait until job is done
-        */
-        listener.waitFor();
-
-        checkJobStatus(jobExecutionContext, globusHostType, gateKeeper);
-
-    }
-
-    private void renewCredentials(JobExecutionContext jobExecutionContext) throws GFacException {
-
-        renewCredentials(this.job, jobExecutionContext);
-    }
-
-    private void renewCredentials(GramJob gramJob, JobExecutionContext jobExecutionContext) throws GFacException {
-
-        try {
-        	GSSCredential gssCred = ((GSISecurityContext)jobExecutionContext.
-                    getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).renewCredentials();
-            gramJob.renew(gssCred);
-        } catch (GramException e1) {
-            throw new GFacException("Unable to renew credentials. Job Id - "
-                    + gramJob.getIDAsString(), e1);
-        } catch (GSSException e1) {
-            throw new GFacException("Unable to renew credentials. Job Id - "
-                    + gramJob.getIDAsString(), e1);
-        } catch (ApplicationSettingsException e) {
-        	throw new GFacException(e.getLocalizedMessage(), e);
-		}
-    }
-
-    private void reSubmitJob(String gateKeeper,
-                             JobExecutionContext jobExecutionContext,
-                             GlobusHostType globusHostType, Exception e) throws GFacException, GFacProviderException {
-
-        if (!renewCredentialsAttempt) {
-
-            renewCredentialsAttempt = true;
-
-            // User credentials are invalid
-            log.error("Error while submitting job - Credentials provided are invalid. Job Id - "
-                    + job.getIDAsString(), e);
-            log.info("Attempting to renew credentials and re-submit jobs...");
-
-            // Remove existing listener and register a new listener
-            job.removeListener(listener);
-            listener = new GramJobSubmissionListener(job, jobExecutionContext);
-
-            job.addListener(listener);
-
-            renewCredentials(jobExecutionContext);
-
-            submitJobs(gateKeeper, jobExecutionContext, globusHostType);
-
-        } else {
-            throw new GFacException("Error while submitting job - Credentials provided are invalid. Job Id - "
-                    + job.getIDAsString(), e);
-        }
-
-    }
-
-    private void reSubmitJob(String gateKeeper,
-                             JobExecutionContext jobExecutionContext,
-                             GlobusHostType globusHostType) throws GFacException, GFacProviderException {
-
-        // User credentials are invalid
-        log.info("Attempting to renew credentials and re-submit jobs...");
-
-        // Remove existing listener and register a new listener
-        job.removeListener(listener);
-        listener = new GramJobSubmissionListener(job, jobExecutionContext);
-
-        job.addListener(listener);
-
-        renewCredentials(jobExecutionContext);
-
-        submitJobs(gateKeeper, jobExecutionContext, globusHostType);
-
-    }
-
-	
-	
-    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-    }
-
-    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
-        cancelSingleJob(jobId, jobExecutionContext);
-    }
-
-
-    private void cancelSingleJob(String jobId, JobExecutionContext context) throws GFacException {
-        // First check whether job id is in the cache
-        if (currentlyExecutingJobCache.containsKey(jobId)) {
-
-            synchronized (this) {
-                GramJob gramJob = currentlyExecutingJobCache.get(jobId);
-
-                // Even though we check using containsKey, at this point job could be null
-                if (gramJob != null && (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
-                        gramJob.getStatus() != GRAMConstants.STATUS_FAILED)) {
-                    cancelJob(gramJob, context);
-                }
-            }
-
-        } else {
-
-            try {
-				GSSCredential gssCred = ((GSISecurityContext)context.
-				        getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getGssCredentials();
-
-				GramJob gramJob = new GramJob(null);
-				try {
-				    gramJob.setID(jobId);
-				} catch (MalformedURLException e) {
-				    throw new GFacException("Invalid job id - " + jobId, e);
-				}
-				gramJob.setCredentials(gssCred);
-
-				synchronized (this) {
-				    if (gramJob.getStatus() != GRAMConstants.STATUS_DONE ||
-				            gramJob.getStatus() != GRAMConstants.STATUS_FAILED) {
-				        cancelJob(gramJob, context);
-				    }
-				}
-			} catch (ApplicationSettingsException e) {
-				throw new GFacException(e);
-			}
-        }
-    }
-
-    private void cancelJob(GramJob gramJob, JobExecutionContext context) throws GFacException{
-
-        try {
-            gramJob.cancel();
-        } catch (GramException e) {
-            throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e);
-        } catch (GSSException e) {
-
-            log.warn("Credentials invalid to cancel job. Attempting to renew credentials and re-try. " +
-                    "Job id - " + gramJob.getIDAsString());
-            renewCredentials(gramJob, context);
-
-            try {
-                gramJob.cancel();
-                gramJob.signal(GramJob.SIGNAL_COMMIT_END);
-            } catch (GramException e1) {
-                throw new GFacException("Error cancelling job, id - " + gramJob.getIDAsString(), e1);
-            } catch (GSSException e1) {
-                throw new GFacException("Error cancelling job, invalid credentials. Job id - "
-                        + gramJob.getIDAsString(), e);
-            }
-        }
-
-    }
-
-    public void initProperties(Map<String, String> properties) throws GFacException {
-
-    }
-
-    private void checkJobStatus(JobExecutionContext jobExecutionContext, GlobusHostType host, String gateKeeper)
-            throws GFacProviderException {
-        int jobStatus = listener.getCurrentStatus();
-
-        if (jobStatus == GramJob.STATUS_FAILED) {
-            
-            String errorMsg = "Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " Job Exit Code = "
-                    + listener.getError() + " Error Description = " + getGramErrorString(listener.getError());
-
-            if (listener.getError() == GRAMProtocolErrorConstants.INVALID_SCRIPT_REPLY) {
-
-                // re-submitting without renewing
-                // TODO verify why we re-submit jobs when we get a invalid script reply
-                if (!reSubmissionInProgress) {
-                    reSubmissionInProgress = true;
-
-                    log.info("Invalid script reply received. Re-submitting job, id - " + job.getIDAsString());
-                    try {
-                        reSubmitJob(gateKeeper, jobExecutionContext, host);
-                    } catch (GFacException e) {
-                        throw new GFacProviderException
-                                ("Error during re-submission. Original job submission data - " + errorMsg,  e);
-                    }
-                    return;
-                }
-
-            } else if (listener.getError() == GRAMProtocolErrorConstants.ERROR_AUTHORIZATION) {
-
-                // re-submit with renewed credentials
-                if (!authorisationFailedAttempt) {
-                    authorisationFailedAttempt = true;
-                    log.info("Authorisation error contacting provider. Re-submitting job with renewed credentials.");
-
-                    try {
-                        renewCredentials(jobExecutionContext);
-                        reSubmitJob(gateKeeper, jobExecutionContext, host);
-                    } catch (GFacException e) {
-                        throw new GFacProviderException
-                                ("Error during re-submission. Original job submission data - " + errorMsg,  e);
-                    }
-
-                    return;
-                }
-
-            } else if (listener.getError() == GRAMProtocolErrorConstants.USER_CANCELLED) {
-
-                log.info("User successfully cancelled job id " + job.getIDAsString());
-                return;
-            }
-
-
-
-            log.error(errorMsg);
-
-            synchronized (this) {
-                currentlyExecutingJobCache.remove(job.getIDAsString());
-            }
-
-            throw new JobSubmissionFault(new Exception(errorMsg), host.getHostAddress(), gateKeeper,
-                            job.getRSL(), jobExecutionContext, getGramErrorString(listener.getError()),
-                    listener.getError());
-
-        } else if (jobStatus == GramJob.STATUS_DONE) {
-            log.info("Job " + job.getIDAsString() + " on host " + host.getHostAddress() + " is successfully executed.");
-
-            synchronized (this) {
-                currentlyExecutingJobCache.remove(job.getIDAsString());
-            }
-        }
-    }
-
-    public String getGramErrorString(int errorCode) {
-
-        if (resources != null) {
-            try {
-                return resources.getProperty(String.valueOf(errorCode));
-            } catch (MissingResourceException mre) {
-                log.warn("Error reading globus error descriptions.", mre);
-                return "Error code: " + errorCode;
-            }
-        } else {
-            return "Error code: " + errorCode;
-        }
-
-    }
-
-}