You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/03/18 05:42:08 UTC
[6/7] git commit: moving the job-manager to tools
moving the job-manager to tools
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c2d006e6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c2d006e6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c2d006e6
Branch: refs/heads/master
Commit: c2d006e65884eb0e7b64300fd2d09935d8bf8c95
Parents: 444f2df
Author: lahiru <la...@apache.org>
Authored: Tue Mar 18 00:41:12 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Mar 18 00:41:12 2014 -0400
----------------------------------------------------------------------
modules/airavata-job-monitor/pom.xml | 177 -------------
.../job/monitor/AiravataJobStatusUpdator.java | 127 ----------
.../apache/airavata/job/monitor/MonitorID.java | 211 ----------------
.../airavata/job/monitor/MonitorManager.java | 237 ------------------
.../monitor/core/AiravataAbstractMonitor.java | 46 ----
.../job/monitor/core/MessageParser.java | 45 ----
.../airavata/job/monitor/core/Monitor.java | 30 ---
.../airavata/job/monitor/core/PullMonitor.java | 64 -----
.../airavata/job/monitor/core/PushMonitor.java | 60 -----
.../job/monitor/event/MonitorPublisher.java | 44 ----
.../exception/AiravataMonitorException.java | 37 ---
.../monitor/impl/pull/qstat/QstatMonitor.java | 235 -----------------
.../impl/pull/qstat/ResourceConnection.java | 106 --------
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 249 -------------------
.../monitor/impl/push/amqp/BasicConsumer.java | 83 -------
.../impl/push/amqp/JSONMessageParser.java | 60 -----
.../impl/push/amqp/UnRegisterThread.java | 76 ------
.../airavata/job/monitor/state/JobStatus.java | 58 -----
.../job/monitor/state/JobStatusInfo.java | 48 ----
.../monitor/state/impl/AmazonJobStatusInfo.java | 39 ---
.../monitor/state/impl/GridJobStatusInfo.java | 40 ---
.../job/monitor/util/AMQPConnectionUtil.java | 78 ------
.../airavata/job/monitor/util/CommonUtils.java | 52 ----
.../airavata/job/monitor/util/X509Helper.java | 160 ------------
.../src/main/resources/PBSTemplate.xslt | 77 ------
.../src/main/resources/gsissh.properties | 26 --
.../src/main/resources/schema/AccessPolicy.json | 13 -
.../src/main/resources/schema/Activity.json | 31 ---
.../src/main/resources/schema/AdminDomain.json | 51 ----
.../schema/ApplicationEnvironment.json | 86 -------
.../resources/schema/ApplicationHandle.json | 21 --
.../src/main/resources/schema/Benchmark.json | 21 --
.../resources/schema/ComputingActivity.json | 165 ------------
.../resources/schema/ComputingEndpoint.json | 44 ----
.../main/resources/schema/ComputingManager.json | 117 ---------
.../main/resources/schema/ComputingService.json | 32 ---
.../main/resources/schema/ComputingShare.json | 182 --------------
.../src/main/resources/schema/Contact.json | 32 ---
.../src/main/resources/schema/DataStore.json | 30 ---
.../src/main/resources/schema/Domain.json | 30 ---
.../src/main/resources/schema/Endpoint.json | 147 -----------
.../src/main/resources/schema/Entity.json | 35 ---
.../resources/schema/ExecutionEnvironment.json | 115 ---------
.../src/main/resources/schema/Glue2.json | 246 ------------------
.../src/main/resources/schema/Location.json | 47 ----
.../src/main/resources/schema/Manager.json | 28 ---
.../main/resources/schema/MappingPolicy.json | 13 -
.../src/main/resources/schema/Policy.json | 27 --
.../src/main/resources/schema/Resource.json | 27 --
.../src/main/resources/schema/Service.json | 75 ------
.../src/main/resources/schema/Share.json | 45 ----
.../resources/schema/StorageAccessProtocol.json | 32 ---
.../main/resources/schema/StorageEndpoint.json | 8 -
.../main/resources/schema/StorageManager.json | 8 -
.../main/resources/schema/StorageService.json | 22 --
.../schema/StorageServiceCapacity.json | 33 ---
.../src/main/resources/schema/StorageShare.json | 65 -----
.../resources/schema/StorageShareCapacity.json | 33 ---
.../resources/schema/ToComputingService.json | 32 ---
.../main/resources/schema/ToStorageService.json | 25 --
.../src/main/resources/schema/UserDomain.json | 58 -----
.../airavata/job/monitor/AMQPMonitorTest.java | 138 ----------
.../airavata/job/monitor/QstatMonitorTest.java | 138 ----------
.../src/test/resources/gsissh.properties | 26 --
.../src/test/resources/monitor.properties | 3 -
tools/job-monitor/pom.xml | 177 +++++++++++++
.../job/monitor/AiravataJobStatusUpdator.java | 127 ++++++++++
.../apache/airavata/job/monitor/MonitorID.java | 211 ++++++++++++++++
.../airavata/job/monitor/MonitorManager.java | 237 ++++++++++++++++++
.../monitor/core/AiravataAbstractMonitor.java | 46 ++++
.../job/monitor/core/MessageParser.java | 45 ++++
.../airavata/job/monitor/core/Monitor.java | 30 +++
.../airavata/job/monitor/core/PullMonitor.java | 64 +++++
.../airavata/job/monitor/core/PushMonitor.java | 60 +++++
.../job/monitor/event/MonitorPublisher.java | 44 ++++
.../exception/AiravataMonitorException.java | 37 +++
.../monitor/impl/pull/qstat/QstatMonitor.java | 235 +++++++++++++++++
.../impl/pull/qstat/ResourceConnection.java | 106 ++++++++
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 249 +++++++++++++++++++
.../monitor/impl/push/amqp/BasicConsumer.java | 83 +++++++
.../impl/push/amqp/JSONMessageParser.java | 60 +++++
.../impl/push/amqp/UnRegisterThread.java | 76 ++++++
.../airavata/job/monitor/state/JobStatus.java | 58 +++++
.../job/monitor/state/JobStatusInfo.java | 48 ++++
.../monitor/state/impl/AmazonJobStatusInfo.java | 39 +++
.../monitor/state/impl/GridJobStatusInfo.java | 40 +++
.../job/monitor/util/AMQPConnectionUtil.java | 78 ++++++
.../airavata/job/monitor/util/CommonUtils.java | 52 ++++
.../airavata/job/monitor/util/X509Helper.java | 160 ++++++++++++
.../src/main/resources/PBSTemplate.xslt | 77 ++++++
.../src/main/resources/gsissh.properties | 26 ++
.../src/main/resources/schema/AccessPolicy.json | 13 +
.../src/main/resources/schema/Activity.json | 31 +++
.../src/main/resources/schema/AdminDomain.json | 51 ++++
.../schema/ApplicationEnvironment.json | 86 +++++++
.../resources/schema/ApplicationHandle.json | 21 ++
.../src/main/resources/schema/Benchmark.json | 21 ++
.../resources/schema/ComputingActivity.json | 165 ++++++++++++
.../resources/schema/ComputingEndpoint.json | 44 ++++
.../main/resources/schema/ComputingManager.json | 117 +++++++++
.../main/resources/schema/ComputingService.json | 32 +++
.../main/resources/schema/ComputingShare.json | 182 ++++++++++++++
.../src/main/resources/schema/Contact.json | 32 +++
.../src/main/resources/schema/DataStore.json | 30 +++
.../src/main/resources/schema/Domain.json | 30 +++
.../src/main/resources/schema/Endpoint.json | 147 +++++++++++
.../src/main/resources/schema/Entity.json | 35 +++
.../resources/schema/ExecutionEnvironment.json | 115 +++++++++
.../src/main/resources/schema/Glue2.json | 246 ++++++++++++++++++
.../src/main/resources/schema/Location.json | 47 ++++
.../src/main/resources/schema/Manager.json | 28 +++
.../main/resources/schema/MappingPolicy.json | 13 +
.../src/main/resources/schema/Policy.json | 27 ++
.../src/main/resources/schema/Resource.json | 27 ++
.../src/main/resources/schema/Service.json | 75 ++++++
.../src/main/resources/schema/Share.json | 45 ++++
.../resources/schema/StorageAccessProtocol.json | 32 +++
.../main/resources/schema/StorageEndpoint.json | 8 +
.../main/resources/schema/StorageManager.json | 8 +
.../main/resources/schema/StorageService.json | 22 ++
.../schema/StorageServiceCapacity.json | 33 +++
.../src/main/resources/schema/StorageShare.json | 65 +++++
.../resources/schema/StorageShareCapacity.json | 33 +++
.../resources/schema/ToComputingService.json | 32 +++
.../main/resources/schema/ToStorageService.json | 25 ++
.../src/main/resources/schema/UserDomain.json | 58 +++++
.../airavata/job/monitor/AMQPMonitorTest.java | 138 ++++++++++
.../airavata/job/monitor/QstatMonitorTest.java | 138 ++++++++++
.../src/test/resources/gsissh.properties | 26 ++
.../src/test/resources/monitor.properties | 3 +
130 files changed, 4746 insertions(+), 4746 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/pom.xml b/modules/airavata-job-monitor/pom.xml
deleted file mode 100644
index 185b068..0000000
--- a/modules/airavata-job-monitor/pom.xml
+++ /dev/null
@@ -1,177 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
- the Apache License, Version 2.0 (theĆ "License"); you may not use this file except in compliance with the License. You may
- obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
- in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
- the License. -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata</artifactId>
- <version>0.12-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <modelVersion>4.0.0</modelVersion>
- <artifactId>airavata-job-monitor</artifactId>
- <name>Airavata Job Monitor</name>
- <description>This component handle the Airavata Job monitoring funcationality</description>
- <url>http://airavata.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.jglobus</groupId>
- <artifactId>gss</artifactId>
- <version>${jglobus.version}</version>
- </dependency>
- <dependency>
- <groupId>org.jglobus</groupId>
- <artifactId>myproxy</artifactId>
- <version>${jglobus.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk16</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!--dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15on</artifactId>
- <version>1.48</version>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcpkix-jdk15on</artifactId>
- <version>1.48</version>
- </dependency-->
- <!-- Logging -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-registry-cpi</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-jpa-registry</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk16</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>3.2.3</version>
- </dependency>
- <!-- SSH -->
- <dependency>
- <groupId>net.schmizz</groupId>
- <artifactId>sshj</artifactId>
- <version>0.8.0</version>
- </dependency>
-
-
- <!-- Test -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <version>6.1.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
- </dependency>
-
-
- <!-- Guava -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>12.0</version>
- </dependency>
-
- <!-- gsi-ssh api dependencies -->
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>gsissh</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.jcraft</groupId>
- <artifactId>jsch</artifactId>
- <version>0.1.50</version>
- </dependency>
- <dependency>
- <groupId>org.ogce</groupId>
- <artifactId>bcgss</artifactId>
- <version>146</version>
- </dependency>
- <dependency>
- <groupId>org.apache.xmlbeans</groupId>
- <artifactId>xmlbeans</artifactId>
- <version>${xmlbeans.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.0.0</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skip>false</skip>
- <forkMode>always</forkMode>
- <failIfNoTests>false</failIfNoTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-maven-plugin</artifactId>
- <version>0.4.0</version>
- <configuration>
- <sourceDirectory>${basedir}/src/main/resources/schema</sourceDirectory>
- <targetPackage>org.apache.airavata</targetPackage>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>generate</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
deleted file mode 100644
index 601cc27..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.CompositeIdentifier;
-import org.apache.airavata.registry.cpi.Registry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Calendar;
-import java.util.concurrent.BlockingQueue;
-
-public class AiravataJobStatusUpdator{
- private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
-
- private Registry airavataRegistry;
-
- private BlockingQueue<MonitorID> jobsToMonitor;
-
-
- public AiravataJobStatusUpdator(Registry airavataRegistry, BlockingQueue<MonitorID> jobsToMonitor) {
- this.airavataRegistry = airavataRegistry;
- this.jobsToMonitor = jobsToMonitor;
- }
-
- public Registry getAiravataRegistry() {
- return airavataRegistry;
- }
-
- public void setAiravataRegistry(Registry airavataRegistry) {
- this.airavataRegistry = airavataRegistry;
- }
-
- public BlockingQueue<MonitorID> getJobsToMonitor() {
- return jobsToMonitor;
- }
-
- public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
- this.jobsToMonitor = jobsToMonitor;
- }
-
- @Subscribe
- public void updateRegistry(JobStatus jobStatus) {
- /* Here we need to parse the jobStatus message and update
- the registry accordingly, for now we are just printing to standard Out
- */
- JobState state = jobStatus.getState();
- if (state != null) {
- try {
- String taskID = jobStatus.getMonitorID().getTaskID();
- String jobID = jobStatus.getMonitorID().getJobID();
- updateJobStatus(taskID, jobID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- switch (state) {
- case COMPLETE:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- case UNKNOWN:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- //todo implement this logic
- break;
- case QUEUED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is QUEUED");
- break;
- case SUBMITTED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUBMITTED");
- break;
- case ACTIVE:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is ACTIVE");
- break;
- case CANCELED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CANCELED");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- case FAILED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is FAILED");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- case HELD:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is HELD");
- break;
- case SUSPENDED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- }
- }
- }
- public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
- JobDetails details = new JobDetails();
- org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
- status.setJobState(state);
- status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setJobStatus(status);
- details.setJobID(jobID);
- CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
- airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, details, ids);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
deleted file mode 100644
index f65241a..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.omg.PortableInterceptor.ACTIVE;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Map;
-import java.util.Properties;
-
-/*
-This is the object which contains the data to identify a particular
-Job to start the monitoring
-*/
-public class MonitorID {
- private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
-
- private String userName;
-
- private String jobID;
-
- private Timestamp jobStartedTime;
-
- private Timestamp lastMonitored;
-
- private HostDescription host;
-
-
- private AuthenticationInfo authenticationInfo = null;
-
- private Map<String, Object> parameters;
-
- private String experimentID;
-
- private String taskID;
-
- private int failedCount = 0;
-
- private JobState state;
-
- public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) {
- this.host = host;
- this.jobStartedTime = new Timestamp((new Date()).getTime());
- this.userName = userName;
- this.jobID = jobID;
- this.taskID = taskID;
- this.experimentID = experimentID;
- }
-
- public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName,AuthenticationInfo authenticationInfo) {
- this.host = host;
- this.jobStartedTime = new Timestamp((new Date()).getTime());
- this.authenticationInfo = authenticationInfo;
- this.userName = userName;
- // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user
- if(this.authenticationInfo != null){
- if(this.authenticationInfo instanceof MyProxyAuthenticationInfo){
- this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName();
- }
- }
- this.jobID = jobID;
- this.taskID = taskID;
- this.experimentID = experimentID;
- }
- public HostDescription getHost() {
- return host;
- }
-
- public void setHost(HostDescription host) {
- this.host = host;
- }
-
- public Timestamp getLastMonitored() {
- return lastMonitored;
- }
-
- public void setLastMonitored(Timestamp lastMonitored) {
- this.lastMonitored = lastMonitored;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getJobID() {
- return jobID;
- }
-
- public void setJobID(String jobID) {
- this.jobID = jobID;
- }
-
- public Timestamp getJobStartedTime() {
- return jobStartedTime;
- }
-
- public void setJobStartedTime(Timestamp jobStartedTime) {
- this.jobStartedTime = jobStartedTime;
- }
-
- public AuthenticationInfo getAuthenticationInfo() {
- return authenticationInfo;
- }
-
- public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
- this.authenticationInfo = authenticationInfo;
- }
-
- public void addParameter(String key,Object value) {
- this.parameters.put(key, value);
- }
-
- public Object getParameter(String key) {
- return this.parameters.get(key);
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
- }
-
- public String getExperimentID() {
- return experimentID;
- }
-
- public void setExperimentID(String experimentID) {
- this.experimentID = experimentID;
- }
-
- public String getTaskID() {
- return taskID;
- }
-
- public void setTaskID(String taskID) {
- this.taskID = taskID;
- }
-
- public int getFailedCount() {
- return failedCount;
- }
-
- public void setFailedCount(int failedCount) {
- this.failedCount = failedCount;
- }
-
- public JobState getStatus() {
- return state;
- }
-
- public void setStatus(JobState status) {
- // this logic is going to be useful for fast finishing jobs
- // because in some machines job state vanishes quicckly when the job is done
- // during that case job state comes as unknown.so we handle it here.
- if (this.state != null && status.equals(JobState.UNKNOWN)) {
- if (getFailedCount() > 2) {
- switch (this.state) {
- case ACTIVE:
- this.state = JobState.COMPLETE;
- break;
- case QUEUED:
- this.state = JobState.COMPLETE;
- break;
- }
- } else {
- try {
- // when state becomes unknown we sleep for a while
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- setFailedCount(getFailedCount() + 1);
- }
- } else {
- // normal scenario
- this.state = status;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
deleted file mode 100644
index 3515a68..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor;
-
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.job.monitor.core.PullMonitor;
-import org.apache.airavata.job.monitor.core.PushMonitor;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
-import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/*
-this is the manager class for monitoring system of airavata,
-This simply handle the monitoring flow of the system.
-Keeps available jobs to monitor in a queue and once they are done
-remove them from the queue, this will be done by AiravataJobUpdator.
- */
-public class MonitorManager {
- private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
-
- private List<PullMonitor> pullMonitors; //todo though we have a List we only support one at a time
-
- private List<PushMonitor> pushMonitors; //todo we need to support multiple monitors dynamically
-
- private BlockingQueue<MonitorID> pullQueue;
-
- private BlockingQueue<MonitorID> pushQueue;
-
- private BlockingQueue<MonitorID> localJobQueue;
-
- private BlockingQueue<MonitorID> finishQueue;
-
- private MonitorPublisher monitorPublisher;
-
- /**
- * This will initialize the major monitoring system.
- */
- public MonitorManager() {
- pullMonitors = new ArrayList<PullMonitor>();
- pushMonitors = new ArrayList<PushMonitor>();
- pullQueue = new LinkedBlockingQueue<MonitorID>();
- pushQueue = new LinkedBlockingQueue<MonitorID>();
- finishQueue = new LinkedBlockingQueue<MonitorID>();
- monitorPublisher = new MonitorPublisher(new EventBus());
- registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
- }
-
- /**
- * This can be use to add an empty AMQPMonitor object to the monitor system
- * and tihs method will take care of the initialization
- * todo may be we need to move this to some other class
- * @param monitor
- */
- public void addAMQPMonitor(AMQPMonitor monitor) {
- monitor.setPublisher(this.getMonitorPublisher());
- monitor.setFinishQueue(this.getFinishQueue());
- monitor.setRunningQueue(this.getPushQueue());
- addPushMonitor(monitor);
- }
-
- /**
- * This can be used to adda a QstatMonitor and it will take care of
- * the initialization of QstatMonitor
- * //todo may be we need to move this to some other class
- * @param qstatMonitor
- */
- public void addQstatMonitor(QstatMonitor qstatMonitor) {
- qstatMonitor.setPublisher(this.getMonitorPublisher());
- qstatMonitor.setQueue(this.getPullQueue());
- addPullMonitor(qstatMonitor);
-
- }
-
- /**
- * To deal with the statuses users can write their own listener and implement their own logic
- *
- * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
- * have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
- */
- public void registerListener(Object listener) {
- monitorPublisher.registerListener(listener);
- }
-
- /**
- * todo write
- *
- * @param monitor
- */
- public void addPushMonitor(PushMonitor monitor) {
- pushMonitors.add(monitor);
- }
-
- /**
- * todo write
- *
- * @param monitor
- */
- public void addPullMonitor(PullMonitor monitor) {
- pullMonitors.add(monitor);
- }
-
- /**
- * Adding this method will trigger the thread in launchMonitor and notify it
- * This is going to be useful during the startup of the launching process
- *
- * @param monitorID
- * @throws AiravataMonitorException
- */
- public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException {
- if (monitorID.getHost().getType() instanceof GsisshHostType) {
- GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
- if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
- || Constants.PULL.equals(host.getMonitorMode())) {
- pullQueue.add(monitorID);
- } else if (Constants.PUSH.equals(host.getMonitorMode())) {
- pushQueue.add(monitorID);
- }
- } else {
- logger.error("We only support Gsissh host types currently");
- }
- }
-
- /**
- * This method should be invoked before adding any elements to monitorQueue
- * In this method we assume that we give higher preference to Push
- * Monitorig mechanism if there's any configured, otherwise Pull
- * monitoring will be launched.
- * Ex: If there's a reasource which doesn't support Push, we have
- * to live with Pull MOnitoring.
- *
- * @throws AiravataMonitorException
- */
- public void launchMonitor() throws AiravataMonitorException {
- //no push monitor is configured so we launch pull monitor
- int index = 0;
- for (PullMonitor monitor : pullMonitors) {
- (new Thread(monitor)).start();
- }
-
- for (PushMonitor monitor : pushMonitors) {
- (new Thread(monitor)).start();
- if (monitor instanceof AMQPMonitor) {
- UnRegisterThread unRegisterThread = new
- UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
- unRegisterThread.start();
- }
- }
- }
-
- /* getter setters for the private variables */
-
- public List<PullMonitor> getPullMonitors() {
- return pullMonitors;
- }
-
- public void setPullMonitors(List<PullMonitor> pullMonitors) {
- this.pullMonitors = pullMonitors;
- }
-
- public List<PushMonitor> getPushMonitors() {
- return pushMonitors;
- }
-
- public void setPushMonitors(List<PushMonitor> pushMonitors) {
- this.pushMonitors = pushMonitors;
- }
-
- public BlockingQueue<MonitorID> getPullQueue() {
- return pullQueue;
- }
-
- public void setPullQueue(BlockingQueue<MonitorID> pullQueue) {
- this.pullQueue = pullQueue;
- }
-
- public MonitorPublisher getMonitorPublisher() {
- return monitorPublisher;
- }
-
- public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
- this.monitorPublisher = monitorPublisher;
- }
-
- public BlockingQueue<MonitorID> getFinishQueue() {
- return finishQueue;
- }
-
- public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
- this.finishQueue = finishQueue;
- }
-
- public BlockingQueue<MonitorID> getPushQueue() {
- return pushQueue;
- }
-
- public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
- this.pushQueue = pushQueue;
- }
-
- public BlockingQueue<MonitorID> getLocalJobQueue() {
- return localJobQueue;
- }
-
- public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) {
- this.localJobQueue = localJobQueue;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/AiravataAbstractMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/AiravataAbstractMonitor.java
deleted file mode 100644
index 2e85b32..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/AiravataAbstractMonitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.core;
-
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the abstract Monitor which needs to be used by
- * any Monitoring implementation which expect to consume
- * to store the status to registry. Because they have to
- * use the MonitorPublisher to publish the monitoring statuses
- * to the Event Bus. All the Monitor statuses publish to the eventbus
- * will be saved to the Registry.
- */
-public abstract class AiravataAbstractMonitor implements Monitor {
- private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
- MonitorPublisher publisher;
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
deleted file mode 100644
index c70e372..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.core;
-
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
-
-/**
- * This is an interface to implement messageparser, it could be
- * pull based or push based still monitor has to parse the content of
- * the message it gets from remote monitoring system and finalize
- * them to internal job state, Ex: JSON parser for AMQP and Qstat reader
- * for pull based monitor.
- */
-public interface MessageParser {
- /**
- * This method is to implement how to parse the incoming message
- * and implement a logic to finalize the status of the job,
- * we have to makesure the correct message is given to the messageparser
- * parse method, it will not do any filtering
- * @param message content of the message
- * @param monitorID monitorID object
- * @return
- */
- JobStatus parseMessage(String message,MonitorID monitorID)throws AiravataMonitorException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
deleted file mode 100644
index 9627bbc..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.core;
-
-
-/**
- * This is the primary interface for Monitors,
- * This can be used to implement different methods of monitoring
- */
-public interface Monitor extends Runnable {
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PullMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PullMonitor.java
deleted file mode 100644
index 4b904b1..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PullMonitor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.core;
-
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-
-/**
- * PullMonitors can implement this interface
- * Since the pull and push based monitoring required different
- * operations, PullMonitor will be useful.
- * This will allow users to program Pull monitors separately
- */
-public abstract class PullMonitor extends AiravataAbstractMonitor{
-
- private int pollingFrequence;
- /**
- * This method will can invoke when PullMonitor needs to start
- * and it has to invoke in the frequency specified below,
- * @return if the start process is successful return true else false
- */
- public abstract boolean startPulling() throws AiravataMonitorException;
-
- /**
- * This is the method to stop the polling process
- * @return if the stopping process is successful return true else false
- */
- public abstract boolean stopPulling()throws AiravataMonitorException;
-
- /**
- * this method can be used to set the polling frequencey or otherwise
- * can implement a polling mechanism, and implement how to do
- * @param frequence
- */
- public void setPollingFrequence(int frequence){
- this.pollingFrequence = frequence;
- }
-
- /**
- * this method can be used to get the polling frequencey or otherwise
- * can implement a polling mechanism, and implement how to do
- * @return
- */
- public int getPollingFrequence(){
- return this.pollingFrequence;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
deleted file mode 100644
index e3ecccd..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.core;
-
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-
-/**
- * PushMonitors can implement this interface
- * Since the pull and push based monitoring required different
- * operations, PullMonitor will be useful.
- * This interface will allow users to program Push monitors separately
- */
-public abstract class PushMonitor extends AiravataAbstractMonitor {
- /**
- * This method can be invoked to register a listener with the
- * remote monitoring system, ideally inside this method users will be
- * writing some client listener code for the remote monitoring system,
- * this will be a simple wrapper around any client for the remote Monitor.
- * @param monitorID
- * @return
- */
- public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
-
- /**
- * This method can be invoked to unregister a listener with the
- * remote monitoring system, ideally inside this method users will be
- * writing some client listener code for the remote monitoring system,
- * this will be a simple wrapper around any client for the remote Monitor.
- * @param monitorID
- * @return
- */
- public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
-
- /**
- * This can be used to stop the registration thread
- * @return
- * @throws AiravataMonitorException
- */
- public abstract boolean stopRegister()throws AiravataMonitorException;
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
deleted file mode 100644
index 95b64ab..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.event;
-
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MonitorPublisher {
- private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
- private EventBus eventBus;
-
- public MonitorPublisher(EventBus eventBus) {
- this.eventBus = eventBus;
- }
-
- public void registerListener(Object listener) {
- eventBus.register(listener);
- }
-
- public void publish(JobStatus jobState) {
- eventBus.post(jobState);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/exception/AiravataMonitorException.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/exception/AiravataMonitorException.java
deleted file mode 100644
index 5a13be2..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/exception/AiravataMonitorException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.exception;
-
-public class AiravataMonitorException extends Exception {
- private static final long serialVersionUID = -2849422320139467602L;
-
- public AiravataMonitorException(Throwable e) {
- super(e);
- }
-
- public AiravataMonitorException(String message) {
- super(message, null);
- }
-
- public AiravataMonitorException(String message, Throwable e) {
- super(message, e);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
deleted file mode 100644
index ee1e6fe..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.pull.qstat;
-
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.core.PullMonitor;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This monitor is based on qstat command which can be run
- * in grid resources and retrieve the job status.
- */
-public class QstatMonitor extends PullMonitor {
- private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
-
- // I think this should use DelayedBlocking Queue to do the monitoring*/
- private BlockingQueue<MonitorID> queue;
-
- private boolean startPulling = false;
-
- private Map<String, ResourceConnection> connections;
-
- private MonitorPublisher publisher;
-
- public QstatMonitor(){
- connections = new HashMap<String, ResourceConnection>();
- }
- public QstatMonitor(BlockingQueue<MonitorID> queue, MonitorPublisher publisher) {
- this.queue = queue;
- this.publisher = publisher;
- connections = new HashMap<String, ResourceConnection>();
- }
-
- public void run() {
- /* implement a logic to pick each monitorID object from the queue and do the
- monitoring
- */
- this.startPulling = true;
- while (this.startPulling && !ServerSettings.isStopAllThreads()) {
- try {
- startPulling();
- // After finishing one iteration of the full queue this thread sleeps 1 second
- Thread.sleep(10000);
- } catch (Exception e){
- // we catch all the exceptions here because no matter what happens we do not stop running this
- // thread, but ideally we should report proper error messages, but this is handled in startPulling
- // method, incase something happen in Thread.sleep we handle it with this catch block.
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- // thread is going to return so we close all the connections
- Iterator<String> iterator = connections.keySet().iterator();
- while(iterator.hasNext()){
- String next = iterator.next();
- ResourceConnection resourceConnection = connections.get(next);
- try {
- resourceConnection.getCluster().disconnect();
- } catch (SSHApiException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- /**
- * This method will can invoke when PullMonitor needs to start
- * and it has to invoke in the frequency specified below,
- *
- * @return if the start process is successful return true else false
- */
- public boolean startPulling() throws AiravataMonitorException {
- // take the top element in the queue and pull the data and put that element
- // at the tail of the queue
- MonitorID take = null;
- JobStatus jobStatus = new JobStatus();
- try {
- take = this.queue.take();
- if((take.getHost().getType() instanceof GsisshHostType)){
- long monitorDiff = 0;
- long startedDiff = 0;
- if (take.getLastMonitored() != null) {
- monitorDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getLastMonitored().getTime();
- startedDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getJobStartedTime().getTime();
- //todo implement an algorithm to delay the monitor based no start time, we have to delay monitoring
- //todo for long running jobs
-// System.out.println(monitorDiff + "-" + startedDiff);
- if ((monitorDiff / 1000) < 5) {
- // its too early to monitor this job, so we put it at the tail of the queue
- this.queue.put(take);
- }
- }
- if (take.getLastMonitored() == null || ((monitorDiff / 1000) >= 5)) {
- GsisshHostType gsisshHostType = (GsisshHostType) take.getHost().getType();
- String hostName = gsisshHostType.getHostAddress();
- ResourceConnection connection = null;
- if (connections.containsKey(hostName)) {
- logger.debug("We already have this connection so not going to create one");
- connection = connections.get(hostName);
- } else {
- connection = new ResourceConnection(take, gsisshHostType.getInstalledPath());
- connections.put(hostName, connection);
- }
- take.setStatus(connection.getJobStatus(take));
- jobStatus.setMonitorID(take);
- jobStatus.setState(take.getStatus());
- // we have this JobStatus class to handle amqp monitoring
- publisher.publish(jobStatus);
- // if the job is completed we do not have to put the job to the queue again
- if (!jobStatus.getState().equals(JobState.COMPLETE)) {
- take.setLastMonitored(new Timestamp((new Date()).getTime()));
- this.queue.put(take);
- }
- }
- } else {
- logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
- }
- } catch (InterruptedException e) {
- if(!this.queue.contains(take)){
- try {
- this.queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- logger.error("Error handling the job with Job ID:" + take.getJobID());
- throw new AiravataMonitorException(e);
- } catch (SSHApiException e) {
- logger.error(e.getMessage());
- if(e.getMessage().contains("Unknown Job Id Error")){
- // in this case job is finished or may be the given job ID is wrong
- jobStatus.setState(JobState.UNKNOWN);
- publisher.publish(jobStatus);
- }else if(e.getMessage().contains("illegally formed job identifier")){
- logger.error("Wrong job ID is given so dropping the job from monitoring system");
- } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown
- if (take.getFailedCount() < 2) {
- try {
- take.setFailedCount(take.getFailedCount() + 1);
- this.queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- } else {
- logger.error(e.getMessage());
- logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID());
- }
- }
- throw new AiravataMonitorException("Error retrieving the job status", e);
- } catch (Exception e){
- if (take.getFailedCount() < 3) {
- try {
- take.setFailedCount(take.getFailedCount() + 1);
- this.queue.put(take);
- // if we get a wrong status we wait for a while and request again
- Thread.sleep(10000);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- } else {
- logger.error(e.getMessage());
- logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID());
- }
- throw new AiravataMonitorException("Error retrieving the job status", e);
- }
-
-
-
- return true;
- }
-
-
- /**
- * This is the method to stop the polling process
- *
- * @return if the stopping process is successful return true else false
- */
- public boolean stopPulling() {
- this.startPulling = false;
- return true;
- }
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-
- public BlockingQueue<MonitorID> getQueue() {
- return queue;
- }
-
- public void setQueue(BlockingQueue<MonitorID> queue) {
- this.queue = queue;
- }
-
- public boolean authenticate() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
deleted file mode 100644
index ade4420..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.pull.qstat;
-
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.*;
-import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ResourceConnection {
- private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
-
- private PBSCluster cluster;
-
- public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException {
- AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo();
- String hostAddress = monitorID.getHost().getType().getHostAddress();
- String userName = monitorID.getUserName();
- String jobManager = ((GsisshHostType)monitorID.getHost().getType()).getJobManager();
- JobManagerConfiguration jConfig = null;
- if (jobManager == null) {
- log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
- jConfig = CommonUtils.getPBSJobManager(installedPath);
- } else {
- if (org.apache.airavata.job.monitor.util.CommonUtils.isPBSHost(monitorID.getHost())) {
- jConfig = CommonUtils.getPBSJobManager(installedPath);
- } else if(org.apache.airavata.job.monitor.util.CommonUtils.isSlurm(monitorID.getHost())) {
- jConfig = CommonUtils.getSLURMJobManager(installedPath);
- }
- //todo support br2 etc
- }
- ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)monitorID.getHost().getType()).getPort());
- cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
- }
-
- public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
- String jobID = monitorID.getJobID();
- //todo so currently we execute the qstat for each job but we can use user based monitoring
- //todo or we should concatenate all the commands and execute them in one go and parse the response
- return getStatusFromString(cluster.getJobStatus(jobID).toString());
- }
-
- private JobState getStatusFromString(String status) {
- log.info("parsing the job status returned : " + status);
- if(status != null){
- if("C".equals(status) || "CD".equals(status)|| "E".equals(status) || "CG".equals(status)){
- return JobState.COMPLETE;
- }else if("H".equals(status)){
- return JobState.HELD;
- }else if("Q".equals(status)){
- return JobState.QUEUED;
- }else if("R".equals(status) || "CF".equals(status)){
- return JobState.ACTIVE;
- }else if ("T".equals(status)) {
- return JobState.HELD;
- } else if ("W".equals(status) || "PD".equals(status)) {
- return JobState.QUEUED;
- } else if ("S".equals(status)) {
- return JobState.SUSPENDED;
- }else if("CA".equals(status)){
- return JobState.CANCELED;
- }else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status)) {
- return JobState.FAILED;
- }else if ("PR".equals(status)) {
- return JobState.FAILED;
- }else if ("U".equals(status)){
- return JobState.UNKNOWN;
- }
- }
- return JobState.UNKNOWN;
- }
-
- public PBSCluster getCluster() {
- return cluster;
- }
-
- public void setCluster(PBSCluster cluster) {
- this.cluster = cluster;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
deleted file mode 100644
index 06d21a1..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.core.PushMonitor;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
-import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This is the implementation for AMQP based finishQueue, this uses
- * rabbitmq client to recieve AMQP based monitoring data from
- * mostly excede resources.
- */
-public class AMQPMonitor extends PushMonitor {
- private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
-
- /* this will keep all the channels available in the system, we do not create
- channels for all the jobs submitted, but we create channels for each user for each
- host.
- */
- private Map<String, Channel> availableChannels;
-
- private MonitorPublisher publisher;
-
- private BlockingQueue<MonitorID> runningQueue;
-
- private BlockingQueue<MonitorID> finishQueue;
-
- private String connectionName;
-
- private String proxyPath;
-
- private List<String> amqpHosts;
-
- private boolean startRegister;
-
- public AMQPMonitor(){
-
- }
- public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue,
- String proxyPath,String connectionName,List<String> hosts) {
- this.publisher = publisher;
- this.runningQueue = runningQueue; // these will be initialized by the MonitorManager
- this.finishQueue = finishQueue; // these will be initialized by the MonitorManager
- this.availableChannels = new HashMap<String, Channel>();
- this.connectionName = connectionName;
- this.proxyPath = proxyPath;
- this.amqpHosts = hosts;
- }
-
- public void initialize(String proxyPath, String connectionName, List<String> hosts) {
- this.availableChannels = new HashMap<String, Channel>();
- this.connectionName = connectionName;
- this.proxyPath = proxyPath;
- this.amqpHosts = hosts;
- }
-
- @Override
- public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
- // do initial check before creating a channel, otherwise resources will be waste
- // and channel id will be malformed
- // this check is not implemented in MonitorManager because it depends on
- // the Monitoring implementation (what data is required)
- checkMonitorID(monitorID);
- String channelID = CommonUtils.getChannelID(monitorID);
- System.out.println("Going to start monitoring job with ID: " + monitorID.getJobID());
- logger.info("Going to start monitoring job with ID: " + monitorID.getJobID());
- // if we already have a channel we do not create one
- if (availableChannels.get(channelID) == null) {
- //todo need to fix this rather getting it from a file
- Connection connection = AMQPConnectionUtil.connect(amqpHosts,connectionName, proxyPath);
- Channel channel = null;
- try {
- channel = connection.createChannel();
- String queueName = channel.queueDeclare().getQueue();
-
- BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, monitorID);
- channel.basicConsume(queueName, true, consumer);
- String filterString = CommonUtils.getRoutingKey(monitorID);
- // here we queuebind to a particular user in a particular machine
- channel.queueBind(queueName, "glue2.computing_activity", filterString);
- logger.info("Using filtering string to monitor: " + filterString);
- } catch (IOException e) {
- logger.error("Error creating the connection to finishQueue the job:" + monitorID.getJobID());
- }
- }
- return true;
- }
-
- public void run() {
- // before going to the while true mode we start unregister thread
- startRegister = true; // this will be unset by someone else
- while (startRegister || !ServerSettings.isStopAllThreads()) {
- try {
- MonitorID take = runningQueue.take();
- this.registerListener(take);
- } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Exception e){
- e.printStackTrace();
- }
- }
- Set<String> strings = availableChannels.keySet();
- for(String key:strings) {
- Channel channel = availableChannels.get(key);
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
-
-
- private void checkMonitorID(MonitorID monitorID) throws AiravataMonitorException {
- if (monitorID.getUserName() == null) {
- String error = "Username has to be given for monitoring";
- logger.error(error);
- throw new AiravataMonitorException(error);
- } else if (monitorID.getHost() == null) {
- String error = "Host has to be given for monitoring";
- logger.error(error);
- throw new AiravataMonitorException(error);
- } else if (monitorID.getJobID() == null) {
- String error = "JobID has to be given for monitoring";
- logger.error(error);
- throw new AiravataMonitorException(error);
- }
- }
-
-
- @Override
- public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
- String channelID = CommonUtils.getChannelID(monitorID);
- Channel channel = availableChannels.get(channelID);
- if (channel == null) {
- logger.error("Already Unregistered the listener");
- throw new AiravataMonitorException("Already Unregistered the listener");
- } else {
- try {
- channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
- } catch (IOException e) {
- logger.error("Error unregistering the listener");
- throw new AiravataMonitorException("Error unregistering the listener");
- }
-
-
-
- }
- return true;
- }
-
- @Override
- public boolean stopRegister() throws AiravataMonitorException {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Map<String, Channel> getAvailableChannels() {
- return availableChannels;
- }
-
- public void setAvailableChannels(Map<String, Channel> availableChannels) {
- this.availableChannels = availableChannels;
- }
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-
- public BlockingQueue<MonitorID> getRunningQueue() {
- return runningQueue;
- }
-
- public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
- this.runningQueue = runningQueue;
- }
-
- public BlockingQueue<MonitorID> getFinishQueue() {
- return finishQueue;
- }
-
- public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
- this.finishQueue = finishQueue;
- }
-
- public String getProxyPath() {
- return proxyPath;
- }
-
- public void setProxyPath(String proxyPath) {
- this.proxyPath = proxyPath;
- }
-
- public List<String> getAmqpHosts() {
- return amqpHosts;
- }
-
- public void setAmqpHosts(List<String> amqpHosts) {
- this.amqpHosts = amqpHosts;
- }
-
- public boolean isStartRegister() {
- return startRegister;
- }
-
- public void setStartRegister(boolean startRegister) {
- this.startRegister = startRegister;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
deleted file mode 100644
index ad25b95..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.core.MessageParser;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BasicConsumer implements Consumer {
- private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
- MessageParser parser;
-
- MonitorPublisher publisher;
-
- MonitorID monitorID;
-
- public BasicConsumer(MessageParser parser, MonitorPublisher publisher, MonitorID monitorID) {
- this.parser = parser;
- this.publisher = publisher;
- this.monitorID = monitorID;
- }
-
- public void handleCancel(java.lang.String consumerTag) {
- }
-
- public void handleCancelOk(java.lang.String consumerTag) {
- }
-
- public void handleConsumeOk(java.lang.String consumerTag) {
- }
-
- public void handleDelivery(java.lang.String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
-
- logger.info(" job update for: " + envelope.getRoutingKey());
-
- String message = new String(body);
- message = message.replaceAll("(?m)^", " ");
- // Here we parse the message and get the job status and push it
- // to the Event bus, this will be picked by
- // AiravataJobStatusUpdator and store in to registry
- try {
- publisher.publish(parser.parseMessage(message,monitorID));
- } catch (AiravataMonitorException e) {
- e.printStackTrace();
- }
- }
-
- public void handleRecoverOk(java.lang.String consumerTag) {
- }
-
- public void handleShutdownSignal(java.lang.String consumerTag, ShutdownSignalException sig) {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c2d006e6/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
deleted file mode 100644
index dd9d2e4..0000000
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.fasterxml.jackson.databind.DeserializationConfig;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import org.apache.airavata.ComputingActivity;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.core.MessageParser;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-public class JSONMessageParser implements MessageParser {
- private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
-
- public JobStatus parseMessage(String message, MonitorID monitorID)throws AiravataMonitorException{
- /*todo write a json message parser here*/
- logger.info("Mesage parse invoked");
- System.out.println(message);
-// JSONParser parser = new JSONParser();
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.configure(org.codehaus.jackson.map.DeserializationConfig.Feature.UNWRAP_ROOT_VALUE, true);
- try {
- ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class);
- logger.info(computingActivity.getIDFromEndpoint());
- List<String> stateList = computingActivity.getState();
- for (String aState : stateList) {
- logger.info(aState);
- }
- } catch (IOException e) {
- throw new AiravataMonitorException(e);
- }
- return new JobStatus();
- }
-}