You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/23 21:42:30 UTC
[3/8] merging monitoring with gfac-core, later this will be separated
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
deleted file mode 100644
index b75b1c8..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.pull.qstat;
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.job.monitor.HostMonitorData;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.UserMonitorData;
-import org.apache.airavata.job.monitor.core.PullMonitor;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Timestamp;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * This monitor is based on qstat command which can be run
- * in grid resources and retrieve the job status.
- */
-public class QstatMonitor extends PullMonitor {
- private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
-
- // I think this should use DelayedBlocking Queue to do the monitoring*/
- private BlockingQueue<UserMonitorData> queue;
-
- private boolean startPulling = false;
-
- private Map<String, ResourceConnection> connections;
-
- private MonitorPublisher publisher;
-
- public QstatMonitor(){
- connections = new HashMap<String, ResourceConnection>();
- }
- public QstatMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
- this.queue = queue;
- this.publisher = publisher;
- connections = new HashMap<String, ResourceConnection>();
- }
-
- public void run() {
- /* implement a logic to pick each monitorID object from the queue and do the
- monitoring
- */
- this.startPulling = true;
- while (this.startPulling && !ServerSettings.isStopAllThreads()) {
- try {
- startPulling();
- // After finishing one iteration of the full queue this thread sleeps 1 second
- Thread.sleep(10000);
- } catch (Exception e){
- // we catch all the exceptions here because no matter what happens we do not stop running this
- // thread, but ideally we should report proper error messages, but this is handled in startPulling
- // method, incase something happen in Thread.sleep we handle it with this catch block.
- e.printStackTrace();
- logger.error(e.getMessage());
- }
- }
- // thread is going to return so we close all the connections
- Iterator<String> iterator = connections.keySet().iterator();
- while(iterator.hasNext()){
- String next = iterator.next();
- ResourceConnection resourceConnection = connections.get(next);
- try {
- resourceConnection.getCluster().disconnect();
- } catch (SSHApiException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- /**
- * This method will can invoke when PullMonitor needs to start
- * and it has to invoke in the frequency specified below,
- *
- * @return if the start process is successful return true else false
- */
- public boolean startPulling() throws AiravataMonitorException {
- // take the top element in the queue and pull the data and put that element
- // at the tail of the queue
- //todo this polling will not work with multiple usernames but with single user
- // and multiple hosts, currently monitoring will work
- UserMonitorData take = null;
- JobStatusChangeRequest jobStatus = new JobStatusChangeRequest();
- MonitorID currentMonitorID = null;
- HostDescription currentHostDescription = null;
- try {
- take = this.queue.take();
- List<MonitorID> completedJobs = new ArrayList<MonitorID>();
- List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
- for (HostMonitorData iHostMonitorData : hostMonitorData) {
- if (iHostMonitorData.getHost().getType() instanceof GsisshHostType) {
- currentHostDescription = iHostMonitorData.getHost();
- GsisshHostType gsisshHostType = (GsisshHostType) iHostMonitorData.getHost().getType();
- String hostName = gsisshHostType.getHostAddress();
- ResourceConnection connection = null;
- if (connections.containsKey(hostName)) {
- logger.debug("We already have this connection so not going to create one");
- connection = connections.get(hostName);
- } else {
- connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath());
- connections.put(hostName, connection);
- }
- List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
- Map<String, JobState> jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID);
- for (MonitorID iMonitorID : monitorID) {
- currentMonitorID = iMonitorID;
- iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()));
- jobStatus.setMonitorID(iMonitorID);
- jobStatus.setState(iMonitorID.getStatus());
- // we have this JobStatus class to handle amqp monitoring
-
- publisher.publish(jobStatus);
- // if the job is completed we do not have to put the job to the queue again
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-
- // After successful monitoring perform following actions to cleanup the queue, if necessary
- if (jobStatus.getState().equals(JobState.COMPLETE)) {
- completedJobs.add(iMonitorID);
- } else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) {
- logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor");
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- completedJobs.add(iMonitorID);
- } else {
- // Evey
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- // if the job is complete we remove it from the Map, if any of these maps
- // get empty this userMonitorData will get delete from the queue
- }
- }
- } else {
- logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
- }
- }
- // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
- // now the userMonitorData goes back to the tail of the queue
- queue.put(take);
- // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if
- // they become empty
- for(MonitorID completedJob:completedJobs){
- CommonUtils.removeMonitorFromQueue(queue,completedJob);
- }
- } catch (InterruptedException e) {
- if (!this.queue.contains(take)) {
- try {
- this.queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
- throw new AiravataMonitorException(e);
- } catch (SSHApiException e) {
- logger.error(e.getMessage());
- if (e.getMessage().contains("Unknown Job Id Error")) {
- // in this case job is finished or may be the given job ID is wrong
- jobStatus.setState(JobState.UNKNOWN);
- publisher.publish(jobStatus);
- } else if (e.getMessage().contains("illegally formed job identifier")) {
- logger.error("Wrong job ID is given so dropping the job from monitoring system");
- } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown
- if (currentMonitorID == null) {
- logger.error("Monitoring the jobs failed, for user: " + take.getUserName()
- + " in Host: " + currentHostDescription.getType().getHostAddress());
- } else {
- if (currentMonitorID != null) {
- if (currentMonitorID.getFailedCount() < 2) {
- try {
- currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
- this.queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- } else {
- logger.error(e.getMessage());
- logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
- }
- }
- }
- }
- throw new AiravataMonitorException("Error retrieving the job status", e);
- } catch (Exception e) {
- if (currentMonitorID != null) {
- if (currentMonitorID.getFailedCount() < 3) {
- try {
- currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
- this.queue.put(take);
- // if we get a wrong status we wait for a while and request again
- Thread.sleep(10000);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- } else {
- logger.error(e.getMessage());
- logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
- }
- }
- throw new AiravataMonitorException("Error retrieving the job status", e);
- }
-
-
- return true;
- }
-
-
- /**
- * This is the method to stop the polling process
- *
- * @return if the stopping process is successful return true else false
- */
- public boolean stopPulling() {
- this.startPulling = false;
- return true;
- }
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-
- public BlockingQueue<UserMonitorData> getQueue() {
- return queue;
- }
-
- public void setQueue(BlockingQueue<UserMonitorData> queue) {
- this.queue = queue;
- }
-
- public boolean authenticate() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
deleted file mode 100644
index 8da5054..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.pull.qstat;
-
-import org.apache.airavata.gsi.ssh.api.SSHApiException;
-import org.apache.airavata.gsi.ssh.api.ServerInfo;
-import org.apache.airavata.gsi.ssh.api.authentication.*;
-import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.job.monitor.HostMonitorData;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.UserMonitorData;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-
-public class ResourceConnection {
- private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
-
- private PBSCluster cluster;
-
- public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException {
- AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo();
- String hostAddress = monitorID.getHost().getType().getHostAddress();
- String userName = monitorID.getUserName();
- String jobManager = ((GsisshHostType)monitorID.getHost().getType()).getJobManager();
- JobManagerConfiguration jConfig = null;
- if (jobManager == null) {
- log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
- jConfig = CommonUtils.getPBSJobManager(installedPath);
- } else {
- if (org.apache.airavata.job.monitor.util.CommonUtils.isPBSHost(monitorID.getHost())) {
- jConfig = CommonUtils.getPBSJobManager(installedPath);
- } else if(org.apache.airavata.job.monitor.util.CommonUtils.isSlurm(monitorID.getHost())) {
- jConfig = CommonUtils.getSLURMJobManager(installedPath);
- } else if(org.apache.airavata.job.monitor.util.CommonUtils.isSGE(monitorID.getHost())) {
- jConfig = CommonUtils.getSGEJobManager(installedPath);
- }
- //todo support br2 etc
- }
- ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)monitorID.getHost().getType()).getPort());
- cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
- }
-
- public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException {
- AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo();
- String hostAddress = hostMonitorData.getHost().getType().getHostAddress();
- String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager();
- JobManagerConfiguration jConfig = null;
- if (jobManager == null) {
- log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
- jConfig = CommonUtils.getPBSJobManager(installedPath);
- } else {
- if (org.apache.airavata.job.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) {
- jConfig = CommonUtils.getPBSJobManager(installedPath);
- } else if(org.apache.airavata.job.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) {
- jConfig = CommonUtils.getSLURMJobManager(installedPath);
- }else if(org.apache.airavata.job.monitor.util.CommonUtils.isSGE(hostMonitorData.getHost())) {
- jConfig = CommonUtils.getSGEJobManager(installedPath);
- }
- //todo support br2 etc
- }
- ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort());
- cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
- }
- public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
- String jobID = monitorID.getJobID();
- //todo so currently we execute the qstat for each job but we can use user based monitoring
- //todo or we should concatenate all the commands and execute them in one go and parse the response
- return getStatusFromString(cluster.getJobStatus(jobID).toString());
- }
-
- public Map<String,JobState> getJobStatuses(String userName,List<MonitorID> monitorIDs) throws SSHApiException {
- Map<String,JobStatus> treeMap = new TreeMap<String,JobStatus>();
- Map<String,JobState> treeMap1 = new TreeMap<String,JobState>();
- // creating a sorted map with all the jobIds and with the predefined
- // status as UNKNOWN
- for (MonitorID monitorID : monitorIDs) {
- treeMap.put(monitorID.getJobID(), JobStatus.U);
- }
- //todo so currently we execute the qstat for each job but we can use user based monitoring
- //todo or we should concatenate all the commands and execute them in one go and parse the response
- cluster.getJobStatuses(userName,treeMap);
- for(String key:treeMap.keySet()){
- treeMap1.put(key,getStatusFromString(treeMap.get(key).toString()));
- }
- return treeMap1;
- }
- private JobState getStatusFromString(String status) {
- log.info("parsing the job status returned : " + status);
- if(status != null){
- if("C".equals(status) || "CD".equals(status)|| "E".equals(status) || "CG".equals(status)){
- return JobState.COMPLETE;
- }else if("H".equals(status) || "h".equals(status)){
- return JobState.HELD;
- }else if("Q".equals(status) || "qw".equals(status)){
- return JobState.QUEUED;
- }else if("R".equals(status) || "CF".equals(status) || "r".equals(status)){
- return JobState.ACTIVE;
- }else if ("T".equals(status)) {
- return JobState.HELD;
- } else if ("W".equals(status) || "PD".equals(status)) {
- return JobState.QUEUED;
- } else if ("S".equals(status)) {
- return JobState.SUSPENDED;
- }else if("CA".equals(status)){
- return JobState.CANCELED;
- }else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status)) {
- return JobState.FAILED;
- }else if ("PR".equals(status) || "Er".equals(status)) {
- return JobState.FAILED;
- }else if ("U".equals(status)){
- return JobState.UNKNOWN;
- }
- }
- return JobState.UNKNOWN;
- }
-
- public PBSCluster getCluster() {
- return cluster;
- }
-
- public void setCluster(PBSCluster cluster) {
- this.cluster = cluster;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
deleted file mode 100644
index dc6d193..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.job.monitor.JobIdentity;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.core.PushMonitor;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
-import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-
-/**
- * This is the implementation for AMQP based finishQueue, this uses
- * rabbitmq client to recieve AMQP based monitoring data from
- * mostly excede resources.
- */
-public class AMQPMonitor extends PushMonitor {
- private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
-
- /* this will keep all the channels available in the system, we do not create
- channels for all the jobs submitted, but we create channels for each user for each
- host.
- */
- private Map<String, Channel> availableChannels;
-
- private MonitorPublisher publisher;
-
- private MonitorPublisher localPublisher;
-
- private BlockingQueue<MonitorID> runningQueue;
-
- private BlockingQueue<MonitorID> finishQueue;
-
- private String connectionName;
-
- private String proxyPath;
-
- private List<String> amqpHosts;
-
- private boolean startRegister;
-
- public AMQPMonitor(){
-
- }
- public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
- BlockingQueue<MonitorID> finishQueue,
- String proxyPath,String connectionName,List<String> hosts) {
- this.publisher = publisher;
- this.runningQueue = runningQueue; // these will be initialized by the MonitorManager
- this.finishQueue = finishQueue; // these will be initialized by the MonitorManager
- this.availableChannels = new HashMap<String, Channel>();
- this.connectionName = connectionName;
- this.proxyPath = proxyPath;
- this.amqpHosts = hosts;
- this.localPublisher = new MonitorPublisher(new EventBus());
- this.localPublisher.registerListener(this);
- }
-
- public void initialize(String proxyPath, String connectionName, List<String> hosts) {
- this.availableChannels = new HashMap<String, Channel>();
- this.connectionName = connectionName;
- this.proxyPath = proxyPath;
- this.amqpHosts = hosts;
- this.localPublisher = new MonitorPublisher(new EventBus());
- this.localPublisher.registerListener(this);
- }
-
- @Override
- public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
- // we subscribe to read user-host based subscription
- HostDescription host = monitorID.getHost();
- String hostAddress = host.getType().getHostAddress();
- // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
- // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
- String channelID = CommonUtils.getChannelID(monitorID);
- if(availableChannels.get(channelID) == null){
- try {
- //todo need to fix this rather getting it from a file
- Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
- Channel channel = null;
- channel = connection.createChannel();
- availableChannels.put(channelID, channel);
- String queueName = channel.queueDeclare().getQueue();
-
- BasicConsumer consumer = new
- BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher
- channel.basicConsume(queueName, true, consumer);
- String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
- // here we queuebind to a particular user in a particular machine
- channel.queueBind(queueName, "glue2.computing_activity", filterString);
- logger.info("Using filtering string to monitor: " + filterString);
- } catch (IOException e) {
- logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
- }
- }
- return true;
- }
-
- public void run() {
- // before going to the while true mode we start unregister thread
- startRegister = true; // this will be unset by someone else
- while (startRegister || !ServerSettings.isStopAllThreads()) {
- try {
- MonitorID take = runningQueue.take();
- this.registerListener(take);
- } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Exception e){
- e.printStackTrace();
- }
- }
- Set<String> strings = availableChannels.keySet();
- for(String key:strings) {
- Channel channel = availableChannels.get(key);
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- @Subscribe
- public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
- Iterator<MonitorID> iterator = finishQueue.iterator();
- MonitorID next = null;
- while(iterator.hasNext()){
- next = iterator.next();
- if(next.getJobID().endsWith(monitorID.getJobID())){
- break;
- }
- }
- if(next == null) {
- logger.error("Job has removed from the queue, old obsolete message recieved");
- return false;
- }
- String channelID = CommonUtils.getChannelID(next);
- if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
- finishQueue.remove(next);
-
- // if this is the last job in the queue at this point with the same username and same host we
- // close the channel and close the connection and remove it from availableChannels
- if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
- logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
- ", incase new job created we do subscribe again");
- Channel channel = availableChannels.get(channelID);
- if (channel == null) {
- logger.error("Already Unregistered the listener");
- throw new AiravataMonitorException("Already Unregistered the listener");
- } else {
- try {
- channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
- channel.close();
- channel.getConnection().close();
- availableChannels.remove(channelID);
- } catch (IOException e) {
- logger.error("Error unregistering the listener");
- throw new AiravataMonitorException("Error unregistering the listener");
- }
- }
- }
- }
- next.setStatus(monitorID.getStatus());
- publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus()));
- return true;
- }
- @Override
- public boolean stopRegister() throws AiravataMonitorException {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Map<String, Channel> getAvailableChannels() {
- return availableChannels;
- }
-
- public void setAvailableChannels(Map<String, Channel> availableChannels) {
- this.availableChannels = availableChannels;
- }
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-
- public BlockingQueue<MonitorID> getRunningQueue() {
- return runningQueue;
- }
-
- public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
- this.runningQueue = runningQueue;
- }
-
- public BlockingQueue<MonitorID> getFinishQueue() {
- return finishQueue;
- }
-
- public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
- this.finishQueue = finishQueue;
- }
-
- public String getProxyPath() {
- return proxyPath;
- }
-
- public void setProxyPath(String proxyPath) {
- this.proxyPath = proxyPath;
- }
-
- public List<String> getAmqpHosts() {
- return amqpHosts;
- }
-
- public void setAmqpHosts(List<String> amqpHosts) {
- this.amqpHosts = amqpHosts;
- }
-
- public boolean isStartRegister() {
- return startRegister;
- }
-
- public void setStartRegister(boolean startRegister) {
- this.startRegister = startRegister;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
deleted file mode 100644
index 5a2d40d..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.job.monitor.HostMonitorData;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.UserMonitorData;
-import org.apache.airavata.job.monitor.core.MessageParser;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class BasicConsumer implements Consumer {
- private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
- private MessageParser parser;
-
- private MonitorPublisher publisher;
-
- public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
- this.parser = parser;
- this.publisher = publisher;
- }
-
- public void handleCancel(java.lang.String consumerTag) {
- }
-
- public void handleCancelOk(java.lang.String consumerTag) {
- }
-
- public void handleConsumeOk(java.lang.String consumerTag) {
- }
-
- public void handleDelivery(java.lang.String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
-
- logger.debug("job update for: " + envelope.getRoutingKey());
- String message = new String(body);
- message = message.replaceAll("(?m)^", " ");
- // Here we parse the message and get the job status and push it
- // to the Event bus, this will be picked by
-// AiravataJobStatusUpdator and store in to registry
-
- logger.debug("************************************************************");
- logger.debug("AMQP Message recieved \n" + message);
- logger.debug("************************************************************");
- try {
- String jobID = envelope.getRoutingKey().split("\\.")[0];
- MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null);
- monitorID.setStatus(parser.parseMessage(message));
- publisher.publish(monitorID);
- } catch (AiravataMonitorException e) {
- e.printStackTrace();
- }
- }
-
- public void handleRecoverOk(java.lang.String consumerTag) {
- }
-
- public void handleShutdownSignal(java.lang.String consumerTag, ShutdownSignalException sig) {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
deleted file mode 100644
index cdff685..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.airavata.ComputingActivity;
-import org.apache.airavata.job.monitor.HostMonitorData;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.UserMonitorData;
-import org.apache.airavata.job.monitor.core.MessageParser;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-public class JSONMessageParser implements MessageParser {
- private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
-
- public JobState parseMessage(String message)throws AiravataMonitorException{
- /*todo write a json message parser here*/
- logger.debug(message);
- ObjectMapper objectMapper = new ObjectMapper();
- try {
- ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class);
- logger.info(computingActivity.getIDFromEndpoint());
- List<String> stateList = computingActivity.getState();
- JobState jobState = null;
- for (String aState : stateList) {
- jobState = getStatusFromString(aState);
- }
- // we get the last value of the state array
- return jobState;
- } catch (IOException e) {
- throw new AiravataMonitorException(e);
- }
- }
-
-private JobState getStatusFromString(String status) {
- logger.info("parsing the job status returned : " + status);
- if(status != null){
- if("ipf:finished".equals(status)){
- return JobState.COMPLETE;
- }else if("ipf:pending".equals(status)|| "ipf:starting".equals(status)){
- return JobState.QUEUED;
- }else if("ipf:running".equals(status) || "ipf:finishing".equals(status)){
- return JobState.ACTIVE;
- }else if ("ipf:held".equals(status) || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) {
- return JobState.HELD;
- } else if ("ipf:suspending".equals(status)) {
- return JobState.SUSPENDED;
- }else if ("ipf:failed".equals(status)) {
- return JobState.FAILED;
- }else if ("ipf:unknown".equals(status)){
- return JobState.UNKNOWN;
- }
- }
- return JobState.UNKNOWN;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
deleted file mode 100644
index becb4d7..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class UnRegisterWorker{
- private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
- private Map<String, Channel> availableChannels;
-
- public UnRegisterWorker(Map<String, Channel> channels) {
- this.availableChannels = channels;
- }
-
- @Subscribe
- private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException {
- MonitorID monitorID = jobStatus.getMonitorID();
- String channelID = CommonUtils.getChannelID(monitorID);
- if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
- Channel channel = availableChannels.get(channelID);
- if (channel == null) {
- logger.error("Already Unregistered the listener");
- throw new AiravataMonitorException("Already Unregistered the listener");
- } else {
- try {
- channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
- channel.close();
- channel.getConnection().close();
- availableChannels.remove(channelID);
- } catch (IOException e) {
- logger.error("Error unregistering the listener");
- throw new AiravataMonitorException("Error unregistering the listener");
- }
- }
- }
- return true;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
deleted file mode 100644
index bacd8df..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.job.monitor.state;
-
-
-public abstract class AbstractStateChangeRequest implements PublisherMessage{
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
deleted file mode 100644
index 9bee5ca..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.state;
-
-import org.apache.airavata.job.monitor.ExperimentIdentity;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-
-/**
- * This is the primary job state object used in
- * through out the monitor module. This use airavata-data-model JobState enum
- * Ideally after processing each event or monitoring message from remote system
- * Each monitoring implementation has to return this object with a state and
- * the monitoring ID
- */
-public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest{
- private ExperimentState state;
- private ExperimentIdentity identity;
-
- // this constructor can be used in Qstat monitor to handle errors
- public ExperimentStatusChangeRequest() {
- }
-
- public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) {
- this.state = state;
- setIdentity(experimentIdentity);
- }
-
- public ExperimentState getState() {
- return state;
- }
-
- public void setState(ExperimentState state) {
- this.state = state;
- }
-
- public ExperimentIdentity getIdentity() {
- return identity;
- }
-
- public void setIdentity(ExperimentIdentity identity) {
- this.identity = identity;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
deleted file mode 100644
index 0db9da6..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.state;
-
-import org.apache.airavata.job.monitor.JobIdentity;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.model.workspace.experiment.JobState;
-
-/**
- * This is the primary job state object used in
- * through out the monitor module. This use airavata-data-model JobState enum
- * Ideally after processing each event or monitoring message from remote system
- * Each monitoring implementation has to return this object with a state and
- * the monitoring ID
- */
-public class JobStatusChangeRequest extends AbstractStateChangeRequest{
- private JobState state;
- private JobIdentity identity;
-
- private MonitorID monitorID;
-
- // this constructor can be used in Qstat monitor to handle errors
- public JobStatusChangeRequest() {
- }
-
- public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) {
- setIdentity(jobId);
- setMonitorID(monitorID);
- this.state = state;
- }
-
- public JobState getState() {
- return state;
- }
-
- public void setState(JobState state) {
- this.state = state;
- }
-
- public JobIdentity getIdentity() {
- return identity;
- }
-
- public void setIdentity(JobIdentity identity) {
- this.identity = identity;
- }
-
- public MonitorID getMonitorID() {
- return monitorID;
- }
-
- public void setMonitorID(MonitorID monitorID) {
- this.monitorID = monitorID;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusInfo.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusInfo.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusInfo.java
deleted file mode 100644
index 10d5ca2..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusInfo.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.state;
-
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
-
-/**
- * Based on the job status monitoring we can gather
- * different informaation about the job, its not simply
- * the job status, so we need a way to implement
- * different job statusinfo object to keep job status
- */
-public interface JobStatusInfo {
-
- /**
- * This method can be used to get JobStatusInfo data and
- * decide the finalJobState
- *
- * @param jobState
- */
- void setJobStatus(JobStatus jobState);
-
- /**
- * After setting the jobState by processing jobinformation
- * this method can be used to get the JobStatus
- * @return
- */
- JobStatus getJobStatus();
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/PublisherMessage.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/PublisherMessage.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/PublisherMessage.java
deleted file mode 100644
index 055deb1..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/PublisherMessage.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.job.monitor.state;
-
-public interface PublisherMessage {
-// public String getType();
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
deleted file mode 100644
index e8e58db..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.state;
-
-import org.apache.airavata.job.monitor.TaskIdentity;
-import org.apache.airavata.model.workspace.experiment.TaskState;
-
-/**
- * This is the primary job state object used in
- * through out the monitor module. This use airavata-data-model JobState enum
- * Ideally after processing each event or monitoring message from remote system
- * Each monitoring implementation has to return this object with a state and
- * the monitoring ID
- */
-public class TaskStatusChangeRequest extends AbstractStateChangeRequest{
- private TaskState state;
- private TaskIdentity identity;
- // this constructor can be used in Qstat monitor to handle errors
- public TaskStatusChangeRequest() {
- }
-
- public TaskStatusChangeRequest(TaskIdentity taskIdentity, TaskState state) {
- this.state = state;
- setIdentity(taskIdentity);
- }
-
- public TaskState getState() {
- return state;
- }
-
- public void setState(TaskState state) {
- this.state = state;
- }
-
- public TaskIdentity getIdentity() {
- return identity;
- }
-
- public void setIdentity(TaskIdentity identity) {
- this.identity = identity;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
deleted file mode 100644
index 7e58e35..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.state;
-
-import org.apache.airavata.job.monitor.WorkflowNodeIdentity;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
-
-/**
- * This is the primary job state object used in
- * through out the monitor module. This use airavata-data-model JobState enum
- * Ideally after processing each event or monitoring message from remote system
- * Each monitoring implementation has to return this object with a state and
- * the monitoring ID
- */
-public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest{
- private WorkflowNodeState state;
- private WorkflowNodeIdentity identity;
-
- // this constructor can be used in Qstat monitor to handle errors
- public WorkflowNodeStatusChangeRequest() {
- }
-
- public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) {
- this.state = state;
- setIdentity(identity);
- }
-
- public WorkflowNodeState getState() {
- return state;
- }
-
- public void setState(WorkflowNodeState state) {
- this.state = state;
- }
-
- public WorkflowNodeIdentity getIdentity() {
- return identity;
- }
-
- public void setIdentity(WorkflowNodeIdentity identity) {
- this.identity = identity;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/AmazonJobStatusInfo.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/AmazonJobStatusInfo.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/AmazonJobStatusInfo.java
deleted file mode 100644
index 385c430..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/AmazonJobStatusInfo.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.state.impl;
-
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
-import org.apache.airavata.job.monitor.state.JobStatusInfo;
-
-/**
- * This can be used to store job status information about
- * amazon jobs, this data could be very different from
- * a typical grid job
- */
-public class AmazonJobStatusInfo implements JobStatusInfo {
- public void setJobStatus(JobStatus jobState) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public JobStatus getJobStatus() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/GridJobStatusInfo.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/GridJobStatusInfo.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/GridJobStatusInfo.java
deleted file mode 100644
index 3c3b421..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/impl/GridJobStatusInfo.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.state.impl;
-
-import org.apache.airavata.gsi.ssh.impl.JobStatus;
-import org.apache.airavata.job.monitor.state.JobStatusInfo;
-
-
-/**
- * This can be used to keep information about a Grid job
- * which we can get from qstat polling or from amqp based
- * monitoring in Grid machines
- */
-public class GridJobStatusInfo implements JobStatusInfo {
- public void setJobStatus(JobStatus jobState) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public JobStatus getJobStatus() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
deleted file mode 100644
index ea27f97..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.util;
-
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultSaslConfig;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.security.KeyStore;
-import java.util.Collections;
-import java.util.List;
-import java.util.Vector;
-
-public class AMQPConnectionUtil {
- public static Connection connect(List<String>hosts,String vhost, String proxyFile) {
- Collections.shuffle(hosts);
- for (String host : hosts) {
- Connection connection = connect(host, vhost, proxyFile);
- if (host != null) {
- System.out.println("connected to " + host);
- return connection;
- }
- }
- return null;
- }
-
- public static Connection connect(String host, String vhost, String proxyFile) {
- Connection connection;
- try {
- String keyPassPhrase = "test123";
- KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase);
- KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
- kmf.init(ks, keyPassPhrase.toCharArray());
-
- KeyStore tks = X509Helper.trustKeyStoreFromCertDir();
- TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
- tmf.init(tks);
-
- SSLContext c = SSLContext.getInstance("SSLv3");
- c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(host);
- factory.setPort(5671);
- factory.useSslProtocol(c);
- factory.setVirtualHost(vhost);
- factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
-
- connection = factory.newConnection();
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- return connection;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
deleted file mode 100644
index 42d4b8e..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.util;
-
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.job.monitor.HostMonitorData;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.UserMonitorData;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-public class CommonUtils {
- private final static Logger logger = LoggerFactory.getLogger(CommonUtils.class);
-
- public static boolean isPBSHost(HostDescription host){
- if("pbs".equals(((GsisshHostType)host.getType()).getJobManager()) ||
- "".equals(((GsisshHostType)host.getType()).getJobManager())){
- return true;
- }else{
- // default is pbs so we return true
- return false;
- }
- }
- public static boolean isSlurm(HostDescription host){
- if("slurm".equals(((GsisshHostType)host.getType()).getJobManager())){
- return true;
- }else{
- // default is pbs so we return true
- return false;
- }
- }
- public static boolean isSGE(HostDescription host){
- if("sge".equals(((GsisshHostType)host.getType()).getJobManager())){
- return true;
- }else{
- // default is pbs so we return true
- return false;
- }
- }
- public static String getChannelID(MonitorID monitorID) {
- return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName();
- }
-
- public static String getRoutingKey(MonitorID monitorID) {
- return "*." + monitorID.getUserName() + "." + monitorID.getHost().getType().getHostAddress();
- }
-
- public static String getChannelID(String userName,String hostAddress) {
- return userName + "-" + hostAddress;
- }
-
- public static String getRoutingKey(String userName,String hostAddress) {
- return "*." + userName + "." + hostAddress;
- }
-
- public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
- Iterator<UserMonitorData> iterator = queue.iterator();
- while (iterator.hasNext()) {
- UserMonitorData next = iterator.next();
- if (next.getUserName().equals(monitorID.getUserName())) {
- // then this is the right place to update
- List<HostMonitorData> monitorIDs = next.getHostMonitorData();
- for (HostMonitorData host : monitorIDs) {
- if (host.getHost().equals(monitorID.getHost())) {
- // ok we found right place to add this monitorID
- host.addMonitorIDForHost(monitorID);
- return;
- }
- }
- // there is a userMonitor object for this user name but no Hosts for this host
- // so we have to create new Hosts
- HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
- hostMonitorData.addMonitorIDForHost(monitorID);
- next.addHostMonitorData(hostMonitorData);
- return;
- }
- }
- HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
- hostMonitorData.addMonitorIDForHost(monitorID);
-
- UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
- userMonitorData.addHostMonitorData(hostMonitorData);
- try {
- queue.put(userMonitorData);
- } catch (InterruptedException e) {
- throw new AiravataMonitorException(e);
- }
- }
- public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){
- Iterator<MonitorID> iterator = queue.iterator();
- while(iterator.hasNext()){
- MonitorID next = iterator.next();
- if(monitorID.getUserName().equals(next.getUserName()) && CommonUtils.isEqual(monitorID.getHost(),next.getHost())){
- return false;
- }
- }
- return true;
- }
- public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
- Iterator<UserMonitorData> iterator = queue.iterator();
- while(iterator.hasNext()){
- UserMonitorData next = iterator.next();
- if(next.getUserName().equals(monitorID.getUserName())){
- // then this is the right place to update
- List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
- for(HostMonitorData iHostMonitorID:hostMonitorData){
- if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
- List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
- for(MonitorID iMonitorID:monitorIDs){
- if(iMonitorID.getJobID().equals(monitorID.getJobID())) {
- // OK we found the object, we cannot do list.remove(object) states of two objects
- // could be different, thats why we check the jobID
- monitorIDs.remove(iMonitorID);
- if(monitorIDs.size()==0) {
- hostMonitorData.remove(iHostMonitorID);
- if (hostMonitorData.size() == 0) {
- // no useful data so we have to remove the element from the queue
- queue.remove(next);
- }
- }
- return;
- }
- }
- }
- }
- }
- }
- throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
- monitorID.getUserName() + " and jobID " + monitorID.getJobID());
-
- }
-
- public static boolean isEqual(HostDescription host1,HostDescription host2) {
- if ((host1.getType() instanceof GsisshHostType) && (host2.getType() instanceof GsisshHostType)) {
- GsisshHostType hostType1 = (GsisshHostType)host1.getType();
- GsisshHostType hostType2 = (GsisshHostType)host2.getType();
- if(hostType1.getHostAddress().equals(hostType2.getHostAddress())
- && hostType1.getJobManager().equals(hostType2.getJobManager())
- && (hostType1.getPort() == hostType2.getPort())
- && hostType1.getMonitorMode().equals(hostType2.getMonitorMode())){
- return true;
- }
- } else {
- logger.error("This method is only impmlemented to handle Gsissh host types");
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
deleted file mode 100644
index 962dc81..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.util;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.security.KeyPair;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.PrivateKey;
-import java.security.cert.CertificateException;
-import java.security.cert.CertificateFactory;
-import java.security.cert.CertificateParsingException;
-import java.security.cert.X509Certificate;
-import java.security.spec.InvalidKeySpecException;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.bouncycastle.jce.provider.BouncyCastleProvider;
-import org.bouncycastle.openssl.PEMReader;
-
-public class X509Helper {
-
- static {
- // parsing of RSA key fails without this
- java.security.Security.addProvider(new BouncyCastleProvider());
- }
-
-
-
- public static KeyStore keyStoreFromPEM(String proxyFile,
- String keyPassPhrase) throws IOException,
- CertificateException,
- NoSuchAlgorithmException,
- InvalidKeySpecException,
- KeyStoreException {
- return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase);
- }
-
- public static KeyStore keyStoreFromPEM(String certFile,
- String keyFile,
- String keyPassPhrase) throws IOException,
- CertificateException,
- NoSuchAlgorithmException,
- InvalidKeySpecException,
- KeyStoreException {
- CertificateFactory cf = CertificateFactory.getInstance("X.509");
- X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile));
- //System.out.println(cert.toString());
-
- // this works for proxy files, too, since it skips over the certificate
- BufferedReader reader = new BufferedReader(new FileReader(keyFile));
- String line = null;
- StringBuilder builder = new StringBuilder();
- boolean inKey = false;
- while((line=reader.readLine()) != null) {
- if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) {
- inKey = true;
- }
- if (inKey) {
- builder.append(line);
- builder.append(System.getProperty("line.separator"));
- }
- if (line.contains("-----END RSA PRIVATE KEY-----")) {
- inKey = false;
- }
- }
- String privKeyPEM = builder.toString();
- //System.out.println(privKeyPEM);
-
- // using BouncyCastle
- PEMReader pemParser = new PEMReader(new StringReader(privKeyPEM));
- Object object = pemParser.readObject();
-
- PrivateKey privKey = null;
- if(object instanceof KeyPair){
- privKey = ((KeyPair)object).getPrivate();
- }
- // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency
- /*
- // Base64 decode the data
- byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM);
-
- // PKCS8 decode the encoded RSA private key
- java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
- KeyFactory kf = KeyFactory.getInstance("RSA");
- PrivateKey privKey = kf.generatePrivate(keySpec);
- //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec);
- */
- //System.out.println(privKey.toString());
-
- KeyStore keyStore = KeyStore.getInstance("PKCS12");
- keyStore.load(null,null);
-
- KeyStore.PrivateKeyEntry entry =
- new KeyStore.PrivateKeyEntry(privKey,
- new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert});
- KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray());
- keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot);
-
- return keyStore;
- }
-
-
- public static KeyStore trustKeyStoreFromCertDir() throws IOException,
- KeyStoreException,
- CertificateException,
- NoSuchAlgorithmException, ApplicationSettingsException {
- return trustKeyStoreFromCertDir(ServerSettings.getSetting("trusted.cert.location"));
- }
-
- public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException,
- KeyStoreException,
- CertificateException,
- NoSuchAlgorithmException {
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(null,null);
-
- File dir = new File(certDir);
- for(File file : dir.listFiles()) {
- if (!file.isFile()) {
- continue;
- }
- if (!file.getName().endsWith(".0")) {
- continue;
- }
-
- try {
- //System.out.println("reading file "+file.getName());
- CertificateFactory cf = CertificateFactory.getInstance("X.509");
- X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file));
- //System.out.println(cert.toString());
-
- KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert);
-
- ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null);
- } catch (KeyStoreException e) {
- } catch (CertificateParsingException e) {
- continue;
- }
-
- }
-
- return ks;
- }
-}
-