You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/04 22:15:35 UTC

[22/81] [abbrv] airavata git commit: Refactored gfac sub modules, merged gfac-ssh, gfac-gsissh, gfac-local, gfac-monitor and gsissh modules and create gface-impl, removed implementation from gfac-core to gfac-impl

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
new file mode 100644
index 0000000..884ccd5
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.handler;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class LocalInputHandler extends AbstractHandler {
+    private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class);
+    @Override
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+        super.invoke(jobExecutionContext);
+        Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
+        for (Map.Entry<String, Object> inputParamEntry : inputParameters.entrySet()) {
+            if (inputParamEntry.getValue() instanceof InputDataObjectType) {
+                InputDataObjectType inputDataObject = (InputDataObjectType) inputParamEntry.getValue();
+                if (inputDataObject.getType() == DataType.URI
+                        && inputDataObject != null
+                        && !inputDataObject.getValue().equals("")) {
+                    try {
+                        inputDataObject.setValue(stageFile(jobExecutionContext.getInputDir(), inputDataObject.getValue()));
+                    } catch (IOException e) {
+                        throw new GFacHandlerException("Error while data staging sourceFile= " + inputDataObject.getValue());
+                    }
+                }
+            }
+        }
+    }
+
+    private String stageFile(String inputDir, String sourceFilePath) throws IOException {
+        int i = sourceFilePath.lastIndexOf(File.separator);
+        String substring = sourceFilePath.substring(i + 1);
+        if (inputDir.endsWith("/")) {
+            inputDir = inputDir.substring(0, inputDir.length() - 1);
+        }
+        String targetFilePath = inputDir + File.separator + substring;
+
+        if (sourceFilePath.startsWith("file")) {
+            sourceFilePath = sourceFilePath.substring(sourceFilePath.indexOf(":") + 1, sourceFilePath.length());
+        }
+
+        File sourceFile = new File(sourceFilePath);
+        File targetFile = new File(targetFilePath);
+        if (targetFile.exists()) {
+            targetFile.delete();
+        }
+        logger.info("staging source file : " + sourceFilePath + " to target file : " + targetFilePath);
+        FileUtils.copyFile(sourceFile, targetFile);
+
+        return targetFilePath;
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+
+    }
+
+    @Override
+    public void initProperties(Properties properties) throws GFacHandlerException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
new file mode 100644
index 0000000..5519ee0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -0,0 +1,311 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.core.provider.AbstractProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.impl.OutputUtils;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+public class LocalProvider extends AbstractProvider {
+    private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
+    private ProcessBuilder builder;
+    private List<String> cmdList;
+    private String jobId;
+    
+    public static class LocalProviderJobData{
+    	private String applicationName;
+    	private List<String> inputParameters;
+    	private String workingDir;
+    	private String inputDir;
+    	private String outputDir;
+		public String getApplicationName() {
+			return applicationName;
+		}
+		public void setApplicationName(String applicationName) {
+			this.applicationName = applicationName;
+		}
+		public List<String> getInputParameters() {
+			return inputParameters;
+		}
+		public void setInputParameters(List<String> inputParameters) {
+			this.inputParameters = inputParameters;
+		}
+		public String getWorkingDir() {
+			return workingDir;
+		}
+		public void setWorkingDir(String workingDir) {
+			this.workingDir = workingDir;
+		}
+		public String getInputDir() {
+			return inputDir;
+		}
+		public void setInputDir(String inputDir) {
+			this.inputDir = inputDir;
+		}
+		public String getOutputDir() {
+			return outputDir;
+		}
+		public void setOutputDir(String outputDir) {
+			this.outputDir = outputDir;
+		}
+    }
+    public LocalProvider(){
+        cmdList = new ArrayList<String>();
+    }
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+    	super.initialize(jobExecutionContext);
+
+        // build command with all inputs
+        buildCommand();
+        initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription());
+
+        // extra environment variables
+        builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
+        builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
+
+        // set working directory
+        builder.directory(new File(jobExecutionContext.getWorkingDir()));
+
+        // log info
+        log.info("Command = " + InputUtils.buildCommand(cmdList));
+        log.info("Working dir = " + builder.directory());
+        /*for (String key : builder.environment().keySet()) {
+            log.info("Env[" + key + "] = " + builder.environment().get(key));
+        }*/
+    }
+
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+        JobDetails jobDetails = new JobDetails();
+        try {
+        	jobId = jobExecutionContext.getTaskData().getTaskID();
+            jobDetails.setJobID(jobId);
+            jobDetails.setJobDescription(jobExecutionContext.getApplicationContext()
+                    .getApplicationDeploymentDescription().getAppDeploymentDescription());
+            jobExecutionContext.setJobDetails(jobDetails);
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP);
+            // running cmd
+            Process process = builder.start();
+
+            Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput());
+            Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError());
+
+            // start output threads
+            standardOutWriter.setDaemon(true);
+            standardErrorWriter.setDaemon(true);
+            standardOutWriter.start();
+            standardErrorWriter.start();
+
+            int returnValue = process.waitFor();
+
+            // make sure other two threads are done
+            standardOutWriter.join();
+            standardErrorWriter.join();
+
+            /*
+             * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+             * just provide warning in the log messages
+             */
+            if (returnValue != 0) {
+                log.error("Process finished with non zero return value. Process may have failed");
+            } else {
+                log.info("Process finished with return value of zero.");
+            }
+
+            StringBuffer buf = new StringBuffer();
+            buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+                    .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir())
+                    .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ")
+                    .append(String.valueOf(returnValue));
+
+            log.info(buf.toString());
+
+            // updating the job status to complete because there's nothing to monitor in local jobs
+//            MonitorID monitorID = createMonitorID(jobExecutionContext);
+            JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
+                    jobExecutionContext.getTaskData().getTaskID(),
+                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                    jobExecutionContext.getExperimentID(),
+                    jobExecutionContext.getGatewayID());
+            jobExecutionContext.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
+        } catch (IOException io) {
+            throw new GFacProviderException(io.getMessage(), io);
+        } catch (InterruptedException e) {
+            throw new GFacProviderException(e.getMessage(), e);
+        }catch (GFacException e) {
+            throw new GFacProviderException(e.getMessage(), e);
+        }
+    }
+
+//	private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) {
+//		MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId,
+//		        jobExecutionContext.getTaskData().getTaskID(),
+//		        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+//		        jobExecutionContext.getExperiment().getUserName(),jobId);
+//		return monitorID;
+//	}
+
+//	private void saveApplicationJob(JobExecutionContext jobExecutionContext)
+//			throws GFacProviderException {
+//		ApplicationDeploymentDescriptionType app = jobExecutionContext.
+//                getApplicationContext().getApplicationDeploymentDescription().getType();
+//		ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+//		appJob.setJobId(jobId);
+//		LocalProviderJobData data = new LocalProviderJobData();
+//		data.setApplicationName(app.getExecutableLocation());
+//		data.setInputDir(app.getInputDataDirectory());
+//		data.setOutputDir(app.getOutputDataDirectory());
+//		data.setWorkingDir(builder.directory().toString());
+//		data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+//		ByteArrayOutputStream stream = new ByteArrayOutputStream();
+//		JAXB.marshal(data, stream);
+//		appJob.setJobData(stream.toString());
+//		appJob.setSubmittedTime(Calendar.getInstance().getTime());
+//		appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+//		appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+//		GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+//	}
+
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        try {
+        	List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+            String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput());
+            String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError());
+			Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+            OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+            TaskDetails taskDetails = (TaskDetails)registry.get(RegistryModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
+            if (taskDetails != null){
+                taskDetails.setApplicationOutputs(outputArray);
+                registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID());
+            }
+            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+            TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                    jobExecutionContext.getExperimentID(),
+                    jobExecutionContext.getGatewayID());
+            jobExecutionContext.getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
+        } catch (XmlException e) {
+            throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
+        } catch (IOException io) {
+            throw new GFacProviderException(io.getMessage(), io);
+        } catch (Exception e){
+        	throw new GFacProviderException("Error in retrieving results",e);
+        }
+    }
+
+    public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        // TODO: Auto generated method body.
+    }
+
+    @Override
+    public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        // TODO: Auto generated method body.
+    }
+
+
+    private void buildCommand() {
+        cmdList.add(jobExecutionContext.getExecutablePath());
+        Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
+
+        // sort the inputs first and then build the command List
+        Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+            @Override
+            public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+                return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+            }
+        };
+        Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+        for (Object object : inputParameters.values()) {
+            if (object instanceof InputDataObjectType) {
+                InputDataObjectType inputDOT = (InputDataObjectType) object;
+                sortedInputSet.add(inputDOT);
+            }
+        }
+        for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+            if (inputDataObjectType.getApplicationArgument() != null
+                    && !inputDataObjectType.getApplicationArgument().equals("")) {
+                cmdList.add(inputDataObjectType.getApplicationArgument());
+            }
+
+            if (inputDataObjectType.getValue() != null
+                    && !inputDataObjectType.getValue().equals("")) {
+                cmdList.add(inputDataObjectType.getValue());
+            }
+        }
+
+    }
+
+    private void initProcessBuilder(ApplicationDeploymentDescription app){
+        builder = new ProcessBuilder(cmdList);
+
+        List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+        if (setEnvironment != null) {
+            for (SetEnvPaths envPath : setEnvironment) {
+                Map<String,String> builderEnv = builder.environment();
+                builderEnv.put(envPath.getName(), envPath.getValue());
+            }
+        }
+    }
+
+    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java
new file mode 100644
index 0000000..2467ce8
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+public class InputStreamToFileWriter extends Thread{
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private BufferedReader in;
+    private BufferedWriter out;
+
+    public InputStreamToFileWriter(InputStream in, String out) throws IOException {
+        this.in = new BufferedReader(new InputStreamReader(in));
+        this.out = new BufferedWriter(new FileWriter(out));
+    }
+
+    public void run() {
+        try {
+            String line = null;
+            while ((line = in.readLine()) != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug(line);
+                }
+                out.write(line);
+                out.newLine();
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java
new file mode 100644
index 0000000..98671fd
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class InputUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(InputUtils.class);
+
+    private static final String SPACE = " ";
+
+    private InputUtils() {
+    }
+
+    public static String buildCommand(List<String> cmdList) {
+        StringBuffer buff = new StringBuffer();
+        for (String string : cmdList) {
+            logger.debug("Build Command --> " + string);
+            buff.append(string);
+            buff.append(SPACE);
+        }
+        return buff.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
new file mode 100644
index 0000000..2b45df7
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class LocalProviderUtil {
+    private static final Logger log = LoggerFactory.getLogger(LocalProviderUtil.class);
+
+    private void makeFileSystemDir(String dir) throws GFacProviderException {
+        File f = new File(dir);
+        if (f.isDirectory() && f.exists()) {
+            return;
+        } else if (!new File(dir).mkdir()) {
+            throw new GFacProviderException("Cannot make directory " + dir);
+        }
+    }
+
+    public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        log.info("working diectroy = " + jobExecutionContext.getWorkingDir());
+        log.info("temp directory = " + jobExecutionContext.getScratchLocation());
+        makeFileSystemDir(jobExecutionContext.getWorkingDir());
+        makeFileSystemDir(jobExecutionContext.getScratchLocation());
+        makeFileSystemDir(jobExecutionContext.getInputDir());
+        makeFileSystemDir(jobExecutionContext.getOutputDir());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
new file mode 100644
index 0000000..8eba250
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -0,0 +1,107 @@
+package org.apache.airavata.gfac.monitor;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Date;
+
+public class HPCMonitorID extends MonitorID {
+    private final static Logger logger = LoggerFactory.getLogger(HPCMonitorID.class);
+
+
+    private AuthenticationInfo authenticationInfo = null;
+
+    public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID,
+                        String experimentID, String userName,String jobName) {
+        super(computeResourceDescription, jobID, taskID, workflowNodeID, experimentID, userName,jobName);
+        setComputeResourceDescription(computeResourceDescription);
+        setJobStartedTime(new Timestamp((new Date()).getTime()));
+        setUserName(userName);
+        setJobID(jobID);
+        setTaskID(taskID);
+        setExperimentID(experimentID);
+        setWorkflowNodeID(workflowNodeID);
+    }
+
+    public HPCMonitorID(AuthenticationInfo authenticationInfo, JobExecutionContext jobExecutionContext) {
+        super(jobExecutionContext);
+        this.authenticationInfo = authenticationInfo;
+        if (this.authenticationInfo != null) {
+            try {
+                String hostAddress = jobExecutionContext.getHostName();
+                SecurityContext securityContext = jobExecutionContext.getSecurityContext(hostAddress);
+                ServerInfo serverInfo = null;
+                if (securityContext != null) {
+                    if (securityContext instanceof  GSISecurityContext){
+                        serverInfo = (((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo();
+                        if (serverInfo.getUserName() != null) {
+                            setUserName(serverInfo.getUserName());
+                        }
+                    }
+                    if (securityContext instanceof SSHSecurityContext){
+                        serverInfo = (((SSHSecurityContext) securityContext).getPbsCluster()).getServerInfo();
+                        if (serverInfo.getUserName() != null) {
+                            setUserName(serverInfo.getUserName());
+                        }
+                    }
+                }
+            } catch (GFacException e) {
+                logger.error("Error while getting security context", e);
+            }
+        }
+    }
+
+    public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName, AuthenticationInfo authenticationInfo) {
+        setComputeResourceDescription(computeResourceDescription);
+        setJobStartedTime(new Timestamp((new Date()).getTime()));
+        this.authenticationInfo = authenticationInfo;
+        // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user
+        if (this.authenticationInfo != null) {
+            if (this.authenticationInfo instanceof MyProxyAuthenticationInfo) {
+                setUserName(((MyProxyAuthenticationInfo) this.authenticationInfo).getUserName());
+            }
+        }
+        setJobID(jobID);
+        setTaskID(taskID);
+        setExperimentID(experimentID);
+        setWorkflowNodeID(workflowNodeID);
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
new file mode 100644
index 0000000..f29e3e6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HostMonitorData {
+//    private HostDescription host;
+    private ComputeResourceDescription computeResourceDescription;
+    private JobSubmissionProtocol jobSubmissionProtocol;
+    private DataMovementProtocol dataMovementProtocol;
+
+    private List<MonitorID> monitorIDs;
+
+    public HostMonitorData(JobExecutionContext jobExecutionContext) {
+        this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription();
+        this.jobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+        this.dataMovementProtocol = jobExecutionContext.getPreferredDataMovementProtocol();
+        this.monitorIDs = new ArrayList<MonitorID>();
+    }
+
+    public HostMonitorData(JobExecutionContext jobExecutionContext, List<MonitorID> monitorIDs) {
+        this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription();
+        this.jobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+        this.dataMovementProtocol = jobExecutionContext.getPreferredDataMovementProtocol();
+        this.monitorIDs = monitorIDs;
+    }
+
+    public ComputeResourceDescription getComputeResourceDescription() {
+        return computeResourceDescription;
+    }
+
+    public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+        this.computeResourceDescription = computeResourceDescription;
+    }
+
+    public List<MonitorID> getMonitorIDs() {
+        return monitorIDs;
+    }
+
+    public void setMonitorIDs(List<MonitorID> monitorIDs) {
+        this.monitorIDs = monitorIDs;
+    }
+
+    /**
+     * this method get called by CommonUtils and it will check the right place before adding
+     * so there will not be a mismatch between this.host and monitorID.host
+     * @param monitorID
+     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+     */
+    public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException {
+        monitorIDs.add(monitorID);
+    }
+
+    public JobSubmissionProtocol getJobSubmissionProtocol() {
+        return jobSubmissionProtocol;
+    }
+
+    public DataMovementProtocol getDataMovementProtocol() {
+        return dataMovementProtocol;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
new file mode 100644
index 0000000..022d17c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is the datastructure to keep the user centric job data, rather keeping
+ * the individual jobs we keep the jobs based on the each user
+ */
+public class UserMonitorData {
+    private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+    private String  userName;
+
+    private List<HostMonitorData> hostMonitorData;
+
+
+    public UserMonitorData(String userName) {
+        this.userName = userName;
+        hostMonitorData = new ArrayList<HostMonitorData>();
+    }
+
+    public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+        this.hostMonitorData = hostMonitorDataList;
+        this.userName = userName;
+    }
+
+    public List<HostMonitorData> getHostMonitorData() {
+        return hostMonitorData;
+    }
+
+    public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+        this.hostMonitorData = hostMonitorData;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /*
+    This method will add element to the MonitorID list, user should not
+    duplicate it, we do not check it because its going to be used by airavata
+    so we have to use carefully and this method will add a host if its a new host
+     */
+    public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+        this.hostMonitorData.add(hostMonitorData);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..f19decf
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class ExperimentCancelRequest {
+	private String experimentId;
+
+	public ExperimentCancelRequest(String experimentId) {
+		this.experimentId = experimentId;
+	}
+
+	public String getExperimentId() {
+		return experimentId;
+	}
+
+	public void setExperimentId(String experimentId) {
+		this.experimentId = experimentId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..b45e01c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class TaskCancelRequest {
+	private String experimentId;
+	private String nodeId;
+	private String taskId;
+	
+	public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+		this.experimentId = experimentId;
+		this.setNodeId(nodeId);
+		this.taskId = taskId;
+	}
+	public String getExperimentId() {
+		return experimentId;
+	}
+	public void setExperimentId(String experimentId) {
+		this.experimentId = experimentId;
+	}
+	public String getTaskId() {
+		return taskId;
+	}
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+	public String getNodeId() {
+		return nodeId;
+	}
+	public void setNodeId(String nodeId) {
+		this.nodeId = nodeId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
new file mode 100644
index 0000000..b4ac3a9
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the abstract Monitor which needs to be used by
+ * any Monitoring implementation which expect nto consume
+ * to store the status to registry. Because they have to
+ * use the MonitorPublisher to publish the monitoring statuses
+ * to the Event Bus. All the Monitor statuses publish to the eventbus
+ * will be saved to the Registry.
+ */
+public abstract class AiravataAbstractMonitor implements Monitor {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
new file mode 100644
index 0000000..a003f55
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * This is an interface to implement messageparser, it could be
+ * pull based or push based still monitor has to parse the content of
+ * the message it gets from remote monitoring system and finalize
+ * them to internal job state, Ex: JSON parser for AMQP and Qstat reader
+ * for pull based monitor.
+ */
+public interface MessageParser {
+    /**
+     * This method is to implement how to parse the incoming message
+     * and implement a logic to finalize the status of the job,
+     * we have to makesure the correct message is given to the messageparser
+     * parse method, it will not do any filtering
+     * @param message content of the message
+     * @return
+     */
+    JobState parseMessage(String message)throws AiravataMonitorException;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
new file mode 100644
index 0000000..614d606
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+
+/**
+ * This is the primary interface for Monitors,
+ * This can be used to implement different methods of monitoring
+ */
+public interface Monitor {
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
new file mode 100644
index 0000000..efdf89c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PullMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This will allow users to program Pull monitors separately
+ */
+public abstract class PullMonitor extends AiravataAbstractMonitor {
+
+    private int pollingFrequence;
+    /**
+     * This method will can invoke when PullMonitor needs to start
+     * and it has to invoke in the frequency specified below,
+     * @return if the start process is successful return true else false
+     */
+    public abstract boolean startPulling() throws AiravataMonitorException;
+
+    /**
+     * This is the method to stop the polling process
+     * @return if the stopping process is successful return true else false
+     */
+    public abstract boolean stopPulling()throws AiravataMonitorException;
+
+    /**
+     * this method can be used to set the polling frequencey or otherwise
+     * can implement a polling mechanism, and implement how to do
+     * @param frequence
+     */
+    public void setPollingFrequence(int frequence){
+        this.pollingFrequence = frequence;
+    }
+
+    /**
+     * this method can be used to get the polling frequencey or otherwise
+     * can implement a polling mechanism, and implement how to do
+     * @return
+     */
+    public int getPollingFrequence(){
+        return this.pollingFrequence;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
new file mode 100644
index 0000000..1b6a228
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PushMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This interface will allow users to program Push monitors separately
+ */
+public abstract class PushMonitor extends AiravataAbstractMonitor {
+    /**
+     * This method can be invoked to register a listener with the
+     * remote monitoring system, ideally inside this method users will be
+     * writing some client listener code for the remote monitoring system,
+     * this will be a simple wrapper around any client for the remote Monitor.
+     * @param monitorID
+     * @return
+     */
+    public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
+
+    /**
+     * This method can be invoked to unregister a listener with the
+     * remote monitoring system, ideally inside this method users will be
+     * writing some client listener code for the remote monitoring system,
+     * this will be a simple wrapper around any client for the remote Monitor.
+     * @param monitorID
+     * @return
+     */
+    public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
+
+    /**
+     * This can be used to stop the registration thread
+     * @return
+     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+     */
+    public abstract boolean stopRegister()throws AiravataMonitorException;
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
new file mode 100644
index 0000000..eea6ef6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.impl.OutHandlerWorker;
+import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
+import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
+import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
+import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
+
+import javax.mail.Address;
+import javax.mail.Flags;
+import javax.mail.Folder;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.Store;
+import javax.mail.search.FlagTerm;
+import javax.mail.search.SearchTerm;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class EmailBasedMonitor implements Runnable{
+    private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
+
+    public static final int COMPARISON = 6; // after and equal
+    public static final String IMAPS = "imaps";
+    public static final String POP3 = "pop3";
+    private boolean stopMonitoring = false;
+
+    private Session session ;
+    private Store store;
+    private Folder emailFolder;
+    private Properties properties;
+    private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>();
+    private String host, emailAddress, password, storeProtocol, folderName ;
+    private Date monitorStartDate;
+    private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
+
+    public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException {
+        init();
+    }
+
+    private void init() throws AiravataException {
+        host = ServerSettings.getEmailBasedMonitorHost();
+        emailAddress = ServerSettings.getEmailBasedMonitorAddress();
+        password = ServerSettings.getEmailBasedMonitorPassword();
+        storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
+        folderName = ServerSettings.getEmailBasedMonitorFolderName();
+        if (!(storeProtocol.equals(IMAPS) || storeProtocol.equals(POP3))) {
+            throw new AiravataException("Unsupported store protocol , expected " +
+                    IMAPS + " or " + POP3 + " but found " + storeProtocol);
+        }
+        properties = new Properties();
+        properties.put("mail.store.protocol", storeProtocol);
+    }
+
+    public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
+        String monitorId = jobExecutionContext.getJobDetails().getJobID();
+        if (monitorId == null || monitorId.isEmpty()) {
+            monitorId = jobExecutionContext.getJobDetails().getJobName();
+        }
+        addToJobMonitorMap(monitorId, jobExecutionContext);
+    }
+
+    public void addToJobMonitorMap(String monitorId, JobExecutionContext jobExecutionContext) {
+        log.info("[EJM]: Added monitor Id : " + monitorId + " to email based monitor map");
+        jobMonitorMap.put(monitorId, jobExecutionContext);
+    }
+
+    private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
+        Address fromAddress = message.getFrom()[0];
+        String addressStr = fromAddress.toString();
+        ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
+        EmailParser emailParser = emailParserMap.get(jobMonitorType);
+        if (emailParser == null) {
+            switch (jobMonitorType) {
+                case PBS:
+                    emailParser = new PBSEmailParser();
+                    break;
+                case SLURM:
+                    emailParser = new SLURMEmailParser();
+                    break;
+                case LSF:
+                    emailParser = new LSFEmailParser();
+                    break;
+                case UGE:
+                    emailParser = new UGEEmailParser();
+                    break;
+                default:
+                    throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType.toString() + " for email monitoring -->  " + addressStr);
+            }
+
+            emailParserMap.put(jobMonitorType, emailParser);
+        }
+        return emailParser.parseEmail(message);
+    }
+
+    private ResourceJobManagerType getJobMonitorType(String addressStr) throws AiravataException {
+        System.out.println("*********** address ******** : " + addressStr);
+        switch (addressStr) {
+            case "pbsconsult@sdsc.edu":   // trestles , gordan
+            case "adm@trident.bigred2.uits.iu.edu":  // bigred2
+            case "root <ad...@trident.bigred2.uits.iu.edu>": // bigred2
+            case "root <ad...@scyld.localdomain>": // alamo
+                return ResourceJobManagerType.PBS;
+            case "SDSC Admin <sl...@comet-fe3.sdsc.edu>": // comet
+            case "slurm@batch1.stampede.tacc.utexas.edu": // stampede
+            case "slurm user <sl...@tempest.dsc.soic.indiana.edu>":
+                return ResourceJobManagerType.SLURM;
+//            case "lsf":
+//                return ResourceJobManagerType.LSF;
+            default:
+                if (addressStr.contains("ls4.tacc.utexas.edu>")) { // lonestar
+                    return ResourceJobManagerType.UGE;
+                } else {
+                    throw new AiravataException("[EJM]: Couldn't identify Resource job manager type from address " + addressStr);
+                }
+        }
+
+    }
+
+    @Override
+    public void run() {
+        try {
+            session = Session.getDefaultInstance(properties);
+            store = session.getStore(storeProtocol);
+            store.connect(host, emailAddress, password);
+            emailFolder = store.getFolder(folderName);
+            // first time we search for all unread messages.
+            SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
+            while (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
+                Thread.sleep(ServerSettings.getEmailMonitorPeriod());// sleep a bit - get a rest till job finishes
+                if (jobMonitorMap.isEmpty()) {
+                    log.info("[EJM]: Job Monitor Map is empty, no need to retrieve emails");
+                    continue;
+                } else {
+                    log.info("[EJM]: " + jobMonitorMap.size() + " job/s in job monitor map");
+                }
+                if (!store.isConnected()) {
+                    store.connect();
+                    emailFolder = store.getFolder(folderName);
+                }
+                log.info("[EJM]: Retrieving unseen emails");
+                emailFolder.open(Folder.READ_WRITE);
+                Message[] searchMessages = emailFolder.search(unseenBefore);
+                if (searchMessages == null || searchMessages.length == 0) {
+                    log.info("[EJM]: No new email messages");
+                } else {
+                    log.info("[EJM]: "+searchMessages.length + " new email/s received");
+                }
+                processMessages(searchMessages);
+                emailFolder.close(false);
+            }
+        } catch (MessagingException e) {
+            log.error("[EJM]: Couldn't connect to the store ", e);
+        } catch (InterruptedException e) {
+            log.error("[EJM]: Interrupt exception while sleep ", e);
+        } catch (AiravataException e) {
+            log.error("[EJM]: UnHandled arguments ", e);
+        } finally {
+            try {
+                emailFolder.close(false);
+                store.close();
+            } catch (MessagingException e) {
+                log.error("[EJM]: Store close operation failed, couldn't close store", e);
+            }
+        }
+    }
+
+    private void processMessages(Message[] searchMessages) throws MessagingException {
+        List<Message> processedMessages = new ArrayList<>();
+        List<Message> unreadMessages = new ArrayList<>();
+        for (Message message : searchMessages) {
+            try {
+                JobStatusResult jobStatusResult = parse(message);
+                JobExecutionContext jEC = jobMonitorMap.get(jobStatusResult.getJobId());
+                if (jEC == null) {
+                    jEC = jobMonitorMap.get(jobStatusResult.getJobName());
+                }
+                if (jEC != null) {
+                    process(jobStatusResult, jEC);
+                    processedMessages.add(message);
+                } else {
+                    // we can get JobExecutionContext null in multiple Gfac instances environment,
+                    // where this job is not submitted by this Gfac instance hence we ignore this message.
+                    unreadMessages.add(message);
+//                  log.info("JobExecutionContext is not found for job Id " + jobStatusResult.getJobId());
+                }
+            } catch (AiravataException e) {
+                log.error("[EJM]: Error parsing email message =====================================>", e);
+                try {
+                    writeEnvelopeOnError(message);
+                } catch (MessagingException e1) {
+                    log.error("[EJM]: Error printing envelop of the email");
+                }
+                unreadMessages.add(message);
+            } catch (MessagingException e) {
+                log.error("[EJM]: Error while retrieving sender address from message : " + message.toString());
+                unreadMessages.add(message);
+            }
+        }
+        if (!processedMessages.isEmpty()) {
+            Message[] seenMessages = new Message[processedMessages.size()];
+            processedMessages.toArray(seenMessages);
+            try {
+                emailFolder.setFlags(seenMessages, new Flags(Flags.Flag.SEEN), true);
+            } catch (MessagingException e) {
+                if (!store.isConnected()) {
+                    store.connect();
+                    emailFolder.setFlags(seenMessages, new Flags(Flags.Flag.SEEN), true);
+                }
+            }
+
+        }
+        if (!unreadMessages.isEmpty()) {
+            Message[] unseenMessages = new Message[unreadMessages.size()];
+            unreadMessages.toArray(unseenMessages);
+            try {
+                emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+            } catch (MessagingException e) {
+                if (!store.isConnected()) {
+                    store.connect();
+                    emailFolder.setFlags(unseenMessages, new Flags(Flags.Flag.SEEN), false);
+
+                }
+            }
+        }
+    }
+
+    private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){
+        JobState resultState = jobStatusResult.getState();
+        jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
+        boolean runOutHandlers = false;
+        String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
+        // TODO - Handle all other valid JobStates
+        if (resultState == JobState.COMPLETE) {
+            jobMonitorMap.remove(jobStatusResult.getJobId());
+            runOutHandlers = true;
+            log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails);
+        }else if (resultState == JobState.QUEUED) {
+            // nothing special thing to do, update the status change to rabbit mq at the end of this method.
+            log.info("[EJM]: Job Queued email received, " + jobDetails);
+        }else if (resultState == JobState.ACTIVE) {
+            // nothing special thing to do, update the status change to rabbit mq at the end of this method.
+            log.info("[EJM]: Job Active email received, " + jobDetails);
+        }else if (resultState == JobState.FAILED) {
+            jobMonitorMap.remove(jobStatusResult.getJobId());
+            runOutHandlers = true;
+            log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
+        }else if (resultState == JobState.CANCELED) {
+            jobMonitorMap.remove(jobStatusResult.getJobId());
+            runOutHandlers = false; // Do we need to run out handlers in canceled case?
+            log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails);
+
+        }
+        log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
+        publishJobStatusChange(jEC);
+
+        if (runOutHandlers) {
+            log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
+            GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC));
+        }
+    }
+
+    private void publishJobStatusChange(JobExecutionContext jobExecutionContext) {
+        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+        JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
+                jobExecutionContext.getTaskData().getTaskID(),
+                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                jobExecutionContext.getExperimentID(),
+                jobExecutionContext.getGatewayID());
+        jobStatus.setJobIdentity(jobIdentity);
+        jobStatus.setState(jobExecutionContext.getJobDetails().getJobStatus().getJobState());
+        // we have this JobStatus class to handle amqp monitoring
+        log.debugId(jobStatus.getJobIdentity().getJobId(), "[EJM]: Published job status(" +
+                        jobExecutionContext.getJobDetails().getJobStatus().getJobState().toString() + ") change request, " +
+                        "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
+                jobStatus.getJobIdentity().getTaskId());
+
+        jobExecutionContext.getMonitorPublisher().publish(jobStatus);
+    }
+
+    private void writeEnvelopeOnError(Message m) throws MessagingException {
+        Address[] a;
+        // FROM
+        if ((a = m.getFrom()) != null) {
+            for (int j = 0; j < a.length; j++)
+                log.error("FROM: " + a[j].toString());
+        }
+        // TO
+        if ((a = m.getRecipients(Message.RecipientType.TO)) != null) {
+            for (int j = 0; j < a.length; j++)
+                log.error("TO: " + a[j].toString());
+        }
+        // SUBJECT
+        if (m.getSubject() != null)
+            log.error("SUBJECT: " + m.getSubject());
+    }
+
+    public void stopMonitoring() {
+        stopMonitoring = true;
+    }
+
+    public void setDate(Date date) {
+        this.monitorStartDate = date;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
new file mode 100644
index 0000000..3a75331
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EmailMonitorFactory {
+
+    private static EmailBasedMonitor emailBasedMonitor;
+    private static Date startMonitorDate = Calendar.getInstance().getTime();
+
+    public static EmailBasedMonitor getEmailBasedMonitor(ResourceJobManagerType resourceJobManagerType) throws AiravataException {
+        if (emailBasedMonitor == null) {
+            synchronized (EmailMonitorFactory.class){
+                if (emailBasedMonitor == null) {
+                    emailBasedMonitor = new EmailBasedMonitor(resourceJobManagerType);
+                    emailBasedMonitor.setDate(startMonitorDate);
+                    new Thread(emailBasedMonitor).start();
+                }
+            }
+        }
+        return emailBasedMonitor;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
new file mode 100644
index 0000000..1b5a027
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LSFEmailParser implements EmailParser {
+    private static final Logger log = LoggerFactory.getLogger(LSFEmailParser.class);
+    //root@c312-206.ls4.tacc.utexas.edu
+    private static final String SIGNAL = "signal";
+    private static final String LONESTAR_REGEX = "Job (?<" + JOBID + ">\\d+) \\(.*\\) (?<" + STATUS
+            + ">.*)\\s[a-zA-Z =]+(?<" + EXIT_STATUS + ">\\d+)\\sSignal[ ]*=[ ]*(?<" + SIGNAL + ">[a-zA-z]*)";
+
+    @Override
+    public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
+        JobStatusResult jobStatusResult = new JobStatusResult();
+        try {
+            String content = ((String) message.getContent());
+            Pattern pattern = Pattern.compile(LONESTAR_REGEX);
+            Matcher matcher = pattern.matcher(content);
+            if (matcher.find()) {
+                jobStatusResult.setJobId(matcher.group(JOBID));
+                String status = matcher.group(STATUS);
+                jobStatusResult.setState(getJobState(status, content));
+                return jobStatusResult;
+            } else {
+                log.error("[EJM]: No matched found for content => \n" + content);
+            }
+        } catch (IOException e) {
+            throw new AiravataException("i[EJM]: Error while reading content of the email message");
+        }
+        return jobStatusResult;
+    }
+
+    private JobState getJobState(String status, String content) {
+        switch (status) {
+            case "Aborted":
+                return JobState.FAILED;
+            case "Success":
+                return JobState.COMPLETE;
+            default:
+                return JobState.UNKNOWN;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
new file mode 100644
index 0000000..4a3c88b
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PBSEmailParser implements EmailParser {
+
+    private static final Logger log = LoggerFactory.getLogger(PBSEmailParser.class);
+
+
+    private static final String REGEX = "[a-zA-Z ]*:[ ]*(?<" +  JOBID + ">[a-zA-Z0-9-\\.]*)\\s+[a-zA-Z ]*:[ ]*(?<"+
+            JOBNAME + ">[a-zA-Z0-9-\\.]*)\\s+.*\\s+(?<" + STATUS + ">[a-zA-Z\\ ]*)";
+    private static final String REGEX_EXIT_STATUS = "Exit_status=(?<" + EXIT_STATUS + ">[\\d]+)";
+    public static final String BEGUN_EXECUTION = "Begun execution";
+    public static final String EXECUTION_TERMINATED = "Execution terminated";
+    public static final String ABORTED_BY_PBS_SERVER = "Aborted by PBS Server";
+
+    @Override
+    public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
+        JobStatusResult jobStatusResult = new JobStatusResult();
+//        log.info("Parsing -> " + message.getSubject());
+        try {
+            String content = ((String) message.getContent());
+            Pattern pattern = Pattern.compile(REGEX);
+            Matcher matcher = pattern.matcher(content);
+            if (matcher.find()) {
+                jobStatusResult.setJobId(matcher.group(JOBID));
+                jobStatusResult.setJobName(matcher.group(JOBNAME));
+                String statusLine = matcher.group(STATUS);
+                jobStatusResult.setState(getJobState(statusLine, content));
+                return jobStatusResult;
+            } else {
+                log.error("[EJM]: No matched found for content => \n" + content);
+            }
+
+        } catch (IOException e) {
+            throw new AiravataException("[EJM]: Error while reading content of the email message");
+        }
+        return jobStatusResult;
+    }
+
+    private JobState getJobState(String statusLine, String content) {
+        switch (statusLine) {
+            case BEGUN_EXECUTION:
+                return JobState.ACTIVE;
+            case EXECUTION_TERMINATED:
+                int exitStatus = getExitStatus(content);
+                if (exitStatus == 0) {
+                    // TODO - Remove rabbitmq client script line from the script.
+                    return JobState.COMPLETE;
+                } else if (exitStatus == 271) {
+                    return JobState.CANCELED;
+                } else {
+                    return JobState.FAILED;
+                }
+            case ABORTED_BY_PBS_SERVER:
+                return JobState.FAILED;
+            default:
+                return JobState.UNKNOWN;
+        }
+    }
+
+    private int getExitStatus(String content) {
+        Pattern pattern = Pattern.compile(REGEX_EXIT_STATUS);
+        Matcher matcher = pattern.matcher(content);
+        if (matcher.find()) {
+            String group = matcher.group(EXIT_STATUS);
+            if (group != null && !group.trim().isEmpty()) {
+                return Integer.valueOf(group.trim());
+            }
+        }
+        return -1;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
new file mode 100644
index 0000000..9dd32c0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SLURMEmailParser implements EmailParser {
+
+    private static final Logger log = LoggerFactory.getLogger(SLURMEmailParser.class);
+
+    private static final String REGEX = "[A-Z]*\\s[a-zA-Z]*_[a-z]*=(?<" + JOBID + ">\\d*)[ ]*[a-zA-Z]*=(?<"+
+            JOBNAME + ">[a-zA-Z0-9-]*)[ ]*(?<" + STATUS + ">[]a-zA-Z]*),.*";
+
+    public static final String BEGAN = "Began";
+    public static final String ENDED = "Ended";
+    public static final String FAILED = "Failed";
+    private static final Pattern cancelledStatePattern = Pattern.compile("CANCELLED");
+    private static final Pattern pattern = Pattern.compile(REGEX);
+
+    @Override
+    public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException{
+        JobStatusResult jobStatusResult = new JobStatusResult();
+        String subject = message.getSubject();
+        Matcher matcher = pattern.matcher(subject);
+        if (matcher.find()) {
+            jobStatusResult.setJobId(matcher.group(JOBID));
+            jobStatusResult.setJobName(matcher.group(JOBNAME));
+            jobStatusResult.setState(getJobState(matcher.group(STATUS), subject));
+            return jobStatusResult;
+        } else {
+            log.error("[EJM]: No matched found for subject -> " + subject);
+        }
+        return jobStatusResult;
+    }
+
+    private JobState getJobState(String state, String subject) {
+        switch (state.trim()) {
+            case BEGAN:
+                return JobState.ACTIVE;
+            case ENDED:
+                Matcher matcher = cancelledStatePattern.matcher(subject);
+                if (matcher.find()) {
+                   return JobState.CANCELED;
+                }
+                return JobState.COMPLETE;
+            case FAILED:
+                return JobState.FAILED;
+            default:
+                log.error("[EJM]: Job State " + state + " isn't handle by SLURM parser");
+                return JobState.UNKNOWN;
+
+        }
+    }
+
+}