You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/03 20:14:31 UTC
[14/39] airavata git commit: Refactored gfac sub modules,
merged gfac-ssh, gfac-gsissh, gfac-local,
gfac-monitor and gsissh modules and created gfac-impl,
removed implementation from gfac-core to gfac-impl
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
deleted file mode 100644
index a003f55..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.model.workspace.experiment.JobState;
-
-/**
- * Contract for message parsers used by monitors. Whether a monitor is
- * pull based or push based, it has to parse the content of the message
- * it gets from the remote monitoring system and map it to an internal
- * job state, e.g. a JSON parser for AMQP or a qstat reader for a
- * pull based monitor.
- */
-public interface MessageParser {
- /**
- * Parses an incoming message and derives the status of the job from it.
- * Callers have to make sure the correct message is handed to this
- * method, because it is not expected to do any filtering itself.
- * @param message content of the message
- * @return the job state derived from the message
- * @throws AiravataMonitorException declared failure type; the exact conditions are implementation specific
- */
- JobState parseMessage(String message)throws AiravataMonitorException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
deleted file mode 100644
index 614d606..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-
-/**
- * Primary marker interface for monitors; it declares no methods.
- * Different monitoring mechanisms can be implemented against it.
- */
-public interface Monitor {
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
deleted file mode 100644
index efdf89c..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-
-/**
- * Abstract base class for pull based monitors (it is a class, not an
- * interface). Pull and push based monitoring require different
- * operations, so the pull specific operations are declared here, which
- * lets pull monitors be programmed separately from push monitors.
- */
-public abstract class PullMonitor extends AiravataAbstractMonitor {
-
- private int pollingFrequence;
- /**
- * Invoked when the PullMonitor needs to start pulling; it is meant to
- * be invoked at the polling frequency held by this class.
- * @return true if the start process is successful, false otherwise
- */
- public abstract boolean startPulling() throws AiravataMonitorException;
-
- /**
- * Stops the polling process.
- * @return true if the stopping process is successful, false otherwise
- */
- public abstract boolean stopPulling()throws AiravataMonitorException;
-
- /**
- * Stores the polling frequency for this monitor.
- * NOTE(review): the unit is not defined in this class - confirm with implementations.
- * @param frequence the polling frequency to store
- */
- public void setPollingFrequence(int frequence){
- this.pollingFrequence = frequence;
- }
-
- /**
- * Returns the polling frequency stored through setPollingFrequence(int);
- * the field has no initializer, so this is 0 until a value is set.
- * @return the stored polling frequency
- */
- public int getPollingFrequence(){
- return this.pollingFrequence;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
deleted file mode 100644
index 1b6a228..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.core;
-
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-
-/**
- * Abstract base class for push based monitors (it is a class, not an
- * interface). Pull and push based monitoring require different
- * operations, so the push specific operations are declared here, which
- * lets push monitors be programmed separately from pull monitors.
- */
-public abstract class PushMonitor extends AiravataAbstractMonitor {
- /**
- * Registers a listener with the remote monitoring system. Ideally
- * implementations put the client listener code for the remote
- * monitoring system in here, so this becomes a simple wrapper
- * around whatever client the remote monitor offers.
- * @param monitorID the monitor ID to register a listener for
- * @return presumably true when registration succeeds - TODO confirm with implementations
- */
- public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
-
- /**
- * Unregisters a listener from the remote monitoring system. Ideally
- * implementations put the client listener code for the remote
- * monitoring system in here, so this becomes a simple wrapper
- * around whatever client the remote monitor offers.
- * @param monitorID the monitor ID whose listener is unregistered
- * @return presumably true when unregistration succeeds - TODO confirm with implementations
- */
- public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
-
- /**
- * Can be used to stop the registration thread.
- * @return presumably true when the thread was stopped - TODO confirm with implementations
- * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException declared failure type
- */
- public abstract boolean stopRegister()throws AiravataMonitorException;
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
deleted file mode 100644
index 3acef66..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.exception;
-
-public class AiravataMonitorException extends Exception { // checked exception declared by the gfac monitor APIs
- private static final long serialVersionUID = -2849422320139467602L;
-
- public AiravataMonitorException(Throwable e) { // wraps an underlying cause
- super(e);
- }
-
- public AiravataMonitorException(String message) { // message only
- super(message, null); // explicit null cause, i.e. no known cause
- }
-
- public AiravataMonitorException(String message, Throwable e) { // message plus underlying cause
- super(message, e);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
deleted file mode 100644
index 10192b7..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.handlers;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.monitor.HPCMonitorID;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.File;
-import java.util.Properties;
-
-/**
- * Handler responsible for monitoring jobs in pull mode. It delegates to an
- * HPCPullMonitor; per the original notes it supports several pull style
- * monitors on grid resources using commands like qstat and squeue, including
- * Sun Grid Engine monitoring, which is a slight variation of qstat monitoring.
- */
-public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
- private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GridPullMonitorHandler.class);
-
- private HPCPullMonitor hpcPullMonitor; // created in initProperties(), or injected through setHpcPullMonitor()
-
- private AuthenticationInfo authenticationInfo; // credentials used for monitoring
-
- public void initProperties(Properties properties) throws GFacHandlerException { // the properties argument is not read here
- String myProxyUser = null;
- try {
- myProxyUser = ServerSettings.getSetting("myproxy.username");
- String myProxyPass = ServerSettings.getSetting("myproxy.password");
- String certPath = ServerSettings.getSetting("trusted.cert.location");
- String myProxyServer = ServerSettings.getSetting("myproxy.server");
- setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
- 7512, 17280000, certPath)); // NOTE(review): presumably MyProxy port and credential lifetime - confirm
- hpcPullMonitor = new HPCPullMonitor(null,getAuthenticationInfo()); // publisher is null here and set later in invoke(); we use our own credentials for monitoring, not from the store
- } catch (ApplicationSettingsException e) {
- logger.error("Error while reading server properties", e);
- throw new GFacHandlerException("Error while reading server properties", e);
- }
- }
-
- public void run() { // runs the pull monitor loop on the calling thread
- hpcPullMonitor.run();
- }
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { // hands the job to the pull monitor's queue
- super.invoke(jobExecutionContext);
- hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
- hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher());
- MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
- try {
- /* ZooKeeper zk = jobExecutionContext.getZk();
- try {
- String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
- String path = experimentEntry + File.separator + "operation";
- Stat exists = zk.exists(path, this);
- if (exists != null) {
- zk.getData(path, this, exists); // watching the operations node
- }
- } catch (KeeperException e) {
- logger.error(e.getMessage(), e);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- }*/
- CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID, jobExecutionContext);
- CommonUtils.increaseZkJobCount(monitorID); // update the changed job count in zookeeper
- } catch (AiravataMonitorException e) { // NOTE(review): the exception itself is neither logged nor rethrown
- logger.errorId(monitorID.getJobID(), "Error adding job {} monitorID object to the queue with experiment {}",
- monitorID.getJobID(), monitorID.getExperimentID());
- }
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- // TODO: recovery is not implemented; this method is currently a no-op.
- }
-
- public AuthenticationInfo getAuthenticationInfo() {
- return authenticationInfo;
- }
-
- public HPCPullMonitor getHpcPullMonitor() {
- return hpcPullMonitor;
- }
-
- public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
- this.authenticationInfo = authenticationInfo;
- }
-
- public void setHpcPullMonitor(HPCPullMonitor hpcPullMonitor) {
- this.hpcPullMonitor = hpcPullMonitor;
- }
-
-
- public void process(WatchedEvent watchedEvent) { // Watcher callback; only NodeDataChanged events are acted on
- logger.info(watchedEvent.getPath());
- if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
- // node data changed, which is treated here as the experiment being cancelled.
- logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath());
-
- String[] split = watchedEvent.getPath().split("/");
- for(String element:split) {
- if (element.contains("+")) { // path elements containing '+' are taken to be experimentID+TaskID keys
- logger.info("Adding experimentID+TaskID to be removed from monitoring:"+element);
- hpcPullMonitor.getCancelJobList().add(element);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
deleted file mode 100644
index 8b445df..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.handlers;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.HPCMonitorID;
-import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handler responsible for monitoring jobs in push mode. It delegates to an
- * AMQPMonitor that is built in initProperties() for the configured hosts.
- */
-public class GridPushMonitorHandler extends ThreadedHandler {
- private final static Logger logger= LoggerFactory.getLogger(GridPushMonitorHandler.class);
-
- private AMQPMonitor amqpMonitor; // created in initProperties(), or injected through setAmqpMonitor()
-
- private AuthenticationInfo authenticationInfo; // credentials attached to each HPCMonitorID built in invoke()
-
- @Override
- public void initProperties(Properties properties) throws GFacHandlerException { // reads "hosts" from properties, everything else from ServerSettings
- String myProxyUser=null;
- try{
- myProxyUser = ServerSettings.getSetting("myproxy.username");
- String myProxyPass = ServerSettings.getSetting("myproxy.password");
- String certPath = ServerSettings.getSetting("trusted.cert.location");
- String myProxyServer = ServerSettings.getSetting("myproxy.server");
- setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
- 7512, 17280000, certPath)); // NOTE(review): presumably MyProxy port and credential lifetime - confirm
-
- String hostList=(String)properties.get("hosts"); // NOTE(review): null when "hosts" is missing, which makes the split below fail - confirm it is always set
- String proxyFilePath = ServerSettings.getSetting("proxy.file.path");
- String connectionName=ServerSettings.getSetting("connection.name");
- LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>();
- LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>();
- List<String> hosts= Arrays.asList(hostList.split(",")); // comma separated host list
- amqpMonitor=new AMQPMonitor(null,pushQueue,finishQueue,proxyFilePath,connectionName,hosts);
- }catch (ApplicationSettingsException e){
- logger.error(e.getMessage(), e);
- throw new GFacHandlerException(e.getMessage(), e);
- }
- }
-
- @Override
- public void run() { // runs the AMQP monitor on the calling thread
- amqpMonitor.run();
- }
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException{ // adds a MonitorID for this job to the monitor's running queue
- super.invoke(jobExecutionContext);
- MonitorID monitorID=new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
- amqpMonitor.getRunningQueue().add(monitorID);
- }
-
- @Override
- public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- // TODO: recovery is not implemented; this method is currently a no-op.
- }
-
- public AMQPMonitor getAmqpMonitor() {
- return amqpMonitor;
- }
-
- public void setAmqpMonitor(AMQPMonitor amqpMonitor) {
- this.amqpMonitor = amqpMonitor;
- }
-
- public AuthenticationInfo getAuthenticationInfo() {
- return authenticationInfo;
- }
-
- public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
- this.authenticationInfo = authenticationInfo;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
deleted file mode 100644
index 3442367..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.gfac.core.cpi.GFac;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
-import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
-import org.apache.airavata.gfac.monitor.HostMonitorData;
-import org.apache.airavata.gfac.monitor.UserMonitorData;
-import org.apache.airavata.gfac.monitor.core.PullMonitor;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
-import org.apache.airavata.model.workspace.experiment.JobState;
-
-import java.sql.Timestamp;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * This monitor is based on qstat command which can be run
- * in grid resources and retrieve the job status.
- */
-public class HPCPullMonitor extends PullMonitor {
-
- private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class);
- public static final int FAILED_COUNT = 5;
-
- // Original author's note: a delay based blocking queue might suit the monitoring better.
- private BlockingQueue<UserMonitorData> queue;
-
- private boolean startPulling = false; // loop flag: set to true at the top of run() and checked on every iteration
-
- private Map<String, ResourceConnection> connections; // keyed by compute resource host name
-
- private MonitorPublisher publisher;
-
- private LinkedBlockingQueue<String> cancelJobList; // entries are compared against experimentID + "+" + taskID
-
- private List<String> completedJobsFromPush; // handed to SimpleJobFinishConsumer in every constructor
-
- private GFac gfac;
-
- private AuthenticationInfo authenticationInfo; // used when opening ResourceConnections
-
- private ArrayList<MonitorID> removeList; // monitor IDs matched against the cancel list are collected here
-
- public HPCPullMonitor() { // default setup: own queue and own EventBus backed publisher; authenticationInfo and gfac stay unset
- connections = new HashMap<String, ResourceConnection>();
- queue = new LinkedBlockingDeque<UserMonitorData>();
- publisher = new MonitorPublisher(new EventBus());
- cancelJobList = new LinkedBlockingQueue<String>();
- completedJobsFromPush = new ArrayList<String>();
- (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); // NOTE(review): presumably fills the list with jobs reported finished via push - confirm
- removeList = new ArrayList<MonitorID>();
- }
-
- public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) { // caller supplied publisher (stored as given) and credentials; own queue
- connections = new HashMap<String, ResourceConnection>();
- queue = new LinkedBlockingDeque<UserMonitorData>();
- publisher = monitorPublisher;
- authenticationInfo = authInfo;
- cancelJobList = new LinkedBlockingQueue<String>();
- this.completedJobsFromPush = new ArrayList<String>();
- (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); // NOTE(review): presumably fills the list with jobs reported finished via push - confirm
- removeList = new ArrayList<MonitorID>();
- }
-
- public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) { // caller supplied queue and publisher; authenticationInfo is not set here
- this.queue = queue;
- this.publisher = publisher;
- connections = new HashMap<String, ResourceConnection>();
- cancelJobList = new LinkedBlockingQueue<String>();
- this.completedJobsFromPush = new ArrayList<String>();
- (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); // NOTE(review): presumably fills the list with jobs reported finished via push - confirm
- removeList = new ArrayList<MonitorID>();
- }
-
-
- public void run() {
- /* Polling loop: while pulling is enabled and the server is not stopping, call
- startPulling() whenever the queue is non-empty, then sleep before the next pass.
- */
- this.startPulling = true;
- while (this.startPulling && !ServerSettings.isStopAllThreads()) {
- try {
- // After each pass (whether or not anything was pulled) this thread sleeps 10 seconds, see Thread.sleep(10000) below
- synchronized (this.queue) {
- if (this.queue.size() > 0) {
- startPulling();
- }
- }
- Thread.sleep(10000);
- } catch (Exception e) {
- // All exceptions are caught here so that this thread keeps running no matter what happens;
- // errors from startPulling and an InterruptedException from Thread.sleep both end up in this block.
- // NOTE(review): the interrupt status is not restored here, the error is only logged.
- logger.error(e.getMessage(),e);
- }
- }
- // the loop has ended and the thread is about to return, so disconnect every cached cluster connection
- Iterator<String> iterator = connections.keySet().iterator();
- while (iterator.hasNext()) {
- String next = iterator.next();
- ResourceConnection resourceConnection = connections.get(next);
- try {
- resourceConnection.getCluster().disconnect();
- } catch (SSHApiException e) { // NOTE(review): the message below says "connecting" although this is a disconnect failure
- logger.error("Erro while connecting to the cluster", e);
- }
- }
- }
-
- /**
- * This method will can invoke when PullMonitor needs to start
- * and it has to invoke in the frequency specified below,
- *
- * @return if the start process is successful return true else false
- */
- public boolean startPulling() throws AiravataMonitorException {
- // take the top element in the queue and pull the data and put that element
- // at the tail of the queue
- //todo this polling will not work with multiple usernames but with single user
- // and multiple hosts, currently monitoring will work
- UserMonitorData take = null;
- JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
- MonitorID currentMonitorID = null;
- try {
- take = this.queue.take();
- List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
- for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) {
- HostMonitorData iHostMonitorData = hostIterator.next();
- if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
- String hostName = iHostMonitorData.getComputeResourceDescription().getHostName();
- ResourceConnection connection = null;
- if (connections.containsKey(hostName)) {
- if (!connections.get(hostName).isConnected()) {
- connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
- connections.put(hostName, connection);
- } else {
- logger.debug("We already have this connection so not going to create one");
- connection = connections.get(hostName);
- }
- } else {
- connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
- connections.put(hostName, connection);
- }
-
- // before we get the statuses, we check the cancel job list and remove them permanently
- List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
- Iterator<String> iterator1 = cancelJobList.iterator();
- ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator();
- while (monitorIDListIterator.hasNext()) {
- MonitorID iMonitorID = monitorIDListIterator.next();
- while (iterator1.hasNext()) {
- String cancelMId = iterator1.next();
- if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
- iMonitorID.setStatus(JobState.CANCELED);
-// CommonUtils.removeMonitorFromQueue(take, iMonitorID);
- removeList.add(iMonitorID);
- logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " +
- "completed job queue, experiment {}, task {} , job {}",
- iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID());
- logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .",
- iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
- sendNotification(iMonitorID);
- logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
- Thread.sleep(10000);
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
- break;
- }
- }
- iterator1 = cancelJobList.iterator();
- }
-
- cleanup(take);
-
- synchronized (completedJobsFromPush) {
- for (ListIterator<String> iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) {
- String completeId = iterator.next();
- for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) {
- MonitorID iMonitorID = monitorIDListIterator.next();
- if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
- logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
- iMonitorID.setStatus(JobState.COMPLETE);
-// CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak
- removeList.add(iMonitorID);
- logger.debugId(completeId, "Push notification updated job {} status to {}. " +
- "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
- iMonitorID.getExperimentID(), iMonitorID.getTaskID());
- logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
- iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
-
- sendNotification(iMonitorID);
- logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
- Thread.sleep(10000);
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
- break;
- }
- }
- }
- }
-
- cleanup(take);
-
- // we have to get this again because we removed the already completed jobs with amqp messages
- monitorID = iHostMonitorData.getMonitorIDs();
- Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
- for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
- MonitorID iMonitorID = iterator.next();
- currentMonitorID = iMonitorID;
- if (!JobState.CANCELED.equals(iMonitorID.getStatus()) &&
- !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
- iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic
- } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) {
- logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " +
- "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID());
-// CommonUtils.removeMonitorFromQueue(take, iMonitorID);
- removeList.add(iMonitorID);
- logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
- iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
- }
- iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- sendNotification(iMonitorID);
- // if the job is completed we do not have to put the job to the queue again
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- }
-
- cleanup(take);
-
-
- for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
- MonitorID iMonitorID = iterator.next();
- if (iMonitorID.getFailedCount() > FAILED_COUNT) {
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- String outputDir = iMonitorID.getJobExecutionContext().getOutputDir();
- List<String> stdOut = null;
- try {
- stdOut = connection.getCluster().listDirectory(outputDir); // check the outputs directory
- } catch (SSHApiException e) {
- if (e.getMessage().contains("No such file or directory")) {
- // this is because while we run output handler something failed and during exception
- // we store all the jobs in the monitor queue again
- logger.error("We know this job is already attempted to run out-handlers");
-// CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
- }
- }
- if (stdOut != null && stdOut.size() > 0 && !stdOut.get(0).isEmpty()) { // have to be careful with this
- iMonitorID.setStatus(JobState.COMPLETE);
- logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, " +
- " Experiment {} , task {}", iMonitorID.getFailedCount(),
- iMonitorID.getExperimentID(), iMonitorID.getTaskID());
- logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
- iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
- sendNotification(iMonitorID);
-// CommonUtils.removeMonitorFromQueue(take, iMonitorID);
- removeList.add(iMonitorID);
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
- } else {
- iMonitorID.setFailedCount(0);
- }
- } else {
- // Evey
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- // if the job is complete we remove it from the Map, if any of these maps
- // becomes empty this userMonitorData will be deleted from the queue
- }
- }
-
- cleanup(take);
-
-
- } else {
- logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData.
- getComputeResourceDescription().getHostName());
- }
- }
- // We have finished all the HostMonitorData objects in userMonitorData, now we need to put it back;
- // now the userMonitorData goes back to the tail of the queue.
- // During individual monitorID removal we remove the HostMonitorData object if it becomes empty,
- // so if all the jobs are finished for all the HostMonitorData objects in the userMonitorData object
- // we should remove it from the queue, so here we do not put it back.
- for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) {
- HostMonitorData iHostMonitorID = iterator1.next();
- if (iHostMonitorID.getMonitorIDs().size() == 0) {
- iterator1.remove();
- logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getComputeResourceDescription().getHostName());
- }
- }
- if(take.getHostMonitorData().size()!=0) {
- queue.put(take);
- }
- } catch (InterruptedException e) {
- if (!this.queue.contains(take)) {
- try {
- this.queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
- throw new AiravataMonitorException(e);
- } catch (SSHApiException e) {
- logger.error(e.getMessage());
- if (e.getMessage().contains("Unknown Job Id Error")) {
- // in this case job is finished or may be the given job ID is wrong
- jobStatus.setState(JobState.UNKNOWN);
- JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
- if (currentMonitorID != null){
- jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
- jobIdentifier.setTaskId(currentMonitorID.getTaskID());
- jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
- jobIdentifier.setJobId(currentMonitorID.getJobID());
- jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID());
- }
- jobStatus.setJobIdentity(jobIdentifier);
- publisher.publish(jobStatus);
- } else if (e.getMessage().contains("illegally formed job identifier")) {
- logger.error("Wrong job ID is given so dropping the job from monitoring system");
- } else if (!this.queue.contains(take)) {
- try {
- queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- }
- throw new AiravataMonitorException("Error retrieving the job status", e);
- } catch (Exception e) {
- try {
- queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- throw new AiravataMonitorException("Error retrieving the job status", e);
- }
- return true;
- }
-
- private void sendNotification(MonitorID iMonitorID) {
- JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
- JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(),
- iMonitorID.getTaskID(),
- iMonitorID.getWorkflowNodeID(),
- iMonitorID.getExperimentID(),
- iMonitorID.getJobExecutionContext().getGatewayID());
- jobStatus.setJobIdentity(jobIdentity);
- jobStatus.setState(iMonitorID.getStatus());
- // we have this JobStatus class to handle amqp monitoring
- logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
- "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
- jobStatus.getJobIdentity().getTaskId());
-
- publisher.publish(jobStatus);
- }
-
- /**
- * This is the method to stop the polling process
- *
- * @return true if the stopping process is successful, otherwise false
- */
- public boolean stopPulling() {
- this.startPulling = false;
- return true;
- }
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-
- public BlockingQueue<UserMonitorData> getQueue() {
- return queue;
- }
-
- public void setQueue(BlockingQueue<UserMonitorData> queue) {
- this.queue = queue;
- }
-
- public boolean authenticate() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Map<String, ResourceConnection> getConnections() {
- return connections;
- }
-
- public boolean isStartPulling() {
- return startPulling;
- }
-
- public void setConnections(Map<String, ResourceConnection> connections) {
- this.connections = connections;
- }
-
- public void setStartPulling(boolean startPulling) {
- this.startPulling = startPulling;
- }
-
- public GFac getGfac() {
- return gfac;
- }
-
- public void setGfac(GFac gfac) {
- this.gfac = gfac;
- }
-
- public AuthenticationInfo getAuthenticationInfo() {
- return authenticationInfo;
- }
-
- public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
- this.authenticationInfo = authenticationInfo;
- }
-
- public LinkedBlockingQueue<String> getCancelJobList() {
- return cancelJobList;
- }
-
- public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) {
- this.cancelJobList = cancelJobList;
- }
-
-
- private void cleanup(UserMonitorData userMonitorData){
- for(MonitorID iMonitorId:removeList){
- try {
- CommonUtils.removeMonitorFromQueue(userMonitorData, iMonitorId);
- } catch (AiravataMonitorException e) {
- logger.error(e.getMessage(), e);
- logger.error("Error deleting the monitor data: " + iMonitorId.getJobID());
- }
- }
- removeList.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
deleted file mode 100644
index f718535..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.SecurityContext;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.monitor.HostMonitorData;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-
-public class ResourceConnection {
- private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
-
- private PBSCluster cluster;
-
- private AuthenticationInfo authenticationInfo;
-
-
- public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException {
- MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
- try {
- SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
- if(securityContext != null) {
- if (securityContext instanceof GSISecurityContext) {
- GSISecurityContext gsiSecurityContext = (GSISecurityContext) securityContext;
- cluster = (PBSCluster) gsiSecurityContext.getPbsCluster();
- } else if (securityContext instanceof SSHSecurityContext) {
- SSHSecurityContext sshSecurityContext = (SSHSecurityContext)
- securityContext;
- cluster = (PBSCluster) sshSecurityContext.getPbsCluster();
- }
- }
- // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
- // we are using our own credentials and not using one user's account to do everything.
- authenticationInfo = authInfo;
- } catch (GFacException e) {
- log.error("Error reading data from job ExecutionContext");
- }
- }
-
- public ResourceConnection(HostMonitorData hostMonitorData) throws SSHApiException {
- MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
- try {
- GSISecurityContext securityContext = (GSISecurityContext)
- monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
- cluster = (PBSCluster) securityContext.getPbsCluster();
-
- // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
- // we are using our own credentials and not using one user's account to do everything.
- cluster = new PBSCluster(cluster.getServerInfo(), authenticationInfo, cluster.getJobManagerConfiguration());
- } catch (GFacException e) {
- log.error("Error reading data from job ExecutionContext");
- }
- }
-
- public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
- String jobID = monitorID.getJobID();
- //todo so currently we execute the qstat for each job but we can use user based monitoring
- //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
- return getStatusFromString(cluster.getJobStatus(jobID).toString());
- }
-
- public Map<String, JobState> getJobStatuses(List<MonitorID> monitorIDs) throws SSHApiException {
- Map<String, JobStatus> treeMap = new TreeMap<String, JobStatus>();
- Map<String, JobState> treeMap1 = new TreeMap<String, JobState>();
- // creating a sorted map with all the jobIds and with the predefined
- // status as UNKNOWN
- for (MonitorID monitorID : monitorIDs) {
- treeMap.put(monitorID.getJobID()+","+monitorID.getJobName(), JobStatus.U);
- }
- String userName = cluster.getServerInfo().getUserName();
- //todo so currently we execute the qstat for each job but we can use user based monitoring
- //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
- //
- cluster.getJobStatuses(userName, treeMap);
- for (String key : treeMap.keySet()) {
- treeMap1.put(key, getStatusFromString(treeMap.get(key).toString()));
- }
- return treeMap1;
- }
-
- private JobState getStatusFromString(String status) {
- log.info("parsing the job status returned : " + status);
- if (status != null) {
- if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) {
- return JobState.COMPLETE;
- } else if ("H".equals(status) || "h".equals(status)) {
- return JobState.HELD;
- } else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)) {
- return JobState.QUEUED;
- } else if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) {
- return JobState.ACTIVE;
- } else if ("T".equals(status)) {
- return JobState.HELD;
- } else if ("W".equals(status) || "PD".equals(status)) {
- return JobState.QUEUED;
- } else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) {
- return JobState.SUSPENDED;
- } else if ("CA".equals(status)) {
- return JobState.CANCELED;
- } else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)) {
- return JobState.FAILED;
- } else if ("PR".equals(status) || "Er".equals(status)) {
- return JobState.FAILED;
- } else if ("U".equals(status) || ("UNKWN".equals(status))) {
- return JobState.UNKNOWN;
- }
- }
- return JobState.UNKNOWN;
- }
-
- public PBSCluster getCluster() {
- return cluster;
- }
-
- public void setCluster(PBSCluster cluster) {
- this.cluster = cluster;
- }
-
- public boolean isConnected(){
- return this.cluster.getSession().isConnected();
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
deleted file mode 100644
index de8cd8c..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.core.PushMonitor;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-
-/**
- * This is the implementation for AMQP based finishQueue, this uses
- * rabbitmq client to receive AMQP based monitoring data from
- * mostly XSEDE resources.
- */
-public class AMQPMonitor extends PushMonitor {
- private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
-
- /* this will keep all the channels available in the system, we do not create
- channels for all the jobs submitted, but we create channels for each user for each
- host.
- */
- private Map<String, Channel> availableChannels;
-
- private MonitorPublisher publisher;
-
- private MonitorPublisher localPublisher;
-
- private BlockingQueue<MonitorID> runningQueue;
-
- private BlockingQueue<MonitorID> finishQueue;
-
- private String connectionName;
-
- private String proxyPath;
-
- private List<String> amqpHosts;
-
- private boolean startRegister;
-
- public AMQPMonitor(){
-
- }
- public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
- BlockingQueue<MonitorID> finishQueue,
- String proxyPath,String connectionName,List<String> hosts) {
- this.publisher = publisher;
- this.runningQueue = runningQueue; // these will be initialized by the MonitorManager
- this.finishQueue = finishQueue; // these will be initialized by the MonitorManager
- this.availableChannels = new HashMap<String, Channel>();
- this.connectionName = connectionName;
- this.proxyPath = proxyPath;
- this.amqpHosts = hosts;
- this.localPublisher = new MonitorPublisher(new EventBus());
- this.localPublisher.registerListener(this);
- }
-
- public void initialize(String proxyPath, String connectionName, List<String> hosts) {
- this.availableChannels = new HashMap<String, Channel>();
- this.connectionName = connectionName;
- this.proxyPath = proxyPath;
- this.amqpHosts = hosts;
- this.localPublisher = new MonitorPublisher(new EventBus());
- this.localPublisher.registerListener(this);
- }
-
- @Override
- public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
- // we subscribe to read user-host based subscription
- ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription();
- if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) {
- // we get first ip address for the moment
- String hostAddress = computeResourceDescription.getIpAddresses().get(0);
- // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
- // will be picked by the Monitor, so jobs will not stay in this queue but jobs will stay in finishQueue
- String channelID = CommonUtils.getChannelID(monitorID);
- if (availableChannels.get(channelID) == null) {
- try {
- //todo need to fix this rather getting it from a file
- Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
- Channel channel = null;
- channel = connection.createChannel();
- availableChannels.put(channelID, channel);
- String queueName = channel.queueDeclare().getQueue();
-
- BasicConsumer consumer = new
- BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher
- channel.basicConsume(queueName, true, consumer);
- String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
- // here we queuebind to a particular user in a particular machine
- channel.queueBind(queueName, "glue2.computing_activity", filterString);
- logger.info("Using filtering string to monitor: " + filterString);
- } catch (IOException e) {
- logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
- }
- }
- } else {
- throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() +
- " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " +
- "IpAddress with it");
- }
- return true;
- }
-
- public void run() {
- // before going to the while true mode we start unregister thread
- startRegister = true; // this will be unset by someone else
- while (startRegister || !ServerSettings.isStopAllThreads()) {
- try {
- MonitorID take = runningQueue.take();
- this.registerListener(take);
- } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
- logger.error(e.getMessage(), e);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- } catch (Exception e){
- logger.error(e.getMessage(), e);
- }
- }
- Set<String> strings = availableChannels.keySet();
- for(String key:strings) {
- Channel channel = availableChannels.get(key);
- try {
- channel.close();
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
-
- @Subscribe
- public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
- Iterator<MonitorID> iterator = finishQueue.iterator();
- MonitorID next = null;
- while(iterator.hasNext()){
- next = iterator.next();
- if(next.getJobID().endsWith(monitorID.getJobID())){
- break;
- }
- }
- if(next == null) {
- logger.error("Job has removed from the queue, old obsolete message recieved");
- return false;
- }
- String channelID = CommonUtils.getChannelID(next);
- if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
- finishQueue.remove(next);
-
- // if this is the last job in the queue at this point with the same username and same host we
- // close the channel and close the connection and remove it from availableChannels
- if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
- logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
- ", incase new job created we do subscribe again");
- Channel channel = availableChannels.get(channelID);
- if (channel == null) {
- logger.error("Already Unregistered the listener");
- throw new AiravataMonitorException("Already Unregistered the listener");
- } else {
- try {
- channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
- channel.close();
- channel.getConnection().close();
- availableChannels.remove(channelID);
- } catch (IOException e) {
- logger.error("Error unregistering the listener");
- throw new AiravataMonitorException("Error unregistering the listener");
- }
- }
- }
- }
- next.setStatus(monitorID.getStatus());
- JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(),
- next.getTaskID(),
- next.getWorkflowNodeID(),
- next.getExperimentID(),
- next.getJobExecutionContext().getGatewayID());
- publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity));
- return true;
- }
- @Override
- public boolean stopRegister() throws AiravataMonitorException {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Map<String, Channel> getAvailableChannels() {
- return availableChannels;
- }
-
- public void setAvailableChannels(Map<String, Channel> availableChannels) {
- this.availableChannels = availableChannels;
- }
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-
- public BlockingQueue<MonitorID> getRunningQueue() {
- return runningQueue;
- }
-
- public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
- this.runningQueue = runningQueue;
- }
-
- public BlockingQueue<MonitorID> getFinishQueue() {
- return finishQueue;
- }
-
- public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
- this.finishQueue = finishQueue;
- }
-
- public String getProxyPath() {
- return proxyPath;
- }
-
- public void setProxyPath(String proxyPath) {
- this.proxyPath = proxyPath;
- }
-
- public List<String> getAmqpHosts() {
- return amqpHosts;
- }
-
- public void setAmqpHosts(List<String> amqpHosts) {
- this.amqpHosts = amqpHosts;
- }
-
- public boolean isStartRegister() {
- return startRegister;
- }
-
- public void setStartRegister(boolean startRegister) {
- this.startRegister = startRegister;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
deleted file mode 100644
index bd5c625..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.core.MessageParser;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
-
-public class BasicConsumer implements Consumer {
- private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
- private MessageParser parser;
-
- private MonitorPublisher publisher;
-
- public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
- this.parser = parser;
- this.publisher = publisher;
- }
-
- public void handleCancel(String consumerTag) {
- }
-
- public void handleCancelOk(String consumerTag) {
- }
-
- public void handleConsumeOk(String consumerTag) {
- }
-
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
-
- logger.debug("job update for: " + envelope.getRoutingKey());
- String message = new String(body);
- message = message.replaceAll("(?m)^", " ");
- // Here we parse the message and get the job status and push it
- // to the Event bus, this will be picked by
-// AiravataJobStatusUpdator and store in to registry
-
- logger.debug("************************************************************");
- logger.debug("AMQP Message recieved \n" + message);
- logger.debug("************************************************************");
- try {
- String jobID = envelope.getRoutingKey().split("\\.")[0];
- MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null,null);
- monitorID.setStatus(parser.parseMessage(message));
- publisher.publish(monitorID);
- } catch (AiravataMonitorException e) {
- logger.error(e.getMessage(), e);
- }
- }
-
- public void handleRecoverOk(String consumerTag) {
- }
-
- public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
deleted file mode 100644
index 72c77d5..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.airavata.ComputingActivity;
-import org.apache.airavata.gfac.monitor.core.MessageParser;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-public class JSONMessageParser implements MessageParser {
- private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
-
- public JobState parseMessage(String message)throws AiravataMonitorException {
- /*todo write a json message parser here*/
- logger.debug(message);
- ObjectMapper objectMapper = new ObjectMapper();
- try {
- ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class);
- logger.info(computingActivity.getIDFromEndpoint());
- List<String> stateList = computingActivity.getState();
- JobState jobState = null;
- for (String aState : stateList) {
- jobState = getStatusFromString(aState);
- }
- // we get the last value of the state array
- return jobState;
- } catch (IOException e) {
- throw new AiravataMonitorException(e);
- }
- }
-
-private JobState getStatusFromString(String status) {
- logger.info("parsing the job status returned : " + status);
- if(status != null){
- if("ipf:finished".equals(status)){
- return JobState.COMPLETE;
- }else if("ipf:pending".equals(status)|| "ipf:starting".equals(status)){
- return JobState.QUEUED;
- }else if("ipf:running".equals(status) || "ipf:finishing".equals(status)){
- return JobState.ACTIVE;
- }else if ("ipf:held".equals(status) || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) {
- return JobState.HELD;
- } else if ("ipf:suspending".equals(status)) {
- return JobState.SUSPENDED;
- }else if ("ipf:failed".equals(status)) {
- return JobState.FAILED;
- }else if ("ipf:unknown".equals(status)){
- return JobState.UNKNOWN;
- }
- }
- return JobState.UNKNOWN;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
deleted file mode 100644
index c4275f1..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class SimpleJobFinishConsumer {
- private final static Logger logger = LoggerFactory.getLogger(SimpleJobFinishConsumer.class);
-
- private List<String> completedJobsFromPush;
-
- public SimpleJobFinishConsumer(List<String> completedJobsFromPush) {
- this.completedJobsFromPush = completedJobsFromPush;
- }
-
- public void listen() {
- try {
- String queueName = ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950");
- String uri = "amqp://localhost";
-
- ConnectionFactory connFactory = new ConnectionFactory();
- connFactory.setUri(uri);
- Connection conn = connFactory.newConnection();
- logger.info("--------Created the connection to Rabbitmq server successfully-------");
-
- final Channel ch = conn.createChannel();
-
- logger.info("--------Created the channel with Rabbitmq server successfully-------");
-
- ch.queueDeclare(queueName, false, false, false, null);
-
- logger.info("--------Declare the queue " + queueName + " in Rabbitmq server successfully-------");
-
- final QueueingConsumer consumer = new QueueingConsumer(ch);
- ch.basicConsume(queueName, consumer);
- (new Thread() {
- public void run() {
- try {
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- logger.info("---------------- Job Finish message received:" + message + " --------------");
- synchronized (completedJobsFromPush) {
- completedJobsFromPush.add(message);
- }
- ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- } catch (Exception ex) {
- logger.error("--------Cannot connect to a RabbitMQ Server--------" , ex);
- }
- }
-
- }).start();
- } catch (Exception ex) {
- logger.error("Cannot connect to a RabbitMQ Server: " , ex);
- logger.info("------------- Push monitoring for HPC jobs is disabled -------------");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
deleted file mode 100644
index a701326..0000000
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class UnRegisterWorker{
- private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
- private Map<String, Channel> availableChannels;
-
- public UnRegisterWorker(Map<String, Channel> channels) {
- this.availableChannels = channels;
- }
-
- @Subscribe
- private boolean unRegisterListener(JobStatusChangeEvent jobStatus, MonitorID monitorID) throws AiravataMonitorException {
- String channelID = CommonUtils.getChannelID(monitorID);
- if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
- Channel channel = availableChannels.get(channelID);
- if (channel == null) {
- logger.error("Already Unregistered the listener");
- throw new AiravataMonitorException("Already Unregistered the listener");
- } else {
- try {
- channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
- channel.close();
- channel.getConnection().close();
- availableChannels.remove(channelID);
- } catch (IOException e) {
- logger.error("Error unregistering the listener");
- throw new AiravataMonitorException("Error unregistering the listener");
- }
- }
- }
- return true;
- }
-}
-