You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/04/08 18:19:10 UTC
[13/15] airavata git commit: Add new gfac email monitor module to
gfac monitor module and Intorduced gfac-hpc-monitor module to keep all
hoc-monitor code
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
new file mode 100644
index 0000000..022d17c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is the datastructure to keep the user centric job data, rather keeping
+ * the individual jobs we keep the jobs based on the each user
+ */
+public class UserMonitorData {
+ private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+ private String userName;
+
+ private List<HostMonitorData> hostMonitorData;
+
+
+ public UserMonitorData(String userName) {
+ this.userName = userName;
+ hostMonitorData = new ArrayList<HostMonitorData>();
+ }
+
+ public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+ this.hostMonitorData = hostMonitorDataList;
+ this.userName = userName;
+ }
+
+ public List<HostMonitorData> getHostMonitorData() {
+ return hostMonitorData;
+ }
+
+ public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+ this.hostMonitorData = hostMonitorData;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ /*
+ This method will add element to the MonitorID list, user should not
+ duplicate it, we do not check it because its going to be used by airavata
+ so we have to use carefully and this method will add a host if its a new host
+ */
+ public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+ this.hostMonitorData.add(hostMonitorData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..f19decf
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class ExperimentCancelRequest {
+ private String experimentId;
+
+ public ExperimentCancelRequest(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public String getExperimentId() {
+ return experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..b45e01c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class TaskCancelRequest {
+ private String experimentId;
+ private String nodeId;
+ private String taskId;
+
+ public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+ this.experimentId = experimentId;
+ this.setNodeId(nodeId);
+ this.taskId = taskId;
+ }
+ public String getExperimentId() {
+ return experimentId;
+ }
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+ public String getTaskId() {
+ return taskId;
+ }
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+ public String getNodeId() {
+ return nodeId;
+ }
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
new file mode 100644
index 0000000..c754d3c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the abstract Monitor which needs to be used by
+ * any Monitoring implementation which expect nto consume
+ * to store the status to registry. Because they have to
+ * use the MonitorPublisher to publish the monitoring statuses
+ * to the Event Bus. All the Monitor statuses publish to the eventbus
+ * will be saved to the Registry.
+ */
+public abstract class AiravataAbstractMonitor implements Monitor {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
+ protected MonitorPublisher publisher;
+
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
new file mode 100644
index 0000000..a003f55
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * This is an interface to implement messageparser, it could be
+ * pull based or push based still monitor has to parse the content of
+ * the message it gets from remote monitoring system and finalize
+ * them to internal job state, Ex: JSON parser for AMQP and Qstat reader
+ * for pull based monitor.
+ */
+public interface MessageParser {
+ /**
+ * This method is to implement how to parse the incoming message
+ * and implement a logic to finalize the status of the job,
+ * we have to makesure the correct message is given to the messageparser
+ * parse method, it will not do any filtering
+ * @param message content of the message
+ * @return
+ */
+ JobState parseMessage(String message)throws AiravataMonitorException;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
new file mode 100644
index 0000000..614d606
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+
+/**
+ * This is the primary interface for Monitors,
+ * This can be used to implement different methods of monitoring
+ */
+public interface Monitor {
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
new file mode 100644
index 0000000..efdf89c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PullMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This will allow users to program Pull monitors separately
+ */
+public abstract class PullMonitor extends AiravataAbstractMonitor {
+
+ private int pollingFrequence;
+ /**
+ * This method will can invoke when PullMonitor needs to start
+ * and it has to invoke in the frequency specified below,
+ * @return if the start process is successful return true else false
+ */
+ public abstract boolean startPulling() throws AiravataMonitorException;
+
+ /**
+ * This is the method to stop the polling process
+ * @return if the stopping process is successful return true else false
+ */
+ public abstract boolean stopPulling()throws AiravataMonitorException;
+
+ /**
+ * this method can be used to set the polling frequencey or otherwise
+ * can implement a polling mechanism, and implement how to do
+ * @param frequence
+ */
+ public void setPollingFrequence(int frequence){
+ this.pollingFrequence = frequence;
+ }
+
+ /**
+ * this method can be used to get the polling frequencey or otherwise
+ * can implement a polling mechanism, and implement how to do
+ * @return
+ */
+ public int getPollingFrequence(){
+ return this.pollingFrequence;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
new file mode 100644
index 0000000..1b6a228
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PushMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This interface will allow users to program Push monitors separately
+ */
+public abstract class PushMonitor extends AiravataAbstractMonitor {
+ /**
+ * This method can be invoked to register a listener with the
+ * remote monitoring system, ideally inside this method users will be
+ * writing some client listener code for the remote monitoring system,
+ * this will be a simple wrapper around any client for the remote Monitor.
+ * @param monitorID
+ * @return
+ */
+ public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
+
+ /**
+ * This method can be invoked to unregister a listener with the
+ * remote monitoring system, ideally inside this method users will be
+ * writing some client listener code for the remote monitoring system,
+ * this will be a simple wrapper around any client for the remote Monitor.
+ * @param monitorID
+ * @return
+ */
+ public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
+
+ /**
+ * This can be used to stop the registration thread
+ * @return
+ * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+ */
+ public abstract boolean stopRegister()throws AiravataMonitorException;
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
new file mode 100644
index 0000000..3acef66
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.exception;
+
+public class AiravataMonitorException extends Exception {
+ private static final long serialVersionUID = -2849422320139467602L;
+
+ public AiravataMonitorException(Throwable e) {
+ super(e);
+ }
+
+ public AiravataMonitorException(String message) {
+ super(message, null);
+ }
+
+ public AiravataMonitorException(String message, Throwable e) {
+ super(message, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
new file mode 100644
index 0000000..24b300e
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.File;
+import java.util.Properties;
+
+/**
+ * this handler is responsible for monitoring jobs in a pull mode
+ * and currently this support multiple pull monitoring in grid resource and uses
+ * commands like qstat,squeue and this supports sun grid enging monitoring too
+ * which is a slight variation of qstat monitoring.
+ */
+public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
+ private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GridPullMonitorHandler.class);
+
+ private HPCPullMonitor hpcPullMonitor;
+
+ private AuthenticationInfo authenticationInfo;
+
+ public void initProperties(Properties properties) throws GFacHandlerException {
+ String myProxyUser = null;
+ try {
+ myProxyUser = ServerSettings.getSetting("myproxy.username");
+ String myProxyPass = ServerSettings.getSetting("myproxy.password");
+ String certPath = ServerSettings.getSetting("trusted.cert.location");
+ String myProxyServer = ServerSettings.getSetting("myproxy.server");
+ setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+ 7512, 17280000, certPath));
+ if(BetterGfacImpl.getMonitorPublisher() != null){
+ hpcPullMonitor = new HPCPullMonitor(BetterGfacImpl.getMonitorPublisher(),getAuthenticationInfo()); // we use our own credentials for monitoring, not from the store
+ }else {
+ throw new GFacHandlerException("Error initializing Monitor Handler, because Monitor Publisher is null !!!");
+ }
+ } catch (ApplicationSettingsException e) {
+ logger.error("Error while reading server properties", e);
+ throw new GFacHandlerException("Error while reading server properties", e);
+ }
+ }
+
+ public void run() {
+ hpcPullMonitor.run();
+ }
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ super.invoke(jobExecutionContext);
+ hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
+ MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
+ try {
+ ZooKeeper zk = jobExecutionContext.getZk();
+ try {
+ String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
+ String path = experimentEntry + File.separator + "operation";
+ Stat exists = zk.exists(path, this);
+ if (exists != null) {
+ zk.getData(path, this, exists); // watching the operations node
+ }
+ } catch (KeeperException e) {
+ logger.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+ CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID, jobExecutionContext);
+ CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper
+ } catch (AiravataMonitorException e) {
+ logger.errorId(monitorID.getJobID(), "Error adding job {} monitorID object to the queue with experiment {}",
+ monitorID.getJobID(), monitorID.getExperimentID());
+ }
+ }
+ public AuthenticationInfo getAuthenticationInfo() {
+ return authenticationInfo;
+ }
+
+ public HPCPullMonitor getHpcPullMonitor() {
+ return hpcPullMonitor;
+ }
+
+ public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+ this.authenticationInfo = authenticationInfo;
+ }
+
+ public void setHpcPullMonitor(HPCPullMonitor hpcPullMonitor) {
+ this.hpcPullMonitor = hpcPullMonitor;
+ }
+
+
+ public void process(WatchedEvent watchedEvent) {
+ logger.info(watchedEvent.getPath());
+ if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
+ // node data is changed, this means node is cancelled.
+ logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath());
+
+ String[] split = watchedEvent.getPath().split("/");
+ for(String element:split) {
+ if (element.contains("+")) {
+ logger.info("Adding experimentID+TaskID to be removed from monitoring:"+element);
+ hpcPullMonitor.getCancelJobList().add(element);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
new file mode 100644
index 0000000..0eb4526
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * this handler is responsible monitoring jobs in push mode
+ * and currently this support multiple push monitoring in grid resource
+ */
+public class GridPushMonitorHandler extends ThreadedHandler {
+ private final static Logger logger= LoggerFactory.getLogger(GridPushMonitorHandler.class);
+
+ private AMQPMonitor amqpMonitor;
+
+ private AuthenticationInfo authenticationInfo;
+
+ @Override
+ public void initProperties(Properties properties) throws GFacHandlerException {
+ String myProxyUser=null;
+ try{
+ myProxyUser = ServerSettings.getSetting("myproxy.username");
+ String myProxyPass = ServerSettings.getSetting("myproxy.password");
+ String certPath = ServerSettings.getSetting("trusted.cert.location");
+ String myProxyServer = ServerSettings.getSetting("myproxy.server");
+ setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+ 7512, 17280000, certPath));
+
+ String hostList=(String)properties.get("hosts");
+ String proxyFilePath = ServerSettings.getSetting("proxy.file.path");
+ String connectionName=ServerSettings.getSetting("connection.name");
+ LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>();
+ LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>();
+ List<String> hosts= Arrays.asList(hostList.split(","));
+ amqpMonitor=new AMQPMonitor(BetterGfacImpl.getMonitorPublisher(),pushQueue,finishQueue,proxyFilePath,connectionName,hosts);
+ }catch (ApplicationSettingsException e){
+ logger.error(e.getMessage(), e);
+ throw new GFacHandlerException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void run() {
+ amqpMonitor.run();
+ }
+
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException{
+ super.invoke(jobExecutionContext);
+ MonitorID monitorID=new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
+ amqpMonitor.getRunningQueue().add(monitorID);
+ }
+
+ public AMQPMonitor getAmqpMonitor() {
+ return amqpMonitor;
+ }
+
+ public void setAmqpMonitor(AMQPMonitor amqpMonitor) {
+ this.amqpMonitor = amqpMonitor;
+ }
+
+ public AuthenticationInfo getAuthenticationInfo() {
+ return authenticationInfo;
+ }
+
+ public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+ this.authenticationInfo = authenticationInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
new file mode 100644
index 0000000..ab934ca
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -0,0 +1,469 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.logger.AiravataLogger;
+import org.apache.airavata.common.logger.AiravataLoggerFactory;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.core.PullMonitor;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+import java.sql.Timestamp;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This monitor is based on qstat command which can be run
+ * in grid resources and retrieve the job status.
+ */
+public class HPCPullMonitor extends PullMonitor {
+
+ private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class);
+ public static final int FAILED_COUNT = 5;
+
+ // I think this should use DelayedBlocking Queue to do the monitoring*/
+ private BlockingQueue<UserMonitorData> queue;
+
+ private boolean startPulling = false;
+
+ private Map<String, ResourceConnection> connections;
+
+ private MonitorPublisher publisher;
+
+ private LinkedBlockingQueue<String> cancelJobList;
+
+ private List<String> completedJobsFromPush;
+
+ private GFac gfac;
+
+ private AuthenticationInfo authenticationInfo;
+
+ private ArrayList<MonitorID> removeList;
+
+ public HPCPullMonitor() {
+ connections = new HashMap<String, ResourceConnection>();
+ queue = new LinkedBlockingDeque<UserMonitorData>();
+ publisher = new MonitorPublisher(new EventBus());
+ cancelJobList = new LinkedBlockingQueue<String>();
+ completedJobsFromPush = new ArrayList<String>();
+ (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
+ removeList = new ArrayList<MonitorID>();
+ }
+
+ public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
+ connections = new HashMap<String, ResourceConnection>();
+ queue = new LinkedBlockingDeque<UserMonitorData>();
+ publisher = monitorPublisher;
+ authenticationInfo = authInfo;
+ cancelJobList = new LinkedBlockingQueue<String>();
+ this.completedJobsFromPush = new ArrayList<String>();
+ (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
+ removeList = new ArrayList<MonitorID>();
+ }
+
+ public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
+ this.queue = queue;
+ this.publisher = publisher;
+ connections = new HashMap<String, ResourceConnection>();
+ cancelJobList = new LinkedBlockingQueue<String>();
+ this.completedJobsFromPush = new ArrayList<String>();
+ (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
+ removeList = new ArrayList<MonitorID>();
+ }
+
+
+ public void run() {
+ /* implement a logic to pick each monitorID object from the queue and do the
+ monitoring
+ */
+ this.startPulling = true;
+ while (this.startPulling && !ServerSettings.isStopAllThreads()) {
+ try {
+ // After finishing one iteration of the full queue this thread sleeps 1 second
+ synchronized (this.queue) {
+ if (this.queue.size() > 0) {
+ startPulling();
+ }
+ }
+ Thread.sleep(10000);
+ } catch (Exception e) {
+ // we catch all the exceptions here because no matter what happens we do not stop running this
+ // thread, but ideally we should report proper error messages, but this is handled in startPulling
+ // method, incase something happen in Thread.sleep we handle it with this catch block.
+ logger.error(e.getMessage(),e);
+ }
+ }
+ // thread is going to return so we close all the connections
+ Iterator<String> iterator = connections.keySet().iterator();
+ while (iterator.hasNext()) {
+ String next = iterator.next();
+ ResourceConnection resourceConnection = connections.get(next);
+ try {
+ resourceConnection.getCluster().disconnect();
+ } catch (SSHApiException e) {
+ logger.error("Erro while connecting to the cluster", e);
+ }
+ }
+ }
+
+ /**
+ * This method will can invoke when PullMonitor needs to start
+ * and it has to invoke in the frequency specified below,
+ *
+ * @return if the start process is successful return true else false
+ */
+ public boolean startPulling() throws AiravataMonitorException {
+ // take the top element in the queue and pull the data and put that element
+ // at the tail of the queue
+ //todo this polling will not work with multiple usernames but with single user
+ // and multiple hosts, currently monitoring will work
+ UserMonitorData take = null;
+ JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+ MonitorID currentMonitorID = null;
+ try {
+ take = this.queue.take();
+ List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
+ for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) {
+ HostMonitorData iHostMonitorData = hostIterator.next();
+ if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+ String hostName = iHostMonitorData.getComputeResourceDescription().getHostName();
+ ResourceConnection connection = null;
+ if (connections.containsKey(hostName)) {
+ if (!connections.get(hostName).isConnected()) {
+ connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
+ connections.put(hostName, connection);
+ } else {
+ logger.debug("We already have this connection so not going to create one");
+ connection = connections.get(hostName);
+ }
+ } else {
+ connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
+ connections.put(hostName, connection);
+ }
+
+ // before we get the statuses, we check the cancel job list and remove them permanently
+ List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+ Iterator<String> iterator1 = cancelJobList.iterator();
+ ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator();
+ while (monitorIDListIterator.hasNext()) {
+ MonitorID iMonitorID = monitorIDListIterator.next();
+ while (iterator1.hasNext()) {
+ String cancelMId = iterator1.next();
+ if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
+ iMonitorID.setStatus(JobState.CANCELED);
+// CommonUtils.removeMonitorFromQueue(take, iMonitorID);
+ removeList.add(iMonitorID);
+ logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " +
+ "completed job queue, experiment {}, task {} , job {}",
+ iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID());
+ logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .",
+ iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+ sendNotification(iMonitorID);
+ logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
+ Thread.sleep(10000);
+ GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ break;
+ }
+ }
+ iterator1 = cancelJobList.iterator();
+ }
+
+ cleanup(take);
+
+ synchronized (completedJobsFromPush) {
+ for (ListIterator<String> iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) {
+ String completeId = iterator.next();
+ for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) {
+ MonitorID iMonitorID = monitorIDListIterator.next();
+ if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
+ logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
+ iMonitorID.setStatus(JobState.COMPLETE);
+// CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak
+ removeList.add(iMonitorID);
+ logger.debugId(completeId, "Push notification updated job {} status to {}. " +
+ "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
+ iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+ logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
+ iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+
+ sendNotification(iMonitorID);
+ GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ break;
+ }
+ }
+ }
+ }
+
+ cleanup(take);
+
+ // we have to get this again because we removed the already completed jobs with amqp messages
+ monitorID = iHostMonitorData.getMonitorIDs();
+ Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
+ for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
+ MonitorID iMonitorID = iterator.next();
+ currentMonitorID = iMonitorID;
+ if (!JobState.CANCELED.equals(iMonitorID.getStatus()) &&
+ !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
+ iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic
+ } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) {
+ logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " +
+ "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+// CommonUtils.removeMonitorFromQueue(take, iMonitorID);
+ removeList.add(iMonitorID);
+ logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
+ iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+ GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ }
+ iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ sendNotification(iMonitorID);
+ // if the job is completed we do not have to put the job to the queue again
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ }
+
+ cleanup(take);
+
+
+ for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
+ MonitorID iMonitorID = iterator.next();
+ if (iMonitorID.getFailedCount() > FAILED_COUNT) {
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ String outputDir = iMonitorID.getJobExecutionContext().getOutputDir();
+ List<String> stdOut = null;
+ try {
+ stdOut = connection.getCluster().listDirectory(outputDir); // check the outputs directory
+ } catch (SSHApiException e) {
+ if (e.getMessage().contains("No such file or directory")) {
+ // this is because while we run output handler something failed and during exception
+ // we store all the jobs in the monitor queue again
+ logger.error("We know this job is already attempted to run out-handlers");
+// CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
+ }
+ }
+ if (stdOut != null && stdOut.size() > 0 && !stdOut.get(0).isEmpty()) { // have to be careful with this
+ iMonitorID.setStatus(JobState.COMPLETE);
+ logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, " +
+ " Experiment {} , task {}", iMonitorID.getFailedCount(),
+ iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+ logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
+ iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
+ sendNotification(iMonitorID);
+// CommonUtils.removeMonitorFromQueue(take, iMonitorID);
+ removeList.add(iMonitorID);
+ GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
+ } else {
+ iMonitorID.setFailedCount(0);
+ }
+ } else {
+ // Evey
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ // if the job is complete we remove it from the Map, if any of these maps
+ // get empty this userMonitorData will get delete from the queue
+ }
+ }
+
+ cleanup(take);
+
+
+ } else {
+ logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData.
+ getComputeResourceDescription().getHostName());
+ }
+ }
+ // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
+ // now the userMonitorData goes back to the tail of the queue
+ // during individual monitorID removal we remove the HostMonitorData object if it become empty
+ // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData object
+ // we should remove it from the queue so here we do not put it back.
+ for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) {
+ HostMonitorData iHostMonitorID = iterator1.next();
+ if (iHostMonitorID.getMonitorIDs().size() == 0) {
+ iterator1.remove();
+ logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getComputeResourceDescription().getHostName());
+ }
+ }
+ if(take.getHostMonitorData().size()!=0) {
+ queue.put(take);
+ }
+ } catch (InterruptedException e) {
+ if (!this.queue.contains(take)) {
+ try {
+ this.queue.put(take);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
+ throw new AiravataMonitorException(e);
+ } catch (SSHApiException e) {
+ logger.error(e.getMessage());
+ if (e.getMessage().contains("Unknown Job Id Error")) {
+ // in this case job is finished or may be the given job ID is wrong
+ jobStatus.setState(JobState.UNKNOWN);
+ JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
+ if (currentMonitorID != null){
+ jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
+ jobIdentifier.setTaskId(currentMonitorID.getTaskID());
+ jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
+ jobIdentifier.setJobId(currentMonitorID.getJobID());
+ jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID());
+ }
+ jobStatus.setJobIdentity(jobIdentifier);
+ publisher.publish(jobStatus);
+ } else if (e.getMessage().contains("illegally formed job identifier")) {
+ logger.error("Wrong job ID is given so dropping the job from monitoring system");
+ } else if (!this.queue.contains(take)) {
+ try {
+ queue.put(take);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ }
+ throw new AiravataMonitorException("Error retrieving the job status", e);
+ } catch (Exception e) {
+ try {
+ queue.put(take);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ throw new AiravataMonitorException("Error retrieving the job status", e);
+ }
+ return true;
+ }
+
+ private void sendNotification(MonitorID iMonitorID) {
+ JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
+ JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(),
+ iMonitorID.getTaskID(),
+ iMonitorID.getWorkflowNodeID(),
+ iMonitorID.getExperimentID(),
+ iMonitorID.getJobExecutionContext().getGatewayID());
+ jobStatus.setJobIdentity(jobIdentity);
+ jobStatus.setState(iMonitorID.getStatus());
+ // we have this JobStatus class to handle amqp monitoring
+ logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
+ "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
+ jobStatus.getJobIdentity().getTaskId());
+
+ publisher.publish(jobStatus);
+ }
+
+ /**
+ * This is the method to stop the polling process
+ *
+ * @return if the stopping process is successful return true else false
+ */
+ public boolean stopPulling() {
+ this.startPulling = false;
+ return true;
+ }
+
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+
+ public BlockingQueue<UserMonitorData> getQueue() {
+ return queue;
+ }
+
+ public void setQueue(BlockingQueue<UserMonitorData> queue) {
+ this.queue = queue;
+ }
+
+ public boolean authenticate() {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, ResourceConnection> getConnections() {
+ return connections;
+ }
+
+ public boolean isStartPulling() {
+ return startPulling;
+ }
+
+ public void setConnections(Map<String, ResourceConnection> connections) {
+ this.connections = connections;
+ }
+
+ public void setStartPulling(boolean startPulling) {
+ this.startPulling = startPulling;
+ }
+
+ public GFac getGfac() {
+ return gfac;
+ }
+
+ public void setGfac(GFac gfac) {
+ this.gfac = gfac;
+ }
+
+ public AuthenticationInfo getAuthenticationInfo() {
+ return authenticationInfo;
+ }
+
+ public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+ this.authenticationInfo = authenticationInfo;
+ }
+
+ public LinkedBlockingQueue<String> getCancelJobList() {
+ return cancelJobList;
+ }
+
+ public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) {
+ this.cancelJobList = cancelJobList;
+ }
+
+
+ private void cleanup(UserMonitorData userMonitorData){
+ for(MonitorID iMonitorId:removeList){
+ try {
+ CommonUtils.removeMonitorFromQueue(userMonitorData, iMonitorId);
+ } catch (AiravataMonitorException e) {
+ logger.error(e.getMessage(), e);
+ logger.error("Error deleting the monitor data: " + iMonitorId.getJobID());
+ }
+ }
+ removeList.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
new file mode 100644
index 0000000..f718535
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.SecurityContext;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public class ResourceConnection {
+ private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
+
+ private PBSCluster cluster;
+
+ private AuthenticationInfo authenticationInfo;
+
+
+ public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException {
+ MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
+ try {
+ SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
+ if(securityContext != null) {
+ if (securityContext instanceof GSISecurityContext) {
+ GSISecurityContext gsiSecurityContext = (GSISecurityContext) securityContext;
+ cluster = (PBSCluster) gsiSecurityContext.getPbsCluster();
+ } else if (securityContext instanceof SSHSecurityContext) {
+ SSHSecurityContext sshSecurityContext = (SSHSecurityContext)
+ securityContext;
+ cluster = (PBSCluster) sshSecurityContext.getPbsCluster();
+ }
+ }
+ // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
+ // we are using our own credentials and not using one users account to do everything.
+ authenticationInfo = authInfo;
+ } catch (GFacException e) {
+ log.error("Error reading data from job ExecutionContext");
+ }
+ }
+
+ public ResourceConnection(HostMonitorData hostMonitorData) throws SSHApiException {
+ MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
+ try {
+ GSISecurityContext securityContext = (GSISecurityContext)
+ monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
+ cluster = (PBSCluster) securityContext.getPbsCluster();
+
+ // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
+ // we are using our own credentials and not using one users account to do everything.
+ cluster = new PBSCluster(cluster.getServerInfo(), authenticationInfo, cluster.getJobManagerConfiguration());
+ } catch (GFacException e) {
+ log.error("Error reading data from job ExecutionContext");
+ }
+ }
+
+ public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
+ String jobID = monitorID.getJobID();
+ //todo so currently we execute the qstat for each job but we can use user based monitoring
+ //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
+ return getStatusFromString(cluster.getJobStatus(jobID).toString());
+ }
+
+ public Map<String, JobState> getJobStatuses(List<MonitorID> monitorIDs) throws SSHApiException {
+ Map<String, JobStatus> treeMap = new TreeMap<String, JobStatus>();
+ Map<String, JobState> treeMap1 = new TreeMap<String, JobState>();
+ // creating a sorted map with all the jobIds and with the predefined
+ // status as UNKNOWN
+ for (MonitorID monitorID : monitorIDs) {
+ treeMap.put(monitorID.getJobID()+","+monitorID.getJobName(), JobStatus.U);
+ }
+ String userName = cluster.getServerInfo().getUserName();
+ //todo so currently we execute the qstat for each job but we can use user based monitoring
+ //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
+ //
+ cluster.getJobStatuses(userName, treeMap);
+ for (String key : treeMap.keySet()) {
+ treeMap1.put(key, getStatusFromString(treeMap.get(key).toString()));
+ }
+ return treeMap1;
+ }
+
+ private JobState getStatusFromString(String status) {
+ log.info("parsing the job status returned : " + status);
+ if (status != null) {
+ if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) {
+ return JobState.COMPLETE;
+ } else if ("H".equals(status) || "h".equals(status)) {
+ return JobState.HELD;
+ } else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)) {
+ return JobState.QUEUED;
+ } else if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) {
+ return JobState.ACTIVE;
+ } else if ("T".equals(status)) {
+ return JobState.HELD;
+ } else if ("W".equals(status) || "PD".equals(status)) {
+ return JobState.QUEUED;
+ } else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) {
+ return JobState.SUSPENDED;
+ } else if ("CA".equals(status)) {
+ return JobState.CANCELED;
+ } else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)) {
+ return JobState.FAILED;
+ } else if ("PR".equals(status) || "Er".equals(status)) {
+ return JobState.FAILED;
+ } else if ("U".equals(status) || ("UNKWN".equals(status))) {
+ return JobState.UNKNOWN;
+ }
+ }
+ return JobState.UNKNOWN;
+ }
+
+ public PBSCluster getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(PBSCluster cluster) {
+ this.cluster = cluster;
+ }
+
+ public boolean isConnected(){
+ return this.cluster.getSession().isConnected();
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
new file mode 100644
index 0000000..de8cd8c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.PushMonitor;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+
+/**
+ * This is the implementation for AMQP based finishQueue, this uses
+ * rabbitmq client to recieve AMQP based monitoring data from
+ * mostly excede resources.
+ */
+public class AMQPMonitor extends PushMonitor {
+ private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
+
+
+ /* this will keep all the channels available in the system, we do not create
+ channels for all the jobs submitted, but we create channels for each user for each
+ host.
+ */
+ private Map<String, Channel> availableChannels;
+
+ private MonitorPublisher publisher;
+
+ private MonitorPublisher localPublisher;
+
+ private BlockingQueue<MonitorID> runningQueue;
+
+ private BlockingQueue<MonitorID> finishQueue;
+
+ private String connectionName;
+
+ private String proxyPath;
+
+ private List<String> amqpHosts;
+
+ private boolean startRegister;
+
+ public AMQPMonitor(){
+
+ }
+ public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
+ BlockingQueue<MonitorID> finishQueue,
+ String proxyPath,String connectionName,List<String> hosts) {
+ this.publisher = publisher;
+ this.runningQueue = runningQueue; // these will be initialized by the MonitorManager
+ this.finishQueue = finishQueue; // these will be initialized by the MonitorManager
+ this.availableChannels = new HashMap<String, Channel>();
+ this.connectionName = connectionName;
+ this.proxyPath = proxyPath;
+ this.amqpHosts = hosts;
+ this.localPublisher = new MonitorPublisher(new EventBus());
+ this.localPublisher.registerListener(this);
+ }
+
+ public void initialize(String proxyPath, String connectionName, List<String> hosts) {
+ this.availableChannels = new HashMap<String, Channel>();
+ this.connectionName = connectionName;
+ this.proxyPath = proxyPath;
+ this.amqpHosts = hosts;
+ this.localPublisher = new MonitorPublisher(new EventBus());
+ this.localPublisher.registerListener(this);
+ }
+
+ @Override
+ public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
+ // we subscribe to read user-host based subscription
+ ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription();
+ if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) {
+ // we get first ip address for the moment
+ String hostAddress = computeResourceDescription.getIpAddresses().get(0);
+ // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+ // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
+ String channelID = CommonUtils.getChannelID(monitorID);
+ if (availableChannels.get(channelID) == null) {
+ try {
+ //todo need to fix this rather getting it from a file
+ Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+ Channel channel = null;
+ channel = connection.createChannel();
+ availableChannels.put(channelID, channel);
+ String queueName = channel.queueDeclare().getQueue();
+
+ BasicConsumer consumer = new
+ BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher
+ channel.basicConsume(queueName, true, consumer);
+ String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
+ // here we queuebind to a particular user in a particular machine
+ channel.queueBind(queueName, "glue2.computing_activity", filterString);
+ logger.info("Using filtering string to monitor: " + filterString);
+ } catch (IOException e) {
+ logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
+ }
+ }
+ } else {
+ throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() +
+ " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " +
+ "IpAddress with it");
+ }
+ return true;
+ }
+
+ public void run() {
+ // before going to the while true mode we start unregister thread
+ startRegister = true; // this will be unset by someone else
+ while (startRegister || !ServerSettings.isStopAllThreads()) {
+ try {
+ MonitorID take = runningQueue.take();
+ this.registerListener(take);
+ } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
+ logger.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ } catch (Exception e){
+ logger.error(e.getMessage(), e);
+ }
+ }
+ Set<String> strings = availableChannels.keySet();
+ for(String key:strings) {
+ Channel channel = availableChannels.get(key);
+ try {
+ channel.close();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Subscribe
+ public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
+ Iterator<MonitorID> iterator = finishQueue.iterator();
+ MonitorID next = null;
+ while(iterator.hasNext()){
+ next = iterator.next();
+ if(next.getJobID().endsWith(monitorID.getJobID())){
+ break;
+ }
+ }
+ if(next == null) {
+ logger.error("Job has removed from the queue, old obsolete message recieved");
+ return false;
+ }
+ String channelID = CommonUtils.getChannelID(next);
+ if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
+ finishQueue.remove(next);
+
+ // if this is the last job in the queue at this point with the same username and same host we
+ // close the channel and close the connection and remove it from availableChannels
+ if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
+ logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
+ ", incase new job created we do subscribe again");
+ Channel channel = availableChannels.get(channelID);
+ if (channel == null) {
+ logger.error("Already Unregistered the listener");
+ throw new AiravataMonitorException("Already Unregistered the listener");
+ } else {
+ try {
+ channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
+ channel.close();
+ channel.getConnection().close();
+ availableChannels.remove(channelID);
+ } catch (IOException e) {
+ logger.error("Error unregistering the listener");
+ throw new AiravataMonitorException("Error unregistering the listener");
+ }
+ }
+ }
+ }
+ next.setStatus(monitorID.getStatus());
+ JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(),
+ next.getTaskID(),
+ next.getWorkflowNodeID(),
+ next.getExperimentID(),
+ next.getJobExecutionContext().getGatewayID());
+ publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity));
+ return true;
+ }
+ @Override
+ public boolean stopRegister() throws AiravataMonitorException {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, Channel> getAvailableChannels() {
+ return availableChannels;
+ }
+
+ public void setAvailableChannels(Map<String, Channel> availableChannels) {
+ this.availableChannels = availableChannels;
+ }
+
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+
+ public BlockingQueue<MonitorID> getRunningQueue() {
+ return runningQueue;
+ }
+
+ public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
+ this.runningQueue = runningQueue;
+ }
+
+ public BlockingQueue<MonitorID> getFinishQueue() {
+ return finishQueue;
+ }
+
+ public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
+ this.finishQueue = finishQueue;
+ }
+
+ public String getProxyPath() {
+ return proxyPath;
+ }
+
+ public void setProxyPath(String proxyPath) {
+ this.proxyPath = proxyPath;
+ }
+
+ public List<String> getAmqpHosts() {
+ return amqpHosts;
+ }
+
+ public void setAmqpHosts(List<String> amqpHosts) {
+ this.amqpHosts = amqpHosts;
+ }
+
+ public boolean isStartRegister() {
+ return startRegister;
+ }
+
+ public void setStartRegister(boolean startRegister) {
+ this.startRegister = startRegister;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6ffc3ee7/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
new file mode 100644
index 0000000..bd5c625
--- /dev/null
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.MessageParser;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+
+public class BasicConsumer implements Consumer {
+ private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
+
+ private MessageParser parser;
+
+ private MonitorPublisher publisher;
+
+ public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
+ this.parser = parser;
+ this.publisher = publisher;
+ }
+
+ public void handleCancel(String consumerTag) {
+ }
+
+ public void handleCancelOk(String consumerTag) {
+ }
+
+ public void handleConsumeOk(String consumerTag) {
+ }
+
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+
+ logger.debug("job update for: " + envelope.getRoutingKey());
+ String message = new String(body);
+ message = message.replaceAll("(?m)^", " ");
+ // Here we parse the message and get the job status and push it
+ // to the Event bus, this will be picked by
+// AiravataJobStatusUpdator and store in to registry
+
+ logger.debug("************************************************************");
+ logger.debug("AMQP Message recieved \n" + message);
+ logger.debug("************************************************************");
+ try {
+ String jobID = envelope.getRoutingKey().split("\\.")[0];
+ MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null,null);
+ monitorID.setStatus(parser.parseMessage(message));
+ publisher.publish(monitorID);
+ } catch (AiravataMonitorException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ public void handleRecoverOk(String consumerTag) {
+ }
+
+ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
+ }
+
+}