You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:39 UTC
[22/39] airavata git commit: Refactored gfac sub modules,
merged gfac-ssh, gfac-gsissh, gfac-local,
gfac-monitor and gsissh modules and created gfac-impl,
removed implementation from gfac-core to gfac-impl
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
new file mode 100644
index 0000000..884ccd5
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.handler;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class LocalInputHandler extends AbstractHandler {
+ private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class);
+ @Override
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ super.invoke(jobExecutionContext);
+ Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
+ for (Map.Entry<String, Object> inputParamEntry : inputParameters.entrySet()) {
+ if (inputParamEntry.getValue() instanceof InputDataObjectType) {
+ InputDataObjectType inputDataObject = (InputDataObjectType) inputParamEntry.getValue();
+ if (inputDataObject.getType() == DataType.URI
+ && inputDataObject != null
+ && !inputDataObject.getValue().equals("")) {
+ try {
+ inputDataObject.setValue(stageFile(jobExecutionContext.getInputDir(), inputDataObject.getValue()));
+ } catch (IOException e) {
+ throw new GFacHandlerException("Error while data staging sourceFile= " + inputDataObject.getValue());
+ }
+ }
+ }
+ }
+ }
+
+ private String stageFile(String inputDir, String sourceFilePath) throws IOException {
+ int i = sourceFilePath.lastIndexOf(File.separator);
+ String substring = sourceFilePath.substring(i + 1);
+ if (inputDir.endsWith("/")) {
+ inputDir = inputDir.substring(0, inputDir.length() - 1);
+ }
+ String targetFilePath = inputDir + File.separator + substring;
+
+ if (sourceFilePath.startsWith("file")) {
+ sourceFilePath = sourceFilePath.substring(sourceFilePath.indexOf(":") + 1, sourceFilePath.length());
+ }
+
+ File sourceFile = new File(sourceFilePath);
+ File targetFile = new File(targetFilePath);
+ if (targetFile.exists()) {
+ targetFile.delete();
+ }
+ logger.info("staging source file : " + sourceFilePath + " to target file : " + targetFilePath);
+ FileUtils.copyFile(sourceFile, targetFile);
+
+ return targetFilePath;
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+
+ }
+
+ @Override
+ public void initProperties(Properties properties) throws GFacHandlerException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
new file mode 100644
index 0000000..5519ee0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -0,0 +1,311 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.provider.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.core.provider.AbstractProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.impl.OutputUtils;
+import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
+import org.apache.airavata.gfac.local.utils.InputUtils;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+public class LocalProvider extends AbstractProvider {
+ private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
+ private ProcessBuilder builder;
+ private List<String> cmdList;
+ private String jobId;
+
+ public static class LocalProviderJobData{
+ private String applicationName;
+ private List<String> inputParameters;
+ private String workingDir;
+ private String inputDir;
+ private String outputDir;
+ public String getApplicationName() {
+ return applicationName;
+ }
+ public void setApplicationName(String applicationName) {
+ this.applicationName = applicationName;
+ }
+ public List<String> getInputParameters() {
+ return inputParameters;
+ }
+ public void setInputParameters(List<String> inputParameters) {
+ this.inputParameters = inputParameters;
+ }
+ public String getWorkingDir() {
+ return workingDir;
+ }
+ public void setWorkingDir(String workingDir) {
+ this.workingDir = workingDir;
+ }
+ public String getInputDir() {
+ return inputDir;
+ }
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
+ public String getOutputDir() {
+ return outputDir;
+ }
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+ }
+ public LocalProvider(){
+ cmdList = new ArrayList<String>();
+ }
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
+ super.initialize(jobExecutionContext);
+
+ // build command with all inputs
+ buildCommand();
+ initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription());
+
+ // extra environment variables
+ builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
+ builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
+
+ // set working directory
+ builder.directory(new File(jobExecutionContext.getWorkingDir()));
+
+ // log info
+ log.info("Command = " + InputUtils.buildCommand(cmdList));
+ log.info("Working dir = " + builder.directory());
+ /*for (String key : builder.environment().keySet()) {
+ log.info("Env[" + key + "] = " + builder.environment().get(key));
+ }*/
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
+ JobDetails jobDetails = new JobDetails();
+ try {
+ jobId = jobExecutionContext.getTaskData().getTaskID();
+ jobDetails.setJobID(jobId);
+ jobDetails.setJobDescription(jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getAppDeploymentDescription());
+ jobExecutionContext.setJobDetails(jobDetails);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP);
+ // running cmd
+ Process process = builder.start();
+
+ Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput());
+ Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError());
+
+ // start output threads
+ standardOutWriter.setDaemon(true);
+ standardErrorWriter.setDaemon(true);
+ standardOutWriter.start();
+ standardErrorWriter.start();
+
+ int returnValue = process.waitFor();
+
+ // make sure other two threads are done
+ standardOutWriter.join();
+ standardErrorWriter.join();
+
+ /*
+ * check return value. usually not very helpful to draw conclusions based on return values so don't bother.
+ * just provide warning in the log messages
+ */
+ if (returnValue != 0) {
+ log.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ log.info("Process finished with return value of zero.");
+ }
+
+ StringBuffer buf = new StringBuffer();
+ buf.append("Executed ").append(InputUtils.buildCommand(cmdList))
+ .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir())
+ .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ")
+ .append(String.valueOf(returnValue));
+
+ log.info(buf.toString());
+
+ // updating the job status to complete because there's nothing to monitor in local jobs
+// MonitorID monitorID = createMonitorID(jobExecutionContext);
+ JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
+ jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ jobExecutionContext.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
+ } catch (IOException io) {
+ throw new GFacProviderException(io.getMessage(), io);
+ } catch (InterruptedException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }catch (GFacException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ }
+ }
+
+// private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) {
+// MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId,
+// jobExecutionContext.getTaskData().getTaskID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getExperiment().getUserName(),jobId);
+// return monitorID;
+// }
+
+// private void saveApplicationJob(JobExecutionContext jobExecutionContext)
+// throws GFacProviderException {
+// ApplicationDeploymentDescriptionType app = jobExecutionContext.
+// getApplicationContext().getApplicationDeploymentDescription().getType();
+// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext);
+// appJob.setJobId(jobId);
+// LocalProviderJobData data = new LocalProviderJobData();
+// data.setApplicationName(app.getExecutableLocation());
+// data.setInputDir(app.getInputDataDirectory());
+// data.setOutputDir(app.getOutputDataDirectory());
+// data.setWorkingDir(builder.directory().toString());
+// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext));
+// ByteArrayOutputStream stream = new ByteArrayOutputStream();
+// JAXB.marshal(data, stream);
+// appJob.setJobData(stream.toString());
+// appJob.setSubmittedTime(Calendar.getInstance().getTime());
+// appJob.setStatus(ApplicationJobStatus.SUBMITTED);
+// appJob.setStatusUpdateTime(appJob.getSubmittedTime());
+// GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
+// }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ try {
+ List<OutputDataObjectType> outputArray = new ArrayList<OutputDataObjectType>();
+ String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput());
+ String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError());
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray);
+ TaskDetails taskDetails = (TaskDetails)registry.get(RegistryModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
+ if (taskDetails != null){
+ taskDetails.setApplicationOutputs(outputArray);
+ registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID());
+ }
+ registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ jobExecutionContext.getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
+ } catch (XmlException e) {
+ throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
+ } catch (IOException io) {
+ throw new GFacProviderException(io.getMessage(), io);
+ } catch (Exception e){
+ throw new GFacProviderException("Error in retrieving results",e);
+ }
+ }
+
+ public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Auto generated method body.
+ }
+
+ @Override
+ public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Auto generated method body.
+ }
+
+
+ private void buildCommand() {
+ cmdList.add(jobExecutionContext.getExecutablePath());
+ Map<String, Object> inputParameters = jobExecutionContext.getInMessageContext().getParameters();
+
+ // sort the inputs first and then build the command List
+ Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+ return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+ }
+ };
+ Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+ for (Object object : inputParameters.values()) {
+ if (object instanceof InputDataObjectType) {
+ InputDataObjectType inputDOT = (InputDataObjectType) object;
+ sortedInputSet.add(inputDOT);
+ }
+ }
+ for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+ if (inputDataObjectType.getApplicationArgument() != null
+ && !inputDataObjectType.getApplicationArgument().equals("")) {
+ cmdList.add(inputDataObjectType.getApplicationArgument());
+ }
+
+ if (inputDataObjectType.getValue() != null
+ && !inputDataObjectType.getValue().equals("")) {
+ cmdList.add(inputDataObjectType.getValue());
+ }
+ }
+
+ }
+
+ private void initProcessBuilder(ApplicationDeploymentDescription app){
+ builder = new ProcessBuilder(cmdList);
+
+ List<SetEnvPaths> setEnvironment = app.getSetEnvironment();
+ if (setEnvironment != null) {
+ for (SetEnvPaths envPath : setEnvironment) {
+ Map<String,String> builderEnv = builder.environment();
+ builderEnv.put(envPath.getName(), envPath.getValue());
+ }
+ }
+ }
+
+ public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java
new file mode 100644
index 0000000..2467ce8
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputStreamToFileWriter.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * Thread that copies an input stream into a file line by line. Each line read is written
+ * followed by a line separator; both ends are closed when the copy finishes or fails.
+ * Failures are logged rather than propagated.
+ */
+public class InputStreamToFileWriter extends Thread{
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private BufferedReader in;
+    private BufferedWriter out;
+
+    /**
+     * @param in  stream to read from
+     * @param out path of the file to write to
+     * @throws IOException if the output file cannot be opened
+     */
+    public InputStreamToFileWriter(InputStream in, String out) throws IOException {
+        this.in = new BufferedReader(new InputStreamReader(in));
+        this.out = new BufferedWriter(new FileWriter(out));
+    }
+
+    /** Copies lines until the stream is exhausted, then closes the reader and the writer. */
+    public void run() {
+        try {
+            for (String current = in.readLine(); current != null; current = in.readLine()) {
+                if (log.isDebugEnabled()) {
+                    log.debug(current);
+                }
+                out.write(current);
+                out.newLine();
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        } finally {
+            closeAndLog(in);
+            closeAndLog(out);
+        }
+    }
+
+    /** Closes the given resource if present, logging any failure instead of throwing. */
+    private void closeAndLog(Closeable resource) {
+        if (resource == null) {
+            return;
+        }
+        try {
+            resource.close();
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java
new file mode 100644
index 0000000..98671fd
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/InputUtils.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Static helper for rendering a command list as a single string.
+ */
+public class InputUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(InputUtils.class);
+
+    private static final String SPACE = " ";
+
+    private InputUtils() {
+    }
+
+    /**
+     * Concatenates the command parts, appending a single space after every part
+     * (so a non-empty result ends with a space). Each part is also logged at debug level.
+     *
+     * @param cmdList command parts in order
+     * @return the parts joined into one string
+     */
+    public static String buildCommand(List<String> cmdList) {
+        StringBuilder joined = new StringBuilder();
+        for (String part : cmdList) {
+            logger.debug("Build Command --> " + part);
+            joined.append(part).append(SPACE);
+        }
+        return joined.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
new file mode 100644
index 0000000..2b45df7
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.local.utils;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Helper that creates the directories a local job needs.
+ */
+public class LocalProviderUtil {
+    private static final Logger log = LoggerFactory.getLogger(LocalProviderUtil.class);
+
+    /**
+     * Ensures {@code dir} exists as a directory, creating it and any missing parents.
+     *
+     * @param dir path of the directory to create
+     * @throws GFacProviderException if the directory does not exist and cannot be created
+     */
+    private void makeFileSystemDir(String dir) throws GFacProviderException {
+        File f = new File(dir);
+        if (f.isDirectory()) {
+            return;
+        }
+        // mkdirs (not mkdir) so a directory whose parent does not exist yet can still be created.
+        // The second isDirectory check tolerates another creator winning the race.
+        if (!f.mkdirs() && !f.isDirectory()) {
+            throw new GFacProviderException("Cannot make directory " + dir);
+        }
+    }
+
+    /**
+     * Creates the working, scratch, input and output directories named by the context.
+     *
+     * @param jobExecutionContext context supplying the directory locations
+     * @throws GFacProviderException if any of the directories cannot be created
+     */
+    public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        log.info("working diectroy = " + jobExecutionContext.getWorkingDir());
+        log.info("temp directory = " + jobExecutionContext.getScratchLocation());
+        makeFileSystemDir(jobExecutionContext.getWorkingDir());
+        makeFileSystemDir(jobExecutionContext.getScratchLocation());
+        makeFileSystemDir(jobExecutionContext.getInputDir());
+        makeFileSystemDir(jobExecutionContext.getOutputDir());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
new file mode 100644
index 0000000..8eba250
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java
@@ -0,0 +1,107 @@
+package org.apache.airavata.gfac.monitor;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gfac.ssh.api.ServerInfo;
+import org.apache.airavata.gfac.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Date;
+
+/**
+ * {@link MonitorID} variant that additionally carries the {@link AuthenticationInfo}
+ * associated with the monitored job.
+ */
+public class HPCMonitorID extends MonitorID {
+    private final static Logger logger = LoggerFactory.getLogger(HPCMonitorID.class);
+
+
+    private AuthenticationInfo authenticationInfo = null;
+
+    /**
+     * Creates a monitor id without authentication information; the job start time is set to now.
+     */
+    public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID,
+                        String experimentID, String userName,String jobName) {
+        super(computeResourceDescription, jobID, taskID, workflowNodeID, experimentID, userName,jobName);
+        setComputeResourceDescription(computeResourceDescription);
+        setJobStartedTime(new Timestamp((new Date()).getTime()));
+        setUserName(userName);
+        setJobID(jobID);
+        setTaskID(taskID);
+        setExperimentID(experimentID);
+        setWorkflowNodeID(workflowNodeID);
+    }
+
+    /**
+     * Creates a monitor id from a job execution context. When authentication information is
+     * given and the context holds a GSI or SSH security context for the job's host, the user
+     * name reported by that context's server info replaces the current user name.
+     *
+     * @param authenticationInfo  authentication information, may be {@code null}
+     * @param jobExecutionContext context of the job being monitored
+     */
+    public HPCMonitorID(AuthenticationInfo authenticationInfo, JobExecutionContext jobExecutionContext) {
+        super(jobExecutionContext);
+        this.authenticationInfo = authenticationInfo;
+        if (this.authenticationInfo != null) {
+            try {
+                String hostAddress = jobExecutionContext.getHostName();
+                SecurityContext securityContext = jobExecutionContext.getSecurityContext(hostAddress);
+                // instanceof is false for null, so no separate null check is needed
+                if (securityContext instanceof GSISecurityContext) {
+                    applyServerUserName((((GSISecurityContext) securityContext).getPbsCluster()).getServerInfo());
+                }
+                if (securityContext instanceof SSHSecurityContext) {
+                    applyServerUserName((((SSHSecurityContext) securityContext).getPbsCluster()).getServerInfo());
+                }
+            } catch (GFacException e) {
+                logger.error("Error while getting security context", e);
+            }
+        }
+    }
+
+    /**
+     * Creates a monitor id with explicit identifiers. {@code userName} is used as the user
+     * unless MyProxy authentication information is supplied, in which case the MyProxy user
+     * name takes precedence. The job start time is set to now.
+     */
+    public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName, AuthenticationInfo authenticationInfo) {
+        setComputeResourceDescription(computeResourceDescription);
+        setJobStartedTime(new Timestamp((new Date()).getTime()));
+        this.authenticationInfo = authenticationInfo;
+        // The userName argument was previously ignored; use it as the default user.
+        setUserName(userName);
+        // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user
+        if (this.authenticationInfo instanceof MyProxyAuthenticationInfo) {
+            setUserName(((MyProxyAuthenticationInfo) this.authenticationInfo).getUserName());
+        }
+        setJobID(jobID);
+        setTaskID(taskID);
+        setExperimentID(experimentID);
+        setWorkflowNodeID(workflowNodeID);
+    }
+
+    /** Adopts the server info's user name when both the info and the name are present. */
+    private void applyServerUserName(ServerInfo serverInfo) {
+        if (serverInfo != null && serverInfo.getUserName() != null) {
+            setUserName(serverInfo.getUserName());
+        }
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
new file mode 100644
index 0000000..f29e3e6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Groups the monitor ids of the jobs that share one compute resource, together with the
+ * job submission and data movement protocols read from the job execution context.
+ */
+public class HostMonitorData {
+    private ComputeResourceDescription computeResourceDescription;
+    private JobSubmissionProtocol jobSubmissionProtocol;
+    private DataMovementProtocol dataMovementProtocol;
+    private List<MonitorID> monitorIDs;
+
+    /**
+     * Creates an entry with an empty monitor id list.
+     *
+     * @param jobExecutionContext context the resource description and protocols are read from
+     */
+    public HostMonitorData(JobExecutionContext jobExecutionContext) {
+        this(jobExecutionContext, new ArrayList<MonitorID>());
+    }
+
+    /**
+     * Creates an entry backed by the given monitor id list (the list is stored, not copied).
+     *
+     * @param jobExecutionContext context the resource description and protocols are read from
+     * @param monitorIDs          initial monitor ids
+     */
+    public HostMonitorData(JobExecutionContext jobExecutionContext, List<MonitorID> monitorIDs) {
+        this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription();
+        this.jobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
+        this.dataMovementProtocol = jobExecutionContext.getPreferredDataMovementProtocol();
+        this.monitorIDs = monitorIDs;
+    }
+
+    public ComputeResourceDescription getComputeResourceDescription() {
+        return this.computeResourceDescription;
+    }
+
+    public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+        this.computeResourceDescription = computeResourceDescription;
+    }
+
+    public JobSubmissionProtocol getJobSubmissionProtocol() {
+        return this.jobSubmissionProtocol;
+    }
+
+    public DataMovementProtocol getDataMovementProtocol() {
+        return this.dataMovementProtocol;
+    }
+
+    public List<MonitorID> getMonitorIDs() {
+        return this.monitorIDs;
+    }
+
+    public void setMonitorIDs(List<MonitorID> monitorIDs) {
+        this.monitorIDs = monitorIDs;
+    }
+
+    /**
+     * Appends a monitor id to this host entry. No validation is done here: per the original
+     * note, the caller (CommonUtils) is expected to have picked the matching host entry
+     * before calling this method.
+     *
+     * @param monitorID monitor id to add
+     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException declared for
+     *         callers; this implementation does not throw it itself
+     */
+    public void addMonitorIDForHost(MonitorID monitorID) throws AiravataMonitorException {
+        this.monitorIDs.add(monitorID);
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
new file mode 100644
index 0000000..022d17c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * User-centric view of the monitored jobs: instead of tracking individual jobs,
+ * the per-host monitor data is grouped under the user it belongs to.
+ */
+public class UserMonitorData {
+    private static final Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+    private String userName;
+    private List<HostMonitorData> hostMonitorData;
+
+    /**
+     * Creates an entry for the given user with an empty host list.
+     *
+     * @param userName name of the user
+     */
+    public UserMonitorData(String userName) {
+        this(userName, new ArrayList<HostMonitorData>());
+    }
+
+    /**
+     * Creates an entry for the given user backed by the given host list (stored, not copied).
+     *
+     * @param userName            name of the user
+     * @param hostMonitorDataList initial per-host monitor data
+     */
+    public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+        this.userName = userName;
+        this.hostMonitorData = hostMonitorDataList;
+    }
+
+    public String getUserName() {
+        return this.userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public List<HostMonitorData> getHostMonitorData() {
+        return this.hostMonitorData;
+    }
+
+    public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+        this.hostMonitorData = hostMonitorData;
+    }
+
+    /**
+     * Appends a host entry to this user's list. Duplicates are not checked here,
+     * so callers must make sure the same host is not added twice.
+     *
+     * @param hostMonitorData host entry to add
+     * @throws AiravataMonitorException declared for callers; this implementation does not throw it itself
+     */
+    public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+        this.hostMonitorData.add(hostMonitorData);
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..f19decf
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+/**
+ * Request object carrying the id of the experiment to cancel.
+ */
+public class ExperimentCancelRequest {
+    private String experimentId;
+
+    /**
+     * @param experimentId id of the experiment to cancel
+     */
+    public ExperimentCancelRequest(String experimentId) {
+        setExperimentId(experimentId);
+    }
+
+    /**
+     * @param experimentId id of the experiment to cancel
+     */
+    public void setExperimentId(String experimentId) {
+        this.experimentId = experimentId;
+    }
+
+    /**
+     * @return id of the experiment to cancel
+     */
+    public String getExperimentId() {
+        return this.experimentId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..b45e01c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+/**
+ * Request object identifying the task to cancel by experiment, workflow node and task id.
+ */
+public class TaskCancelRequest {
+    private String experimentId;
+    private String nodeId;
+    private String taskId;
+
+    /**
+     * @param experimentId id of the experiment the task belongs to
+     * @param nodeId       id of the workflow node the task belongs to
+     * @param taskId       id of the task to cancel
+     */
+    public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+        this.experimentId = experimentId;
+        // Assign the field directly: calling the overridable setNodeId from the constructor
+        // would run subclass code before the subclass is initialised.
+        this.nodeId = nodeId;
+        this.taskId = taskId;
+    }
+
+    /** @return id of the experiment the task belongs to */
+    public String getExperimentId() {
+        return experimentId;
+    }
+
+    /** @param experimentId id of the experiment the task belongs to */
+    public void setExperimentId(String experimentId) {
+        this.experimentId = experimentId;
+    }
+
+    /** @return id of the task to cancel */
+    public String getTaskId() {
+        return taskId;
+    }
+
+    /** @param taskId id of the task to cancel */
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    /** @return id of the workflow node the task belongs to */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /** @param nodeId id of the workflow node the task belongs to */
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
new file mode 100644
index 0000000..b4ac3a9
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for monitoring implementations whose job statuses
+ * are expected to end up in the registry. Per the original author's note,
+ * such monitors publish their statuses to the event bus through the
+ * MonitorPublisher, and every status published to the event bus is saved
+ * to the registry. This class itself declares nothing beyond a logger;
+ * the publishing behaviour lives in the subclasses.
+ */
+public abstract class AiravataAbstractMonitor implements Monitor {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
new file mode 100644
index 0000000..a003f55
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * Contract for message parsers. Whether a monitor is pull based or push
+ * based, it has to parse the content of the message it gets from the
+ * remote monitoring system and map it to an internal job state.
+ * Examples given by the original author: a JSON parser for AMQP, and a
+ * qstat reader for a pull based monitor.
+ */
+public interface MessageParser {
+ /**
+ * Parses an incoming message and derives the state of the job from it.
+ * The caller must make sure the right message is handed to this method;
+ * per the original note, the parser is not expected to do any filtering.
+ * @param message content of the message
+ * @return the job state derived from the message
+ * @throws AiravataMonitorException declared for implementations to signal a failure;
+ * the exact conditions are implementation specific
+ */
+ JobState parseMessage(String message)throws AiravataMonitorException;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
new file mode 100644
index 0000000..614d606
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+
+/**
+ * Primary interface for monitors. It declares no methods and serves as the
+ * common type under which different monitoring approaches are implemented.
+ */
+public interface Monitor {
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
new file mode 100644
index 0000000..efdf89c
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * Base class for pull (polling) based monitors.
+ * Pull and push based monitoring require different operations, so pull
+ * monitors extend this class and can be programmed separately from push monitors.
+ */
+public abstract class PullMonitor extends AiravataAbstractMonitor {
+
+    /** Polling frequency as last supplied by the caller; this class defines no unit and stays 0 until set. */
+    private int pollingFrequence;
+
+    /**
+     * Returns the polling frequency stored by {@link #setPollingFrequence(int)}.
+     * Subclasses may instead implement their own polling mechanism.
+     *
+     * @return the stored polling frequency
+     */
+    public int getPollingFrequence() {
+        return pollingFrequence;
+    }
+
+    /**
+     * Stores the polling frequency. Subclasses may instead implement their
+     * own polling mechanism.
+     *
+     * @param frequence polling frequency to store
+     */
+    public void setPollingFrequence(int frequence) {
+        pollingFrequence = frequence;
+    }
+
+    /**
+     * Invoked when the pull monitor needs to start; the implementation is
+     * expected to poll at the configured frequency.
+     *
+     * @return true if the start process is successful, false otherwise
+     * @throws AiravataMonitorException declared for implementations to signal a failure
+     */
+    public abstract boolean startPulling() throws AiravataMonitorException;
+
+    /**
+     * Stops the polling process.
+     *
+     * @return true if the stopping process is successful, false otherwise
+     * @throws AiravataMonitorException declared for implementations to signal a failure
+     */
+    public abstract boolean stopPulling() throws AiravataMonitorException;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
new file mode 100644
index 0000000..1b6a228
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * Base class for push based monitors.
+ * Pull and push based monitoring require different operations, so push
+ * monitors extend this class and can be programmed separately from
+ * pull monitors.
+ */
+public abstract class PushMonitor extends AiravataAbstractMonitor {
+ /**
+ * Registers a listener with the remote monitoring system. Implementations
+ * are expected to contain the client listener code for that system, i.e.
+ * to act as a simple wrapper around a client for the remote monitor.
+ * @param monitorID monitor id to register the listener for
+ * @return boolean result; its meaning is not specified here (presumably true on success, to be confirmed against implementations)
+ * @throws AiravataMonitorException declared for implementations to signal a failure
+ */
+ public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
+
+ /**
+ * Unregisters a listener from the remote monitoring system. Implementations
+ * are expected to contain the client listener code for that system, i.e.
+ * to act as a simple wrapper around a client for the remote monitor.
+ * @param monitorID monitor id to unregister the listener for
+ * @return boolean result; its meaning is not specified here (presumably true on success, to be confirmed against implementations)
+ * @throws AiravataMonitorException declared for implementations to signal a failure
+ */
+ public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
+
+ /**
+ * This can be used to stop the registration thread
+ * @return boolean result; its meaning is not specified here (presumably true on success, to be confirmed against implementations)
+ * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+ */
+ public abstract boolean stopRegister()throws AiravataMonitorException;
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
new file mode 100644
index 0000000..eea6ef6
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.impl.OutHandlerWorker;
+import org.apache.airavata.gfac.monitor.email.parser.LSFEmailParser;
+import org.apache.airavata.gfac.monitor.email.parser.PBSEmailParser;
+import org.apache.airavata.gfac.monitor.email.parser.SLURMEmailParser;
+import org.apache.airavata.gfac.monitor.email.parser.UGEEmailParser;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
+
+import javax.mail.Address;
+import javax.mail.Flags;
+import javax.mail.Folder;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.Store;
+import javax.mail.search.FlagTerm;
+import javax.mail.search.SearchTerm;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class EmailBasedMonitor implements Runnable{
+ private static final AiravataLogger log = AiravataLoggerFactory.getLogger(EmailBasedMonitor.class);
+
+ public static final int COMPARISON = 6; // after and equal
+ public static final String IMAPS = "imaps"; // store protocol accepted by init()
+ public static final String POP3 = "pop3"; // store protocol accepted by init()
+ private boolean stopMonitoring = false; // when true, the polling loop in run() exits
+
+ private Session session ;
+ private Store store;
+ private Folder emailFolder;
+ private Properties properties; // mail session properties; init() sets mail.store.protocol
+ private Map<String, JobExecutionContext> jobMonitorMap = new ConcurrentHashMap<String, JobExecutionContext>(); // monitor id (job id, or job name as fallback) -> context of the monitored job
+ private String host, emailAddress, password, storeProtocol, folderName ; // mail account settings loaded from ServerSettings in init()
+ private Date monitorStartDate;
+ private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>(); // parsers created on first use in parse(), one per job manager type
+
+ public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException { // NOTE(review): 'type' is not used by this constructor - confirm whether callers rely on it
+ init(); // loads the mail settings and rejects unsupported store protocols
+ }
+
+ private void init() throws AiravataException {
+ host = ServerSettings.getEmailBasedMonitorHost();
+ emailAddress = ServerSettings.getEmailBasedMonitorAddress();
+ password = ServerSettings.getEmailBasedMonitorPassword();
+ storeProtocol = ServerSettings.getEmailBasedMonitorStoreProtocol();
+ folderName = ServerSettings.getEmailBasedMonitorFolderName();
+ if (!(storeProtocol.equals(IMAPS) || storeProtocol.equals(POP3))) {
+ throw new AiravataException("Unsupported store protocol , expected " +
+ IMAPS + " or " + POP3 + " but found " + storeProtocol);
+ }
+ properties = new Properties();
+ properties.put("mail.store.protocol", storeProtocol);
+ }
+
+    /**
+     * Registers a job for email monitoring, keyed by its job id, or by its job name
+     * when the job id is null or empty.
+     *
+     * @param jobExecutionContext context of the job to monitor
+     */
+    public void addToJobMonitorMap(JobExecutionContext jobExecutionContext) {
+        final String jobId = jobExecutionContext.getJobDetails().getJobID();
+        final boolean hasJobId = jobId != null && !jobId.isEmpty();
+        final String monitorId = hasJobId ? jobId : jobExecutionContext.getJobDetails().getJobName();
+        addToJobMonitorMap(monitorId, jobExecutionContext);
+    }
+
+    /**
+     * Registers a job for email monitoring under the given monitor id.
+     * An existing entry with the same id is replaced.
+     *
+     * @param monitorId           key to register the job under
+     * @param jobExecutionContext context of the job to monitor
+     */
+    public void addToJobMonitorMap(String monitorId, JobExecutionContext jobExecutionContext) {
+        final String logLine = "[EJM]: Added monitor Id : " + monitorId + " to email based monitor map";
+        log.info(logLine);
+        this.jobMonitorMap.put(monitorId, jobExecutionContext);
+    }
+
+    /**
+     * Parses a job status email. The first sender address selects the resource job
+     * manager type; the parser for that type is created on first use and cached.
+     *
+     * @param message email message to parse
+     * @return the result produced by the parser selected for the sender
+     * @throws MessagingException if the sender cannot be read from the message
+     * @throws AiravataException  if the message has no sender, the sender is not recognised,
+     *                            or no parser exists for the resolved job manager type
+     */
+    private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
+        // getFrom() may return null or an empty array; report that as a checked
+        // AiravataException (which processMessages handles per message) instead of
+        // letting an unchecked exception end the monitor thread.
+        Address[] fromAddresses = message.getFrom();
+        if (fromAddresses == null || fromAddresses.length == 0 || fromAddresses[0] == null) {
+            throw new AiravataException("[EJM]: Email message has no sender address, couldn't identify Resource job manager type");
+        }
+        String addressStr = fromAddresses[0].toString();
+        ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr);
+        EmailParser emailParser = emailParserMap.get(jobMonitorType);
+        if (emailParser == null) {
+            switch (jobMonitorType) {
+                case PBS:
+                    emailParser = new PBSEmailParser();
+                    break;
+                case SLURM:
+                    emailParser = new SLURMEmailParser();
+                    break;
+                case LSF:
+                    emailParser = new LSFEmailParser();
+                    break;
+                case UGE:
+                    emailParser = new UGEEmailParser();
+                    break;
+                default:
+                    throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType.toString() + " for email monitoring -->  " + addressStr);
+            }
+
+            // cache the parser so later emails of the same type reuse it
+            emailParserMap.put(jobMonitorType, emailParser);
+        }
+        return emailParser.parseEmail(message);
+    }
+
+    /**
+     * Maps the sender address of a job status email to a resource job manager type.
+     * Known senders are matched exactly; any address containing the lonestar domain
+     * suffix is treated as UGE.
+     *
+     * @param addressStr sender address as rendered by the mail API
+     * @return the resource job manager type associated with the sender
+     * @throws AiravataException if the sender is not recognised
+     */
+    private ResourceJobManagerType getJobMonitorType(String addressStr) throws AiravataException {
+        // goes through the class logger rather than stdout so it lands in the server log
+        log.info("[EJM]: Resolving resource job manager type for sender address : " + addressStr);
+        // NOTE(review): the literals containing "..." look truncated by the mail archive's
+        // address obfuscation - confirm against the repository before relying on them.
+        switch (addressStr) {
+            case "pbsconsult@sdsc.edu":  // trestles , gordan
+            case "adm@trident.bigred2.uits.iu.edu":  // bigred2
+            case "root <ad...@trident.bigred2.uits.iu.edu>": // bigred2
+            case "root <ad...@scyld.localdomain>": // alamo
+                return ResourceJobManagerType.PBS;
+            case "SDSC Admin <sl...@comet-fe3.sdsc.edu>": // comet
+            case "slurm@batch1.stampede.tacc.utexas.edu": // stampede
+            case "slurm user <sl...@tempest.dsc.soic.indiana.edu>":
+                return ResourceJobManagerType.SLURM;
+//            case "lsf":
+//                return ResourceJobManagerType.LSF;
+            default:
+                if (addressStr.contains("ls4.tacc.utexas.edu>")) { // lonestar
+                    return ResourceJobManagerType.UGE;
+                } else {
+                    throw new AiravataException("[EJM]: Couldn't identify Resource job manager type from address " + addressStr);
+                }
+        }
+
+    }
+
+ @Override
+ public void run() {
+ try {
+ session = Session.getDefaultInstance(properties);
+ store = session.getStore(storeProtocol);
+ store.connect(host, emailAddress, password);
+ emailFolder = store.getFolder(folderName);
+ // first time we search for all unread messages.
+ SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
+ while (!(stopMonitoring || ServerSettings.isStopAllThreads())) {
+ Thread.sleep(ServerSettings.getEmailMonitorPeriod());// sleep a bit - get a rest till job finishes
+ if (jobMonitorMap.isEmpty()) {
+ log.info("[EJM]: Job Monitor Map is empty, no need to retrieve emails");
+ continue;
+ } else {
+ log.info("[EJM]: " + jobMonitorMap.size() + " job/s in job monitor map");
+ }
+ if (!store.isConnected()) {
+ store.connect();
+ emailFolder = store.getFolder(folderName);
+ }
+ log.info("[EJM]: Retrieving unseen emails");
+ emailFolder.open(Folder.READ_WRITE);
+ Message[] searchMessages = emailFolder.search(unseenBefore);
+ if (searchMessages == null || searchMessages.length == 0) {
+ log.info("[EJM]: No new email messages");
+ } else {
+ log.info("[EJM]: "+searchMessages.length + " new email/s received");
+ }
+ processMessages(searchMessages);
+ emailFolder.close(false);
+ }
+ } catch (MessagingException e) {
+ log.error("[EJM]: Couldn't connect to the store ", e);
+ } catch (InterruptedException e) {
+ log.error("[EJM]: Interrupt exception while sleep ", e);
+ } catch (AiravataException e) {
+ log.error("[EJM]: UnHandled arguments ", e);
+ } finally {
+ try {
+ emailFolder.close(false);
+ store.close();
+ } catch (MessagingException e) {
+ log.error("[EJM]: Store close operation failed, couldn't close store", e);
+ }
+ }
+ }
+
+    /**
+     * Parses each message and dispatches the result to the job it belongs to. Messages
+     * that were dispatched are flagged as seen; messages that could not be parsed or do
+     * not belong to a job monitored by this instance are flagged as unseen again.
+     *
+     * @param searchMessages messages returned by the folder search; null is treated as empty
+     * @throws MessagingException if updating the flags still fails after reconnecting the store
+     */
+    private void processMessages(Message[] searchMessages) throws MessagingException {
+        if (searchMessages == null) {
+            return;
+        }
+        List<Message> processedMessages = new ArrayList<>();
+        List<Message> unreadMessages = new ArrayList<>();
+        for (Message message : searchMessages) {
+            try {
+                JobStatusResult jobStatusResult = parse(message);
+                JobExecutionContext jEC = lookupMonitoredJob(jobStatusResult.getJobId());
+                if (jEC == null) {
+                    jEC = lookupMonitoredJob(jobStatusResult.getJobName());
+                }
+                if (jEC != null) {
+                    process(jobStatusResult, jEC);
+                    processedMessages.add(message);
+                } else {
+                    // we can get JobExecutionContext null in multiple Gfac instances environment,
+                    // where this job is not submitted by this Gfac instance hence we ignore this message.
+                    unreadMessages.add(message);
+                }
+            } catch (AiravataException e) {
+                log.error("[EJM]: Error parsing email message =====================================>", e);
+                try {
+                    writeEnvelopeOnError(message);
+                } catch (MessagingException e1) {
+                    log.error("[EJM]: Error printing envelop of the email", e1);
+                }
+                unreadMessages.add(message);
+            } catch (MessagingException e) {
+                log.error("[EJM]: Error while retrieving sender address from message : " + message.toString(), e);
+                unreadMessages.add(message);
+            }
+        }
+        markMessagesSeen(processedMessages, true);
+        markMessagesSeen(unreadMessages, false);
+    }
+
+    /**
+     * Looks up the monitored job registered under the given key.
+     * A null key yields null, because the concurrent map rejects null keys.
+     */
+    private JobExecutionContext lookupMonitoredJob(String monitorId) {
+        return monitorId == null ? null : jobMonitorMap.get(monitorId);
+    }
+
+    /**
+     * Sets or clears the SEEN flag on the given messages. If the update fails because the
+     * store got disconnected, the store is reconnected and the update retried once;
+     * any other failure is logged rather than silently dropped.
+     */
+    private void markMessagesSeen(List<Message> messages, boolean seen) throws MessagingException {
+        if (messages.isEmpty()) {
+            return;
+        }
+        Message[] messageArray = messages.toArray(new Message[messages.size()]);
+        try {
+            emailFolder.setFlags(messageArray, new Flags(Flags.Flag.SEEN), seen);
+        } catch (MessagingException e) {
+            if (!store.isConnected()) {
+                store.connect();
+                emailFolder.setFlags(messageArray, new Flags(Flags.Flag.SEEN), seen);
+            } else {
+                log.error("[EJM]: Couldn't mark " + messageArray.length + " email/s as " + (seen ? "seen" : "unseen"), e);
+            }
+        }
+    }
+
+ /**
+  * Applies a parsed job status to the job's execution context, publishes the change and, for
+  * COMPLETE and FAILED jobs, submits the out handler chain to the GFac thread pool.
+  * <p>
+  * Jobs that reached COMPLETE, FAILED or CANCELED are dropped from the monitor map. The context
+  * may have been looked up under the job id or under the job name, so both keys are cleared.
+  *
+  * @param jobStatusResult status parsed from the notification email
+  * @param jEC             execution context of the job the email refers to
+  */
+ private void process(JobStatusResult jobStatusResult, JobExecutionContext jEC){
+     JobState resultState = jobStatusResult.getState();
+     jEC.getJobDetails().setJobStatus(new JobStatus(resultState));
+     boolean runOutHandlers = false;
+     String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
+     // TODO - Handle all other valid JobStates
+     if (resultState == JobState.COMPLETE) {
+         forgetMonitoredJob(jobStatusResult, jEC);
+         runOutHandlers = true;
+         log.info("[EJM]: Job Complete email received , removed job from job monitoring. " + jobDetails);
+     } else if (resultState == JobState.QUEUED) {
+         // nothing special thing to do, update the status change to rabbit mq at the end of this method.
+         log.info("[EJM]: Job Queued email received, " + jobDetails);
+     } else if (resultState == JobState.ACTIVE) {
+         // nothing special thing to do, update the status change to rabbit mq at the end of this method.
+         log.info("[EJM]: Job Active email received, " + jobDetails);
+     } else if (resultState == JobState.FAILED) {
+         forgetMonitoredJob(jobStatusResult, jEC);
+         runOutHandlers = true;
+         log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
+     } else if (resultState == JobState.CANCELED) {
+         forgetMonitoredJob(jobStatusResult, jEC);
+         runOutHandlers = false; // Do we need to run out handlers in canceled case?
+         log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails);
+     }
+     log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
+     publishJobStatusChange(jEC);
+
+     if (runOutHandlers) {
+         log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
+         GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(jEC));
+     }
+ }
+
+ /**
+  * Removes a finished job from the monitor map. The job id key is always removed; the job name
+  * key is removed only while it still maps to the given context, so an unrelated job that merely
+  * shares the name is left alone.
+  */
+ private void forgetMonitoredJob(JobStatusResult jobStatusResult, JobExecutionContext jEC) {
+     if (jobStatusResult.getJobId() != null) {
+         jobMonitorMap.remove(jobStatusResult.getJobId());
+     }
+     if (jobStatusResult.getJobName() != null
+             && jobMonitorMap.get(jobStatusResult.getJobName()) == jEC) {
+         jobMonitorMap.remove(jobStatusResult.getJobName());
+     }
+ }
+
+ /**
+  * Builds a {@code JobStatusChangeRequestEvent} describing the current job state held by the
+  * given execution context and hands it to the context's monitor publisher.
+  *
+  * @param jobExecutionContext context whose job details, task, node, experiment and gateway
+  *                            identify the job
+  */
+ private void publishJobStatusChange(JobExecutionContext jobExecutionContext) {
+     JobStatusChangeRequestEvent statusEvent = new JobStatusChangeRequestEvent();
+     JobIdentifier identifier = new JobIdentifier(
+             jobExecutionContext.getJobDetails().getJobID(),
+             jobExecutionContext.getTaskData().getTaskID(),
+             jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+             jobExecutionContext.getExperimentID(),
+             jobExecutionContext.getGatewayID());
+     statusEvent.setJobIdentity(identifier);
+
+     JobState currentState = jobExecutionContext.getJobDetails().getJobStatus().getJobState();
+     statusEvent.setState(currentState);
+
+     // we have this JobStatus class to handle amqp monitoring
+     String logMessage = "[EJM]: Published job status(" + currentState.toString() + ") change request, "
+             + "experiment {} , task {}";
+     log.debugId(statusEvent.getJobIdentity().getJobId(), logMessage,
+             statusEvent.getJobIdentity().getExperimentId(),
+             statusEvent.getJobIdentity().getTaskId());
+
+     jobExecutionContext.getMonitorPublisher().publish(statusEvent);
+ }
+
+ /**
+  * Logs the envelope (senders, TO recipients and subject) of a message at error level.
+  * Parts that the message reports as {@code null} are skipped.
+  *
+  * @param m message whose envelope is written to the log
+  * @throws MessagingException if an envelope field cannot be read from the message
+  */
+ private void writeEnvelopeOnError(Message m) throws MessagingException {
+     // FROM
+     Address[] senders = m.getFrom();
+     if (senders != null) {
+         for (Address sender : senders) {
+             log.error("FROM: " + sender.toString());
+         }
+     }
+     // TO
+     Address[] recipients = m.getRecipients(Message.RecipientType.TO);
+     if (recipients != null) {
+         for (Address recipient : recipients) {
+             log.error("TO: " + recipient.toString());
+         }
+     }
+     // SUBJECT
+     String subject = m.getSubject();
+     if (subject != null) {
+         log.error("SUBJECT: " + subject);
+     }
+ }
+
+ /**
+  * Requests this monitor to stop by raising the flag that the email polling loop tests in its
+  * loop condition.
+  */
+ public void stopMonitoring() {
+ stopMonitoring = true;
+ }
+
+ /**
+  * Stores the given date as this monitor's start date.
+  *
+  * @param date value assigned to {@code monitorStartDate}
+  */
+ public void setDate(Date date) {
+ this.monitorStartDate = date;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
new file mode 100644
index 0000000..3a75331
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailMonitorFactory.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EmailMonitorFactory {
+
+ private static EmailBasedMonitor emailBasedMonitor;
+ private static Date startMonitorDate = Calendar.getInstance().getTime();
+
+ public static EmailBasedMonitor getEmailBasedMonitor(ResourceJobManagerType resourceJobManagerType) throws AiravataException {
+ if (emailBasedMonitor == null) {
+ synchronized (EmailMonitorFactory.class){
+ if (emailBasedMonitor == null) {
+ emailBasedMonitor = new EmailBasedMonitor(resourceJobManagerType);
+ emailBasedMonitor.setDate(startMonitorDate);
+ new Thread(emailBasedMonitor).start();
+ }
+ }
+ }
+ return emailBasedMonitor;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
new file mode 100644
index 0000000..1b5a027
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/LSFEmailParser.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LSFEmailParser implements EmailParser {
+ private static final Logger log = LoggerFactory.getLogger(LSFEmailParser.class);
+ //root@c312-206.ls4.tacc.utexas.edu
+ private static final String SIGNAL = "signal";
+ private static final String LONESTAR_REGEX = "Job (?<" + JOBID + ">\\d+) \\(.*\\) (?<" + STATUS
+ + ">.*)\\s[a-zA-Z =]+(?<" + EXIT_STATUS + ">\\d+)\\sSignal[ ]*=[ ]*(?<" + SIGNAL + ">[a-zA-z]*)";
+
+ @Override
+ public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
+ JobStatusResult jobStatusResult = new JobStatusResult();
+ try {
+ String content = ((String) message.getContent());
+ Pattern pattern = Pattern.compile(LONESTAR_REGEX);
+ Matcher matcher = pattern.matcher(content);
+ if (matcher.find()) {
+ jobStatusResult.setJobId(matcher.group(JOBID));
+ String status = matcher.group(STATUS);
+ jobStatusResult.setState(getJobState(status, content));
+ return jobStatusResult;
+ } else {
+ log.error("[EJM]: No matched found for content => \n" + content);
+ }
+ } catch (IOException e) {
+ throw new AiravataException("i[EJM]: Error while reading content of the email message");
+ }
+ return jobStatusResult;
+ }
+
+ private JobState getJobState(String status, String content) {
+ switch (status) {
+ case "Aborted":
+ return JobState.FAILED;
+ case "Success":
+ return JobState.COMPLETE;
+ default:
+ return JobState.UNKNOWN;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
new file mode 100644
index 0000000..4a3c88b
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/PBSEmailParser.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PBSEmailParser implements EmailParser {
+
+ private static final Logger log = LoggerFactory.getLogger(PBSEmailParser.class);
+
+
+ private static final String REGEX = "[a-zA-Z ]*:[ ]*(?<" + JOBID + ">[a-zA-Z0-9-\\.]*)\\s+[a-zA-Z ]*:[ ]*(?<"+
+ JOBNAME + ">[a-zA-Z0-9-\\.]*)\\s+.*\\s+(?<" + STATUS + ">[a-zA-Z\\ ]*)";
+ private static final String REGEX_EXIT_STATUS = "Exit_status=(?<" + EXIT_STATUS + ">[\\d]+)";
+ public static final String BEGUN_EXECUTION = "Begun execution";
+ public static final String EXECUTION_TERMINATED = "Execution terminated";
+ public static final String ABORTED_BY_PBS_SERVER = "Aborted by PBS Server";
+
+ @Override
+ public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException {
+ JobStatusResult jobStatusResult = new JobStatusResult();
+// log.info("Parsing -> " + message.getSubject());
+ try {
+ String content = ((String) message.getContent());
+ Pattern pattern = Pattern.compile(REGEX);
+ Matcher matcher = pattern.matcher(content);
+ if (matcher.find()) {
+ jobStatusResult.setJobId(matcher.group(JOBID));
+ jobStatusResult.setJobName(matcher.group(JOBNAME));
+ String statusLine = matcher.group(STATUS);
+ jobStatusResult.setState(getJobState(statusLine, content));
+ return jobStatusResult;
+ } else {
+ log.error("[EJM]: No matched found for content => \n" + content);
+ }
+
+ } catch (IOException e) {
+ throw new AiravataException("[EJM]: Error while reading content of the email message");
+ }
+ return jobStatusResult;
+ }
+
+ private JobState getJobState(String statusLine, String content) {
+ switch (statusLine) {
+ case BEGUN_EXECUTION:
+ return JobState.ACTIVE;
+ case EXECUTION_TERMINATED:
+ int exitStatus = getExitStatus(content);
+ if (exitStatus == 0) {
+ // TODO - Remove rabbitmq client script line from the script.
+ return JobState.COMPLETE;
+ } else if (exitStatus == 271) {
+ return JobState.CANCELED;
+ } else {
+ return JobState.FAILED;
+ }
+ case ABORTED_BY_PBS_SERVER:
+ return JobState.FAILED;
+ default:
+ return JobState.UNKNOWN;
+ }
+ }
+
+ private int getExitStatus(String content) {
+ Pattern pattern = Pattern.compile(REGEX_EXIT_STATUS);
+ Matcher matcher = pattern.matcher(content);
+ if (matcher.find()) {
+ String group = matcher.group(EXIT_STATUS);
+ if (group != null && !group.trim().isEmpty()) {
+ return Integer.valueOf(group.trim());
+ }
+ }
+ return -1;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
new file mode 100644
index 0000000..9dd32c0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/parser/SLURMEmailParser.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.email.parser;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.gfac.core.monitor.EmailParser;
+import org.apache.airavata.gfac.core.monitor.JobStatusResult;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parses SLURM job notification emails. Job id, job name and status are all taken from the
+ * subject line, which is expected to look like {@code SLURM Job_id=123 Name=myJob Ended, ...}.
+ */
+public class SLURMEmailParser implements EmailParser {
+
+    private static final Logger log = LoggerFactory.getLogger(SLURMEmailParser.class);
+
+    private static final String REGEX = "[A-Z]*\\s[a-zA-Z]*_[a-z]*=(?<" + JOBID + ">\\d*)[ ]*[a-zA-Z]*=(?<"+
+            JOBNAME + ">[a-zA-Z0-9-]*)[ ]*(?<" + STATUS + ">[a-zA-Z]*),.*";
+
+    public static final String BEGAN = "Began";
+    public static final String ENDED = "Ended";
+    public static final String FAILED = "Failed";
+    private static final Pattern cancelledStatePattern = Pattern.compile("CANCELLED");
+    private static final Pattern pattern = Pattern.compile(REGEX);
+
+    /**
+     * Extracts job id, job name and job state from the subject of a SLURM notification email.
+     *
+     * @param message email to parse
+     * @return the parsed result; when the subject does not match the expected layout an error is
+     *         logged and a result without id, name and state is returned
+     * @throws MessagingException if the subject cannot be read from the message
+     * @throws AiravataException  if the message has no subject
+     */
+    @Override
+    public JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException{
+        JobStatusResult jobStatusResult = new JobStatusResult();
+        String subject = message.getSubject();
+        if (subject == null) {
+            // Pattern.matcher(null) would throw an unchecked NullPointerException.
+            throw new AiravataException("[EJM]: Email message has no subject, can't parse job status");
+        }
+        Matcher matcher = pattern.matcher(subject);
+        if (matcher.find()) {
+            jobStatusResult.setJobId(matcher.group(JOBID));
+            jobStatusResult.setJobName(matcher.group(JOBNAME));
+            jobStatusResult.setState(getJobState(matcher.group(STATUS), subject));
+        } else {
+            log.error("[EJM]: No matched found for subject -> " + subject);
+        }
+        return jobStatusResult;
+    }
+
+    /**
+     * Maps the status word of the subject to a job state. "Ended" means CANCELED when the subject
+     * also contains "CANCELLED" and COMPLETE otherwise; unrecognised words map to UNKNOWN.
+     */
+    private JobState getJobState(String state, String subject) {
+        switch (state.trim()) {
+            case BEGAN:
+                return JobState.ACTIVE;
+            case ENDED:
+                if (cancelledStatePattern.matcher(subject).find()) {
+                    return JobState.CANCELED;
+                }
+                return JobState.COMPLETE;
+            case FAILED:
+                return JobState.FAILED;
+            default:
+                log.error("[EJM]: Job State " + state + " isn't handle by SLURM parser");
+                return JobState.UNKNOWN;
+        }
+    }
+
+}