You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/01 20:29:22 UTC
[4/9] Separating gfac-monitoring implementation
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
new file mode 100644
index 0000000..a64b484
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.monitor.JobIdentity;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This monitor can be used to monitor a job which runs locally,
+ * Since its a local job job doesn't have states, once it get executed
+ * then the job starts running
+ */
+public class LocalJobMonitor extends AiravataAbstractMonitor {
+ // Though we have a qeuue here, it not going to be used in local jobs
+ BlockingQueue<MonitorID> jobQueue;
+
+ public void run() {
+ do {
+ try {
+ MonitorID take = jobQueue.take();
+ getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } while (!ServerSettings.isStopAllThreads());
+ }
+
+ public BlockingQueue<MonitorID> getJobQueue() {
+ return jobQueue;
+ }
+
+ public void setJobQueue(BlockingQueue<MonitorID> jobQueue) {
+ this.jobQueue = jobQueue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
new file mode 100644
index 0000000..edd6ce0
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -0,0 +1,284 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.core.PullMonitor;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * This monitor is based on qstat command which can be run
+ * in grid resources and retrieve the job status.
+ */
+public class HPCPullMonitor extends PullMonitor {
+ private final static Logger logger = LoggerFactory.getLogger(HPCPullMonitor.class);
+
+ // I think this should use DelayedBlocking Queue to do the monitoring*/
+ private BlockingQueue<UserMonitorData> queue;
+
+ private boolean startPulling = false;
+
+ private Map<String, ResourceConnection> connections;
+
+ private MonitorPublisher publisher;
+
+ public HPCPullMonitor(){
+ connections = new HashMap<String, ResourceConnection>();
+ this.queue = new LinkedBlockingDeque<UserMonitorData>();
+ publisher = new MonitorPublisher(new EventBus());
+ }
+ public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
+ this.queue = queue;
+ this.publisher = publisher;
+ connections = new HashMap<String, ResourceConnection>();
+ }
+
+
+
+ public void run() {
+ /* implement a logic to pick each monitorID object from the queue and do the
+ monitoring
+ */
+ this.startPulling = true;
+ while (this.startPulling && !ServerSettings.isStopAllThreads()) {
+ try {
+ startPulling();
+ // After finishing one iteration of the full queue this thread sleeps 1 second
+ Thread.sleep(10000);
+ } catch (Exception e){
+ // we catch all the exceptions here because no matter what happens we do not stop running this
+ // thread, but ideally we should report proper error messages, but this is handled in startPulling
+ // method, incase something happen in Thread.sleep we handle it with this catch block.
+ e.printStackTrace();
+ logger.error(e.getMessage());
+ }
+ }
+ // thread is going to return so we close all the connections
+ Iterator<String> iterator = connections.keySet().iterator();
+ while(iterator.hasNext()){
+ String next = iterator.next();
+ ResourceConnection resourceConnection = connections.get(next);
+ try {
+ resourceConnection.getCluster().disconnect();
+ } catch (SSHApiException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ /**
+ * This method will can invoke when PullMonitor needs to start
+ * and it has to invoke in the frequency specified below,
+ *
+ * @return if the start process is successful return true else false
+ */
+ public boolean startPulling() throws AiravataMonitorException {
+ // take the top element in the queue and pull the data and put that element
+ // at the tail of the queue
+ //todo this polling will not work with multiple usernames but with single user
+ // and multiple hosts, currently monitoring will work
+ UserMonitorData take = null;
+ JobStatusChangeRequest jobStatus = new JobStatusChangeRequest();
+ MonitorID currentMonitorID = null;
+ HostDescription currentHostDescription = null;
+ try {
+ take = this.queue.take();
+ List<MonitorID> completedJobs = new ArrayList<MonitorID>();
+ List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
+ for (HostMonitorData iHostMonitorData : hostMonitorData) {
+ if (iHostMonitorData.getHost().getType() instanceof GsisshHostType) {
+ currentHostDescription = iHostMonitorData.getHost();
+ GsisshHostType gsisshHostType = (GsisshHostType) iHostMonitorData.getHost().getType();
+ String hostName = gsisshHostType.getHostAddress();
+ ResourceConnection connection = null;
+ if (connections.containsKey(hostName)) {
+ logger.debug("We already have this connection so not going to create one");
+ connection = connections.get(hostName);
+ } else {
+ connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath());
+ connections.put(hostName, connection);
+ }
+ List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+ Map<String, JobState> jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID);
+ for (MonitorID iMonitorID : monitorID) {
+ currentMonitorID = iMonitorID;
+ iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()));
+ jobStatus = new JobStatusChangeRequest(iMonitorID);
+ // we have this JobStatus class to handle amqp monitoring
+
+ publisher.publish(jobStatus);
+ // if the job is completed we do not have to put the job to the queue again
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+
+ // After successful monitoring perform following actions to cleanup the queue, if necessary
+ if (jobStatus.getState().equals(JobState.COMPLETE)) {
+ completedJobs.add(iMonitorID);
+ } else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) {
+ logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor");
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ completedJobs.add(iMonitorID);
+ } else {
+ // Evey
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ // if the job is complete we remove it from the Map, if any of these maps
+ // get empty this userMonitorData will get delete from the queue
+ }
+ }
+ } else {
+ logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
+ }
+ }
+ // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
+ // now the userMonitorData goes back to the tail of the queue
+ queue.put(take);
+ // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if
+ // they become empty
+ for(MonitorID completedJob:completedJobs){
+ CommonUtils.removeMonitorFromQueue(queue, completedJob);
+ }
+ } catch (InterruptedException e) {
+ if (!this.queue.contains(take)) {
+ try {
+ this.queue.put(take);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
+ throw new AiravataMonitorException(e);
+ } catch (SSHApiException e) {
+ logger.error(e.getMessage());
+ if (e.getMessage().contains("Unknown Job Id Error")) {
+ // in this case job is finished or may be the given job ID is wrong
+ jobStatus.setState(JobState.UNKNOWN);
+ publisher.publish(jobStatus);
+ } else if (e.getMessage().contains("illegally formed job identifier")) {
+ logger.error("Wrong job ID is given so dropping the job from monitoring system");
+ } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown
+ if (currentMonitorID == null) {
+ logger.error("Monitoring the jobs failed, for user: " + take.getUserName()
+ + " in Host: " + currentHostDescription.getType().getHostAddress());
+ } else {
+ if (currentMonitorID != null) {
+ if (currentMonitorID.getFailedCount() < 2) {
+ try {
+ currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
+ this.queue.put(take);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ } else {
+ logger.error(e.getMessage());
+ logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
+ }
+ }
+ }
+ }
+ throw new AiravataMonitorException("Error retrieving the job status", e);
+ } catch (Exception e) {
+ if (currentMonitorID != null) {
+ if (currentMonitorID.getFailedCount() < 3) {
+ try {
+ currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
+ this.queue.put(take);
+ // if we get a wrong status we wait for a while and request again
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ } else {
+ logger.error(e.getMessage());
+ logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
+ }
+ }
+ throw new AiravataMonitorException("Error retrieving the job status", e);
+ }
+
+
+ return true;
+ }
+
+
+ /**
+ * This is the method to stop the polling process
+ *
+ * @return if the stopping process is successful return true else false
+ */
+ public boolean stopPulling() {
+ this.startPulling = false;
+ return true;
+ }
+
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+
+ public BlockingQueue<UserMonitorData> getQueue() {
+ return queue;
+ }
+
+ public void setQueue(BlockingQueue<UserMonitorData> queue) {
+ this.queue = queue;
+ }
+
+ public boolean authenticate() {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, ResourceConnection> getConnections() {
+ return connections;
+ }
+
+ public boolean isStartPulling() {
+ return startPulling;
+ }
+
+ public void setConnections(Map<String, ResourceConnection> connections) {
+ this.connections = connections;
+ }
+
+ public void setStartPulling(boolean startPulling) {
+ this.startPulling = startPulling;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
new file mode 100644
index 0000000..7a37b88
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public class ResourceConnection {
+ private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
+
+ private PBSCluster cluster;
+
+ public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException {
+ AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo();
+ String hostAddress = monitorID.getHost().getType().getHostAddress();
+ String userName = monitorID.getUserName();
+ String jobManager = ((GsisshHostType)monitorID.getHost().getType()).getJobManager();
+ JobManagerConfiguration jConfig = null;
+ if (jobManager == null) {
+ log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = CommonUtils.getPBSJobManager(installedPath);
+ } else {
+ if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(monitorID.getHost())) {
+ jConfig = CommonUtils.getPBSJobManager(installedPath);
+ } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(monitorID.getHost())) {
+ jConfig = CommonUtils.getSLURMJobManager(installedPath);
+ } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(monitorID.getHost())) {
+ jConfig = CommonUtils.getSGEJobManager(installedPath);
+ }
+ //todo support br2 etc
+ }
+ ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)monitorID.getHost().getType()).getPort());
+ cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+ }
+
+ public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException {
+ AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo();
+ String hostAddress = hostMonitorData.getHost().getType().getHostAddress();
+ String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager();
+ JobManagerConfiguration jConfig = null;
+ if (jobManager == null) {
+ log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = CommonUtils.getPBSJobManager(installedPath);
+ } else {
+ if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) {
+ jConfig = CommonUtils.getPBSJobManager(installedPath);
+ } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) {
+ jConfig = CommonUtils.getSLURMJobManager(installedPath);
+ }else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(hostMonitorData.getHost())) {
+ jConfig = CommonUtils.getSGEJobManager(installedPath);
+ }
+ //todo support br2 etc
+ }
+ ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort());
+ cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+ }
+ public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
+ String jobID = monitorID.getJobID();
+ //todo so currently we execute the qstat for each job but we can use user based monitoring
+ //todo or we should concatenate all the commands and execute them in one go and parse the response
+ return getStatusFromString(cluster.getJobStatus(jobID).toString());
+ }
+
+ public Map<String,JobState> getJobStatuses(String userName,List<MonitorID> monitorIDs) throws SSHApiException {
+ Map<String,JobStatus> treeMap = new TreeMap<String,JobStatus>();
+ Map<String,JobState> treeMap1 = new TreeMap<String,JobState>();
+ // creating a sorted map with all the jobIds and with the predefined
+ // status as UNKNOWN
+ for (MonitorID monitorID : monitorIDs) {
+ treeMap.put(monitorID.getJobID(), JobStatus.U);
+ }
+ //todo so currently we execute the qstat for each job but we can use user based monitoring
+ //todo or we should concatenate all the commands and execute them in one go and parse the response
+ cluster.getJobStatuses(userName,treeMap);
+ for(String key:treeMap.keySet()){
+ treeMap1.put(key,getStatusFromString(treeMap.get(key).toString()));
+ }
+ return treeMap1;
+ }
+ private JobState getStatusFromString(String status) {
+ log.info("parsing the job status returned : " + status);
+ if(status != null){
+ if("C".equals(status) || "CD".equals(status)|| "E".equals(status) || "CG".equals(status)){
+ return JobState.COMPLETE;
+ }else if("H".equals(status) || "h".equals(status)){
+ return JobState.HELD;
+ }else if("Q".equals(status) || "qw".equals(status)){
+ return JobState.QUEUED;
+ }else if("R".equals(status) || "CF".equals(status) || "r".equals(status)){
+ return JobState.ACTIVE;
+ }else if ("T".equals(status)) {
+ return JobState.HELD;
+ } else if ("W".equals(status) || "PD".equals(status)) {
+ return JobState.QUEUED;
+ } else if ("S".equals(status)) {
+ return JobState.SUSPENDED;
+ }else if("CA".equals(status)){
+ return JobState.CANCELED;
+ }else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status)) {
+ return JobState.FAILED;
+ }else if ("PR".equals(status) || "Er".equals(status)) {
+ return JobState.FAILED;
+ }else if ("U".equals(status)){
+ return JobState.UNKNOWN;
+ }
+ }
+ return JobState.UNKNOWN;
+ }
+
+ public PBSCluster getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(PBSCluster cluster) {
+ this.cluster = cluster;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
new file mode 100644
index 0000000..fbf6e21
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.JobIdentity;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.PushMonitor;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This is the implementation for AMQP based finishQueue, this uses
+ * rabbitmq client to recieve AMQP based monitoring data from
+ * mostly excede resources.
+ */
+public class AMQPMonitor extends PushMonitor {
+ private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
+
+
+ /* this will keep all the channels available in the system, we do not create
+ channels for all the jobs submitted, but we create channels for each user for each
+ host.
+ */
+ private Map<String, Channel> availableChannels;
+
+ private MonitorPublisher publisher;
+
+ private MonitorPublisher localPublisher;
+
+ private BlockingQueue<MonitorID> runningQueue;
+
+ private BlockingQueue<MonitorID> finishQueue;
+
+ private String connectionName;
+
+ private String proxyPath;
+
+ private List<String> amqpHosts;
+
+ private boolean startRegister;
+
+ public AMQPMonitor(){
+
+ }
+ public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
+ BlockingQueue<MonitorID> finishQueue,
+ String proxyPath,String connectionName,List<String> hosts) {
+ this.publisher = publisher;
+ this.runningQueue = runningQueue; // these will be initialized by the MonitorManager
+ this.finishQueue = finishQueue; // these will be initialized by the MonitorManager
+ this.availableChannels = new HashMap<String, Channel>();
+ this.connectionName = connectionName;
+ this.proxyPath = proxyPath;
+ this.amqpHosts = hosts;
+ this.localPublisher = new MonitorPublisher(new EventBus());
+ this.localPublisher.registerListener(this);
+ }
+
+ public void initialize(String proxyPath, String connectionName, List<String> hosts) {
+ this.availableChannels = new HashMap<String, Channel>();
+ this.connectionName = connectionName;
+ this.proxyPath = proxyPath;
+ this.amqpHosts = hosts;
+ this.localPublisher = new MonitorPublisher(new EventBus());
+ this.localPublisher.registerListener(this);
+ }
+
+ @Override
+ public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
+ // we subscribe to read user-host based subscription
+ HostDescription host = monitorID.getHost();
+ String hostAddress = host.getType().getHostAddress();
+ // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+ // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
+ String channelID = CommonUtils.getChannelID(monitorID);
+ if(availableChannels.get(channelID) == null){
+ try {
+ //todo need to fix this rather getting it from a file
+ Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+ Channel channel = null;
+ channel = connection.createChannel();
+ availableChannels.put(channelID, channel);
+ String queueName = channel.queueDeclare().getQueue();
+
+ BasicConsumer consumer = new
+ BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher
+ channel.basicConsume(queueName, true, consumer);
+ String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
+ // here we queuebind to a particular user in a particular machine
+ channel.queueBind(queueName, "glue2.computing_activity", filterString);
+ logger.info("Using filtering string to monitor: " + filterString);
+ } catch (IOException e) {
+ logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
+ }
+ }
+ return true;
+ }
+
+ public void run() {
+ // before going to the while true mode we start unregister thread
+ startRegister = true; // this will be unset by someone else
+ while (startRegister || !ServerSettings.isStopAllThreads()) {
+ try {
+ MonitorID take = runningQueue.take();
+ this.registerListener(take);
+ } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+ Set<String> strings = availableChannels.keySet();
+ for(String key:strings) {
+ Channel channel = availableChannels.get(key);
+ try {
+ channel.close();
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ @Subscribe
+ public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
+ Iterator<MonitorID> iterator = finishQueue.iterator();
+ MonitorID next = null;
+ while(iterator.hasNext()){
+ next = iterator.next();
+ if(next.getJobID().endsWith(monitorID.getJobID())){
+ break;
+ }
+ }
+ if(next == null) {
+ logger.error("Job has removed from the queue, old obsolete message recieved");
+ return false;
+ }
+ String channelID = CommonUtils.getChannelID(next);
+ if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
+ finishQueue.remove(next);
+
+ // if this is the last job in the queue at this point with the same username and same host we
+ // close the channel and close the connection and remove it from availableChannels
+ if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
+ logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
+ ", incase new job created we do subscribe again");
+ Channel channel = availableChannels.get(channelID);
+ if (channel == null) {
+ logger.error("Already Unregistered the listener");
+ throw new AiravataMonitorException("Already Unregistered the listener");
+ } else {
+ try {
+ channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
+ channel.close();
+ channel.getConnection().close();
+ availableChannels.remove(channelID);
+ } catch (IOException e) {
+ logger.error("Error unregistering the listener");
+ throw new AiravataMonitorException("Error unregistering the listener");
+ }
+ }
+ }
+ }
+ next.setStatus(monitorID.getStatus());
+ publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus()));
+ return true;
+ }
+ @Override
+ public boolean stopRegister() throws AiravataMonitorException {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, Channel> getAvailableChannels() {
+ return availableChannels;
+ }
+
+ public void setAvailableChannels(Map<String, Channel> availableChannels) {
+ this.availableChannels = availableChannels;
+ }
+
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+
+ public BlockingQueue<MonitorID> getRunningQueue() {
+ return runningQueue;
+ }
+
+ public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
+ this.runningQueue = runningQueue;
+ }
+
+ public BlockingQueue<MonitorID> getFinishQueue() {
+ return finishQueue;
+ }
+
+ public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
+ this.finishQueue = finishQueue;
+ }
+
+ public String getProxyPath() {
+ return proxyPath;
+ }
+
+ public void setProxyPath(String proxyPath) {
+ this.proxyPath = proxyPath;
+ }
+
+ public List<String> getAmqpHosts() {
+ return amqpHosts;
+ }
+
+ public void setAmqpHosts(List<String> amqpHosts) {
+ this.amqpHosts = amqpHosts;
+ }
+
+ public boolean isStartRegister() {
+ return startRegister;
+ }
+
+ public void setStartRegister(boolean startRegister) {
+ this.startRegister = startRegister;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
new file mode 100644
index 0000000..1d60c45
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.MessageParser;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BasicConsumer implements Consumer {
+ private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
+
+ private MessageParser parser;
+
+ private MonitorPublisher publisher;
+
+ public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
+ this.parser = parser;
+ this.publisher = publisher;
+ }
+
+ public void handleCancel(String consumerTag) {
+ }
+
+ public void handleCancelOk(String consumerTag) {
+ }
+
+ public void handleConsumeOk(String consumerTag) {
+ }
+
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+
+ logger.debug("job update for: " + envelope.getRoutingKey());
+ String message = new String(body);
+ message = message.replaceAll("(?m)^", " ");
+ // Here we parse the message and get the job status and push it
+ // to the Event bus, this will be picked by
+// AiravataJobStatusUpdator and store in to registry
+
+ logger.debug("************************************************************");
+ logger.debug("AMQP Message recieved \n" + message);
+ logger.debug("************************************************************");
+ try {
+ String jobID = envelope.getRoutingKey().split("\\.")[0];
+ MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null);
+ monitorID.setStatus(parser.parseMessage(message));
+ publisher.publish(monitorID);
+ } catch (AiravataMonitorException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void handleRecoverOk(String consumerTag) {
+ }
+
+ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
new file mode 100644
index 0000000..72c77d5
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.airavata.ComputingActivity;
+import org.apache.airavata.gfac.monitor.core.MessageParser;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class JSONMessageParser implements MessageParser {
+ private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
+
+ public JobState parseMessage(String message)throws AiravataMonitorException {
+ /*todo write a json message parser here*/
+ logger.debug(message);
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class);
+ logger.info(computingActivity.getIDFromEndpoint());
+ List<String> stateList = computingActivity.getState();
+ JobState jobState = null;
+ for (String aState : stateList) {
+ jobState = getStatusFromString(aState);
+ }
+ // we get the last value of the state array
+ return jobState;
+ } catch (IOException e) {
+ throw new AiravataMonitorException(e);
+ }
+ }
+
+private JobState getStatusFromString(String status) {
+ logger.info("parsing the job status returned : " + status);
+ if(status != null){
+ if("ipf:finished".equals(status)){
+ return JobState.COMPLETE;
+ }else if("ipf:pending".equals(status)|| "ipf:starting".equals(status)){
+ return JobState.QUEUED;
+ }else if("ipf:running".equals(status) || "ipf:finishing".equals(status)){
+ return JobState.ACTIVE;
+ }else if ("ipf:held".equals(status) || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) {
+ return JobState.HELD;
+ } else if ("ipf:suspending".equals(status)) {
+ return JobState.SUSPENDED;
+ }else if ("ipf:failed".equals(status)) {
+ return JobState.FAILED;
+ }else if ("ipf:unknown".equals(status)){
+ return JobState.UNKNOWN;
+ }
+ }
+ return JobState.UNKNOWN;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
new file mode 100644
index 0000000..c6e1378
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class UnRegisterWorker{
+ private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
+ private Map<String, Channel> availableChannels;
+
+ public UnRegisterWorker(Map<String, Channel> channels) {
+ this.availableChannels = channels;
+ }
+
+ @Subscribe
+ private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException {
+ MonitorID monitorID = jobStatus.getMonitorID();
+ String channelID = CommonUtils.getChannelID(monitorID);
+ if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
+ Channel channel = availableChannels.get(channelID);
+ if (channel == null) {
+ logger.error("Already Unregistered the listener");
+ throw new AiravataMonitorException("Already Unregistered the listener");
+ } else {
+ try {
+ channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+ channel.close();
+ channel.getConnection().close();
+ availableChannels.remove(channelID);
+ } catch (IOException e) {
+ logger.error("Error unregistering the listener");
+ throw new AiravataMonitorException("Error unregistering the listener");
+ }
+ }
+ }
+ return true;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java
new file mode 100644
index 0000000..10048b0
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.state;
+
+
+public abstract class AbstractStateChangeRequest implements PublisherMessage {
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java
new file mode 100644
index 0000000..eecf88d
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.ExperimentIdentity;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest {
+ private ExperimentState state;
+ private ExperimentIdentity identity;
+
+ // this constructor can be used in Qstat monitor to handle errors
+ public ExperimentStatusChangeRequest() {
+ }
+
+ public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) {
+ this.state = state;
+ setIdentity(experimentIdentity);
+ }
+
+ public ExperimentState getState() {
+ return state;
+ }
+
+ public void setState(ExperimentState state) {
+ this.state = state;
+ }
+
+ public ExperimentIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(ExperimentIdentity identity) {
+ this.identity = identity;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
new file mode 100644
index 0000000..da52656
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.JobIdentity;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class JobStatusChangeRequest extends AbstractStateChangeRequest {
+ private JobState state;
+ private JobIdentity identity;
+
+ private MonitorID monitorID;
+
+ // this constructor can be used in Qstat monitor to handle errors
+ public JobStatusChangeRequest() {
+ }
+
+ public JobStatusChangeRequest(MonitorID monitorID) {
+ setIdentity(new JobIdentity(monitorID.getExperimentID(),monitorID.getWorkflowNodeID(),
+ monitorID.getTaskID(),monitorID.getJobID()));
+ setMonitorID(monitorID);
+ this.state = monitorID.getStatus();
+ }
+ public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) {
+ setIdentity(jobId);
+ setMonitorID(monitorID);
+ this.state = state;
+ }
+
+ public JobState getState() {
+ return state;
+ }
+
+ public void setState(JobState state) {
+ this.state = state;
+ }
+
+ public JobIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(JobIdentity identity) {
+ this.identity = identity;
+ }
+
+ public MonitorID getMonitorID() {
+ return monitorID;
+ }
+
+ public void setMonitorID(MonitorID monitorID) {
+ this.monitorID = monitorID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java
new file mode 100644
index 0000000..9a59b50
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+/**
+ * Based on the job status monitoring we can gather
+ * different informaation about the job, its not simply
+ * the job status, so we need a way to implement
+ * different job statusinfo object to keep job status
+ */
+public interface JobStatusInfo {
+
+ /**
+ * This method can be used to get JobStatusInfo data and
+ * decide the finalJobState
+ *
+ * @param jobState
+ */
+ void setJobStatus(JobStatus jobState);
+
+ /**
+ * After setting the jobState by processing jobinformation
+ * this method can be used to get the JobStatus
+ * @return
+ */
+ JobStatus getJobStatus();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java
new file mode 100644
index 0000000..cbfcb5a
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.state;
+
+public interface PublisherMessage {
+// public String getType();
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java
new file mode 100644
index 0000000..af20707
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.TaskIdentity;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class TaskStatusChangeRequest extends AbstractStateChangeRequest {
+ private TaskState state;
+ private TaskIdentity identity;
+ // this constructor can be used in Qstat monitor to handle errors
+ public TaskStatusChangeRequest() {
+ }
+
+ public TaskStatusChangeRequest(TaskIdentity taskIdentity, TaskState state) {
+ this.state = state;
+ setIdentity(taskIdentity);
+ }
+
+ public TaskState getState() {
+ return state;
+ }
+
+ public void setState(TaskState state) {
+ this.state = state;
+ }
+
+ public TaskIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(TaskIdentity identity) {
+ this.identity = identity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java
new file mode 100644
index 0000000..632f2e3
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.WorkflowNodeIdentity;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest {
+ private WorkflowNodeState state;
+ private WorkflowNodeIdentity identity;
+
+ // this constructor can be used in Qstat monitor to handle errors
+ public WorkflowNodeStatusChangeRequest() {
+ }
+
+ public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) {
+ this.state = state;
+ setIdentity(identity);
+ }
+
+ public WorkflowNodeState getState() {
+ return state;
+ }
+
+ public void setState(WorkflowNodeState state) {
+ this.state = state;
+ }
+
+ public WorkflowNodeIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(WorkflowNodeIdentity identity) {
+ this.identity = identity;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java
new file mode 100644
index 0000000..19b051a
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state.impl;
+
+import org.apache.airavata.gfac.monitor.state.JobStatusInfo;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+/**
+ * This can be used to store job status information about
+ * amazon jobs, this data could be very different from
+ * a typical grid job
+ */
+public class AmazonJobStatusInfo implements JobStatusInfo {
+ public void setJobStatus(JobStatus jobState) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public JobStatus getJobStatus() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java
new file mode 100644
index 0000000..4612c3c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state.impl;
+
+import org.apache.airavata.gfac.monitor.state.JobStatusInfo;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+
+/**
+ * This can be used to keep information about a Grid job
+ * which we can get from qstat polling or from amqp based
+ * monitoring in Grid machines
+ */
+public class GridJobStatusInfo implements JobStatusInfo {
+ public void setJobStatus(JobStatus jobState) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public JobStatus getJobStatus() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
new file mode 100644
index 0000000..b69cf52
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultSaslConfig;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+
+public class AMQPConnectionUtil {
+ public static Connection connect(List<String>hosts,String vhost, String proxyFile) {
+ Collections.shuffle(hosts);
+ for (String host : hosts) {
+ Connection connection = connect(host, vhost, proxyFile);
+ if (host != null) {
+ System.out.println("connected to " + host);
+ return connection;
+ }
+ }
+ return null;
+ }
+
+ public static Connection connect(String host, String vhost, String proxyFile) {
+ Connection connection;
+ try {
+ String keyPassPhrase = "test123";
+ KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ kmf.init(ks, keyPassPhrase.toCharArray());
+
+ KeyStore tks = X509Helper.trustKeyStoreFromCertDir();
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+ tmf.init(tks);
+
+ SSLContext c = SSLContext.getInstance("SSLv3");
+ c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(host);
+ factory.setPort(5671);
+ factory.useSslProtocol(c);
+ factory.setVirtualHost(vhost);
+ factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
+
+ connection = factory.newConnection();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ return connection;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
new file mode 100644
index 0000000..30f1ae4
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+public class CommonUtils {
+ private final static Logger logger = LoggerFactory.getLogger(CommonUtils.class);
+
+ public static boolean isPBSHost(HostDescription host){
+ if("pbs".equals(((GsisshHostType)host.getType()).getJobManager()) ||
+ "".equals(((GsisshHostType)host.getType()).getJobManager())){
+ return true;
+ }else{
+ // default is pbs so we return true
+ return false;
+ }
+ }
+ public static boolean isSlurm(HostDescription host){
+ if("slurm".equals(((GsisshHostType)host.getType()).getJobManager())){
+ return true;
+ }else{
+ // default is pbs so we return true
+ return false;
+ }
+ }
+ public static boolean isSGE(HostDescription host){
+ if("sge".equals(((GsisshHostType)host.getType()).getJobManager())){
+ return true;
+ }else{
+ // default is pbs so we return true
+ return false;
+ }
+ }
+ public static String getChannelID(MonitorID monitorID) {
+ return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName();
+ }
+
+ public static String getRoutingKey(MonitorID monitorID) {
+ return "*." + monitorID.getUserName() + "." + monitorID.getHost().getType().getHostAddress();
+ }
+
+ public static String getChannelID(String userName,String hostAddress) {
+ return userName + "-" + hostAddress;
+ }
+
+ public static String getRoutingKey(String userName,String hostAddress) {
+ return "*." + userName + "." + hostAddress;
+ }
+
+ public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
+ Iterator<UserMonitorData> iterator = queue.iterator();
+ while (iterator.hasNext()) {
+ UserMonitorData next = iterator.next();
+ if (next.getUserName().equals(monitorID.getUserName())) {
+ // then this is the right place to update
+ List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+ for (HostMonitorData host : monitorIDs) {
+ if (host.getHost().equals(monitorID.getHost())) {
+ // ok we found right place to add this monitorID
+ host.addMonitorIDForHost(monitorID);
+ return;
+ }
+ }
+ // there is a userMonitor object for this user name but no Hosts for this host
+ // so we have to create new Hosts
+ HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+ hostMonitorData.addMonitorIDForHost(monitorID);
+ next.addHostMonitorData(hostMonitorData);
+ return;
+ }
+ }
+ HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+ hostMonitorData.addMonitorIDForHost(monitorID);
+
+ UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+ userMonitorData.addHostMonitorData(hostMonitorData);
+ try {
+ queue.put(userMonitorData);
+ } catch (InterruptedException e) {
+ throw new AiravataMonitorException(e);
+ }
+ }
+ public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){
+ Iterator<MonitorID> iterator = queue.iterator();
+ while(iterator.hasNext()){
+ MonitorID next = iterator.next();
+ if(monitorID.getUserName().equals(next.getUserName()) && CommonUtils.isEqual(monitorID.getHost(), next.getHost())){
+ return false;
+ }
+ }
+ return true;
+ }
+ public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
+ Iterator<UserMonitorData> iterator = queue.iterator();
+ while(iterator.hasNext()){
+ UserMonitorData next = iterator.next();
+ if(next.getUserName().equals(monitorID.getUserName())){
+ // then this is the right place to update
+ List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
+ for(HostMonitorData iHostMonitorID:hostMonitorData){
+ if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
+ List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
+ for(MonitorID iMonitorID:monitorIDs){
+ if(iMonitorID.getJobID().equals(monitorID.getJobID())) {
+ // OK we found the object, we cannot do list.remove(object) states of two objects
+ // could be different, thats why we check the jobID
+ monitorIDs.remove(iMonitorID);
+ if(monitorIDs.size()==0) {
+ hostMonitorData.remove(iHostMonitorID);
+ if (hostMonitorData.size() == 0) {
+ // no useful data so we have to remove the element from the queue
+ queue.remove(next);
+ }
+ }
+ return;
+ }
+ }
+ }
+ }
+ }
+ }
+ throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
+ monitorID.getUserName() + " and jobID " + monitorID.getJobID());
+
+ }
+
+ public static boolean isEqual(HostDescription host1,HostDescription host2) {
+ if ((host1.getType() instanceof GsisshHostType) && (host2.getType() instanceof GsisshHostType)) {
+ GsisshHostType hostType1 = (GsisshHostType)host1.getType();
+ GsisshHostType hostType2 = (GsisshHostType)host2.getType();
+ if(hostType1.getHostAddress().equals(hostType2.getHostAddress())
+ && hostType1.getJobManager().equals(hostType2.getJobManager())
+ && (hostType1.getPort() == hostType2.getPort())
+ && hostType1.getMonitorMode().equals(hostType2.getMonitorMode())){
+ return true;
+ }
+ } else {
+ logger.error("This method is only impmlemented to handle Gsissh host types");
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
new file mode 100644
index 0000000..c29490a
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.PEMReader;
+
+import java.io.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.security.spec.InvalidKeySpecException;
+
+public class X509Helper {
+
+ static {
+ // parsing of RSA key fails without this
+ java.security.Security.addProvider(new BouncyCastleProvider());
+ }
+
+
+
+ public static KeyStore keyStoreFromPEM(String proxyFile,
+ String keyPassPhrase) throws IOException,
+ CertificateException,
+ NoSuchAlgorithmException,
+ InvalidKeySpecException,
+ KeyStoreException {
+ return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase);
+ }
+
+ public static KeyStore keyStoreFromPEM(String certFile,
+ String keyFile,
+ String keyPassPhrase) throws IOException,
+ CertificateException,
+ NoSuchAlgorithmException,
+ InvalidKeySpecException,
+ KeyStoreException {
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile));
+ //System.out.println(cert.toString());
+
+ // this works for proxy files, too, since it skips over the certificate
+ BufferedReader reader = new BufferedReader(new FileReader(keyFile));
+ String line = null;
+ StringBuilder builder = new StringBuilder();
+ boolean inKey = false;
+ while((line=reader.readLine()) != null) {
+ if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) {
+ inKey = true;
+ }
+ if (inKey) {
+ builder.append(line);
+ builder.append(System.getProperty("line.separator"));
+ }
+ if (line.contains("-----END RSA PRIVATE KEY-----")) {
+ inKey = false;
+ }
+ }
+ String privKeyPEM = builder.toString();
+ //System.out.println(privKeyPEM);
+
+ // using BouncyCastle
+ PEMReader pemParser = new PEMReader(new StringReader(privKeyPEM));
+ Object object = pemParser.readObject();
+
+ PrivateKey privKey = null;
+ if(object instanceof KeyPair){
+ privKey = ((KeyPair)object).getPrivate();
+ }
+ // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency
+ /*
+ // Base64 decode the data
+ byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM);
+
+ // PKCS8 decode the encoded RSA private key
+ java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
+ KeyFactory kf = KeyFactory.getInstance("RSA");
+ PrivateKey privKey = kf.generatePrivate(keySpec);
+ //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec);
+ */
+ //System.out.println(privKey.toString());
+
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+ keyStore.load(null,null);
+
+ KeyStore.PrivateKeyEntry entry =
+ new KeyStore.PrivateKeyEntry(privKey,
+ new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert});
+ KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray());
+ keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot);
+
+ return keyStore;
+ }
+
+
+ public static KeyStore trustKeyStoreFromCertDir() throws IOException,
+ KeyStoreException,
+ CertificateException,
+ NoSuchAlgorithmException, ApplicationSettingsException {
+ return trustKeyStoreFromCertDir(ServerSettings.getSetting("trusted.cert.location"));
+ }
+
+ public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException,
+ KeyStoreException,
+ CertificateException,
+ NoSuchAlgorithmException {
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(null,null);
+
+ File dir = new File(certDir);
+ for(File file : dir.listFiles()) {
+ if (!file.isFile()) {
+ continue;
+ }
+ if (!file.getName().endsWith(".0")) {
+ continue;
+ }
+
+ try {
+ //System.out.println("reading file "+file.getName());
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file));
+ //System.out.println(cert.toString());
+
+ KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert);
+
+ ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null);
+ } catch (KeyStoreException e) {
+ } catch (CertificateParsingException e) {
+ continue;
+ }
+
+ }
+
+ return ks;
+ }
+}
+