You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/23 21:42:35 UTC
[8/8] git commit: merging monitoring with gfac-core,
later this will be separated
merging monitoring with gfac-core, later this will be separated
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b0230849
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b0230849
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b0230849
Branch: refs/heads/master
Commit: b02308493b0d9c9497ce25fac70da0aef910a822
Parents: 0364b65
Author: lahiru <la...@apache.org>
Authored: Wed Apr 23 15:42:00 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Apr 23 15:42:00 2014 -0400
----------------------------------------------------------------------
.../main/resources/airavata-server.properties | 6 +-
modules/distribution/server/pom.xml | 10 -
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 41 +--
.../gfac/monitor/AbstractActivityListener.java | 27 ++
.../AiravataExperimentStatusUpdator.java | 81 +++++
.../gfac/monitor/AiravataJobStatusUpdator.java | 145 ++++++++
.../gfac/monitor/AiravataTaskStatusUpdator.java | 113 ++++++
.../AiravataWorkflowNodeStatusUpdator.java | 112 ++++++
.../gfac/monitor/ExperimentIdentity.java | 36 ++
.../airavata/gfac/monitor/HostMonitorData.java | 69 ++++
.../airavata/gfac/monitor/JobIdentity.java | 39 +++
.../apache/airavata/gfac/monitor/MonitorID.java | 225 ++++++++++++
.../airavata/gfac/monitor/MonitorManager.java | 347 +++++++++++++++++++
.../airavata/gfac/monitor/TaskIdentity.java | 38 ++
.../airavata/gfac/monitor/UserMonitorData.java | 76 ++++
.../gfac/monitor/WorkflowNodeIdentity.java | 37 ++
.../command/ExperimentCancelRequest.java | 38 ++
.../gfac/monitor/command/TaskCancelRequest.java | 52 +++
.../monitor/core/AiravataAbstractMonitor.java | 46 +++
.../gfac/monitor/core/MessageParser.java | 43 +++
.../airavata/gfac/monitor/core/Monitor.java | 30 ++
.../airavata/gfac/monitor/core/PullMonitor.java | 64 ++++
.../airavata/gfac/monitor/core/PushMonitor.java | 60 ++++
.../gfac/monitor/event/MonitorPublisher.java | 47 +++
.../exception/AiravataMonitorException.java | 37 ++
.../gfac/monitor/impl/LocalJobMonitor.java | 59 ++++
.../monitor/impl/pull/qstat/QstatMonitor.java | 262 ++++++++++++++
.../impl/pull/qstat/ResourceConnection.java | 151 ++++++++
.../monitor/impl/push/amqp/AMQPMonitor.java | 263 ++++++++++++++
.../monitor/impl/push/amqp/BasicConsumer.java | 86 +++++
.../impl/push/amqp/JSONMessageParser.java | 78 +++++
.../impl/push/amqp/UnRegisterWorker.java | 68 ++++
.../state/AbstractStateChangeRequest.java | 27 ++
.../state/ExperimentStatusChangeRequest.java | 63 ++++
.../monitor/state/JobStatusChangeRequest.java | 74 ++++
.../gfac/monitor/state/JobStatusInfo.java | 48 +++
.../gfac/monitor/state/PublisherMessage.java | 26 ++
.../monitor/state/TaskStatusChangeRequest.java | 61 ++++
.../state/WorkflowNodeStatusChangeRequest.java | 63 ++++
.../monitor/state/impl/AmazonJobStatusInfo.java | 39 +++
.../monitor/state/impl/GridJobStatusInfo.java | 40 +++
.../gfac/monitor/util/AMQPConnectionUtil.java | 77 ++++
.../airavata/gfac/monitor/util/CommonUtils.java | 172 +++++++++
.../airavata/gfac/monitor/util/X509Helper.java | 161 +++++++++
.../airavata/gfac/provider/GFacProvider.java | 4 +-
.../gfac/provider/impl/AbstractProvider.java | 10 +-
.../gfac/provider/impl/GSISSHProvider.java | 3 -
.../src/main/resources/schema/AccessPolicy.json | 13 +
.../src/main/resources/schema/Activity.json | 31 ++
.../src/main/resources/schema/AdminDomain.json | 51 +++
.../schema/ApplicationEnvironment.json | 86 +++++
.../resources/schema/ApplicationHandle.json | 21 ++
.../src/main/resources/schema/Benchmark.json | 21 ++
.../resources/schema/ComputingActivity.json | 165 +++++++++
.../resources/schema/ComputingEndpoint.json | 44 +++
.../main/resources/schema/ComputingManager.json | 117 +++++++
.../main/resources/schema/ComputingService.json | 32 ++
.../main/resources/schema/ComputingShare.json | 182 ++++++++++
.../src/main/resources/schema/Contact.json | 32 ++
.../src/main/resources/schema/DataStore.json | 30 ++
.../src/main/resources/schema/Domain.json | 30 ++
.../src/main/resources/schema/Endpoint.json | 147 ++++++++
.../src/main/resources/schema/Entity.json | 35 ++
.../resources/schema/ExecutionEnvironment.json | 115 ++++++
.../src/main/resources/schema/Glue2.json | 246 +++++++++++++
.../src/main/resources/schema/Location.json | 47 +++
.../src/main/resources/schema/Manager.json | 28 ++
.../main/resources/schema/MappingPolicy.json | 13 +
.../src/main/resources/schema/Policy.json | 27 ++
.../src/main/resources/schema/Resource.json | 27 ++
.../src/main/resources/schema/Service.json | 75 ++++
.../src/main/resources/schema/Share.json | 45 +++
.../resources/schema/StorageAccessProtocol.json | 32 ++
.../main/resources/schema/StorageEndpoint.json | 8 +
.../main/resources/schema/StorageManager.json | 8 +
.../main/resources/schema/StorageService.json | 22 ++
.../schema/StorageServiceCapacity.json | 33 ++
.../src/main/resources/schema/StorageShare.json | 65 ++++
.../resources/schema/StorageShareCapacity.json | 33 ++
.../resources/schema/ToComputingService.json | 32 ++
.../main/resources/schema/ToStorageService.json | 25 ++
.../src/main/resources/schema/UserDomain.json | 58 ++++
.../apache/airavata/job/AMQPMonitorTest.java | 175 ++++++++++
.../job/QstatMonitorTestWithMyProxyAuth.java | 167 +++++++++
modules/gfac/gfac-monitor/pom.xml | 80 -----
.../job/monitor/AbstractActivityListener.java | 27 --
.../airavata/job/monitor/MonitorManager.java | 347 -------------------
.../command/ExperimentCancelRequest.java | 38 --
.../job/monitor/command/TaskCancelRequest.java | 52 ---
.../src/main/resources/PBSTemplate.xslt | 77 ----
.../src/main/resources/gsissh.properties | 26 --
.../airavata/job/monitor/AMQPMonitorTest.java | 144 --------
.../QstatMonitorTestWithMyProxyAuth.java | 165 ---------
.../src/test/resources/gsissh.properties | 26 --
.../src/test/resources/monitor.properties | 3 -
modules/gfac/gfac-ssh/pom.xml | 2 +-
.../gfac/handler/AdvancedSCPInputHandler.java | 13 +-
.../gfac/handler/AdvancedSCPOutputHandler.java | 19 +-
.../gfac/handler/SCPDirectorySetupHandler.java | 10 +
.../airavata/gfac/handler/SCPInputHandler.java | 11 +-
.../airavata/gfac/handler/SCPOutputHandler.java | 11 +-
.../gfac/provider/impl/SSHProvider.java | 13 +-
.../apache/airavata/gfac/util/GFACSSHUtils.java | 88 +++++
modules/gfac/pom.xml | 1 -
.../airavata-orchestrator-service/pom.xml | 7 +-
.../server/OrchestratorServerHandler.java | 136 ++------
.../src/test/resources/monitor.properties | 4 +-
modules/orchestrator/orchestrator-core/pom.xml | 5 -
.../core/context/OrchestratorContext.java | 2 +-
.../airavata/orchestrator/cpi/Orchestrator.java | 9 +-
.../cpi/impl/SimpleOrchestratorImpl.java | 142 +++++++-
.../orchestrator/core/NewOrchestratorTest.java | 20 +-
.../core/OrchestratorTestWithMyProxyAuth.java | 100 +++---
tools/job-monitor/pom.xml | 162 ---------
.../job/monitor/ExperimentIdentity.java | 36 --
.../airavata/job/monitor/HostMonitorData.java | 69 ----
.../airavata/job/monitor/JobIdentity.java | 39 ---
.../airavata/job/monitor/TaskIdentity.java | 38 --
.../airavata/job/monitor/UserMonitorData.java | 76 ----
.../job/monitor/WorkflowNodeIdentity.java | 37 --
.../monitor/core/AiravataAbstractMonitor.java | 46 ---
.../job/monitor/core/MessageParser.java | 47 ---
.../airavata/job/monitor/core/Monitor.java | 30 --
.../airavata/job/monitor/core/PullMonitor.java | 64 ----
.../airavata/job/monitor/core/PushMonitor.java | 61 ----
.../job/monitor/event/MonitorPublisher.java | 48 ---
.../exception/AiravataMonitorException.java | 37 --
.../job/monitor/impl/LocalJobMonitor.java | 59 ----
.../monitor/impl/pull/qstat/QstatMonitor.java | 262 --------------
.../impl/pull/qstat/ResourceConnection.java | 152 --------
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 268 --------------
.../monitor/impl/push/amqp/BasicConsumer.java | 93 -----
.../impl/push/amqp/JSONMessageParser.java | 82 -----
.../impl/push/amqp/UnRegisterWorker.java | 68 ----
.../state/AbstractStateChangeRequest.java | 27 --
.../state/ExperimentStatusChangeRequest.java | 63 ----
.../monitor/state/JobStatusChangeRequest.java | 74 ----
.../job/monitor/state/JobStatusInfo.java | 48 ---
.../job/monitor/state/PublisherMessage.java | 26 --
.../monitor/state/TaskStatusChangeRequest.java | 61 ----
.../state/WorkflowNodeStatusChangeRequest.java | 63 ----
.../monitor/state/impl/AmazonJobStatusInfo.java | 39 ---
.../monitor/state/impl/GridJobStatusInfo.java | 40 ---
.../job/monitor/util/AMQPConnectionUtil.java | 78 -----
.../airavata/job/monitor/util/CommonUtils.java | 174 ----------
.../airavata/job/monitor/util/X509Helper.java | 170 ---------
.../src/main/resources/PBSTemplate.xslt | 77 ----
.../src/main/resources/gsissh.properties | 26 --
.../src/main/resources/schema/AccessPolicy.json | 13 -
.../src/main/resources/schema/Activity.json | 31 --
.../src/main/resources/schema/AdminDomain.json | 51 ---
.../schema/ApplicationEnvironment.json | 86 -----
.../resources/schema/ApplicationHandle.json | 21 --
.../src/main/resources/schema/Benchmark.json | 21 --
.../resources/schema/ComputingActivity.json | 165 ---------
.../resources/schema/ComputingEndpoint.json | 44 ---
.../main/resources/schema/ComputingManager.json | 117 -------
.../main/resources/schema/ComputingService.json | 32 --
.../main/resources/schema/ComputingShare.json | 182 ----------
.../src/main/resources/schema/Contact.json | 32 --
.../src/main/resources/schema/DataStore.json | 30 --
.../src/main/resources/schema/Domain.json | 30 --
.../src/main/resources/schema/Endpoint.json | 147 --------
.../src/main/resources/schema/Entity.json | 35 --
.../resources/schema/ExecutionEnvironment.json | 115 ------
.../src/main/resources/schema/Glue2.json | 246 -------------
.../src/main/resources/schema/Location.json | 47 ---
.../src/main/resources/schema/Manager.json | 28 --
.../main/resources/schema/MappingPolicy.json | 13 -
.../src/main/resources/schema/Policy.json | 27 --
.../src/main/resources/schema/Resource.json | 27 --
.../src/main/resources/schema/Service.json | 75 ----
.../src/main/resources/schema/Share.json | 45 ---
.../resources/schema/StorageAccessProtocol.json | 32 --
.../main/resources/schema/StorageEndpoint.json | 8 -
.../main/resources/schema/StorageManager.json | 8 -
.../main/resources/schema/StorageService.json | 22 --
.../schema/StorageServiceCapacity.json | 33 --
.../src/main/resources/schema/StorageShare.json | 65 ----
.../resources/schema/StorageShareCapacity.json | 33 --
.../resources/schema/ToComputingService.json | 32 --
.../main/resources/schema/ToStorageService.json | 25 --
.../src/main/resources/schema/UserDomain.json | 58 ----
.../apache/airavata/job/AMQPMonitorTest.java | 179 ----------
.../job/QstatMonitorTestWithMyProxyAuth.java | 168 ---------
.../src/test/resources/gsissh.properties | 26 --
.../src/test/resources/monitor.properties | 3 -
tools/pom.xml | 1 -
188 files changed, 6288 insertions(+), 6355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 4a65a71..7169ea8 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -256,13 +256,13 @@ TwoPhase=true
###---------------------------Monitoring module Configurations---------------------------###
#This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
#mechanisms and one would be able to start a monitor
-monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.LocalJobMonitor
-#,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
+monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
+#,org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
#This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede
-activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.gfac.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.monitor.AiravataExperimentStatusUpdator
###---------------------------Orchestrator module Configurations---------------------------###
job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 6629860..0ff7f4e 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -331,16 +331,6 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>job-monitor-tool</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>gfac-monitor</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
<artifactId>airavata-api-server</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 064c9ba..14ea519 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -51,7 +51,6 @@ import org.apache.airavata.gfac.notification.listeners.WorkflowTrackingListener;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.scheduler.HostScheduler;
import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
@@ -64,10 +63,10 @@ import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthentica
import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.job.monitor.AbstractActivityListener;
-import org.apache.airavata.job.monitor.MonitorManager;
-import org.apache.airavata.job.monitor.command.ExperimentCancelRequest;
-import org.apache.airavata.job.monitor.command.TaskCancelRequest;
+import org.apache.airavata.gfac.monitor.AbstractActivityListener;
+import org.apache.airavata.gfac.monitor.MonitorManager;
+import org.apache.airavata.gfac.monitor.command.ExperimentCancelRequest;
+import org.apache.airavata.gfac.monitor.command.TaskCancelRequest;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -451,41 +450,9 @@ public class GFacImpl implements GFac, AbstractActivityListener {
// if (this.configuration.getAmazonSecurityContext() != null) {
// jobExecutionContext.addSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT,
// this.configuration.getAmazonSecurityContext());
- } else if (registeredHost.getType() instanceof SSHHostType) {
- String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME);
- String sshPrivateKey = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY);
- String sshPrivateKeyPass = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS);
- String sshPassword = configurationProperties.getProperty(Constants.SSH_PASSWORD);
- String sshPublicKey = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY);
- SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
- AuthenticationInfo authenticationInfo = null;
- // we give higher preference to the password over keypair ssh authentication
- if (sshPassword != null) {
- authenticationInfo = new DefaultPasswordAuthenticationInfo(sshPassword);
- } else {
- authenticationInfo = new DefaultPublicKeyFileAuthentication(sshPublicKey, sshPrivateKey, sshPrivateKeyPass);
- }
- ServerInfo serverInfo = new ServerInfo(sshUserName, registeredHost.getType().getHostAddress());
-
- Cluster pbsCluster = null;
- try {
- String installedParentPath = "/";
- if(((SSHHostType) registeredHost.getType()).getHpcResource()){
- installedParentPath = ((HpcApplicationDeploymentType)
- jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
- }
- pbsCluster = new PBSCluster(serverInfo, authenticationInfo,
- CommonUtils.getPBSJobManager(installedParentPath));
- } catch (SSHApiException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- sshSecurityContext.setPbsCluster(pbsCluster);
- sshSecurityContext.setUsername(sshUserName);
- jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, sshSecurityContext);
}
}
- @Override
public void setup(Object... configurations) {
for (Object configuration : configurations) {
if (configuration instanceof MonitorManager){
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
new file mode 100644
index 0000000..63f89df
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+
+public interface AbstractActivityListener {
+ public void setup(Object... configurations);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
new file mode 100644
index 0000000..a70b14f
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
+
+ private Registry airavataRegistry;
+
+ public Registry getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ public void setAiravataRegistry(Registry airavataRegistry) {
+ this.airavataRegistry = airavataRegistry;
+ }
+
+ @Subscribe
+ public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
+ ExperimentState state = experimentStatus.getState();
+ if (state != null) {
+ try {
+ String experimentID = experimentStatus.getIdentity().getExperimentID();
+ updateExperimentStatus(experimentID, state);
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ public void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
+ Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
+ if(details == null) {
+ details = new Experiment();
+ details.setExperimentID(experimentId);
+ }
+ org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+ status.setExperimentState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setExperimentStatus(status);
+ airavataRegistry.update(DataType.EXPERIMENT, details, experimentId);
+ }
+
+ @Override
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof Registry){
+ this.airavataRegistry=(Registry)configuration;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
new file mode 100644
index 0000000..99c8733
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.concurrent.BlockingQueue;
+
+public class AiravataJobStatusUpdator implements AbstractActivityListener {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
+
+ private Registry airavataRegistry;
+
+ private MonitorPublisher monitorPublisher;
+
+ private BlockingQueue<MonitorID> jobsToMonitor;
+
+ public Registry getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ public void setAiravataRegistry(Registry airavataRegistry) {
+ this.airavataRegistry = airavataRegistry;
+ }
+
+ public BlockingQueue<MonitorID> getJobsToMonitor() {
+ return jobsToMonitor;
+ }
+
+ public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
+ this.jobsToMonitor = jobsToMonitor;
+ }
+
+ @Subscribe
+ public void updateRegistry(JobStatusChangeRequest jobStatus) {
+ /* Here we need to parse the jobStatus message and update
+ the registry accordingly, for now we are just printing to standard Out
+ */
+ JobState state = jobStatus.getState();
+ if (state != null) {
+ try {
+ String taskID = jobStatus.getIdentity().getTaskId();
+ String jobID = jobStatus.getIdentity().getJobId();
+ updateJobStatus(taskID, jobID, state);
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
+ logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
+ switch (state) {
+ case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
+ jobsToMonitor.remove(jobStatus.getMonitorID());
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ @Subscribe
+ public void setupTaskStatus(JobStatusChangeRequest jobStatus){
+ TaskState state=TaskState.UNKNOWN;
+ switch(jobStatus.getState()){
+ case ACTIVE:
+ state=TaskState.EXECUTING; break;
+ case CANCELED:
+ state=TaskState.CANCELED; break;
+ case COMPLETE:
+ state=TaskState.COMPLETED; break;
+ case FAILED:
+ state=TaskState.FAILED; break;
+ case HELD: case SUSPENDED: case QUEUED:
+ state=TaskState.WAITING; break;
+ case SETUP:
+ state=TaskState.PRE_PROCESSING; break;
+ case SUBMITTED:
+ state=TaskState.STARTED; break;
+ case UN_SUBMITTED:
+ state=TaskState.CANCELED; break;
+ case CANCELING:
+ state=TaskState.CANCELING; break;
+ default:
+ break;
+ }
+ logger.debug("Publishing Task Status "+state.toString());
+ monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state));
+ }
+
+ public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
+ CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
+ JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
+ if(details == null) {
+ details = new JobDetails();
+ }
+ org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+ status.setJobState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setJobStatus(status);
+ details.setJobID(jobID);
+ airavataRegistry.update(DataType.JOB_DETAIL, details, ids);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof Registry){
+ this.airavataRegistry=(Registry)configuration;
+ } else if (configuration instanceof BlockingQueue<?>){
+ this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
+ } else if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
new file mode 100644
index 0000000..e8dd7a0
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataTaskStatusUpdator implements AbstractActivityListener {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
+
+ private Registry airavataRegistry;
+
+ private MonitorPublisher monitorPublisher;
+
+ public Registry getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ public void setAiravataRegistry(Registry airavataRegistry) {
+ this.airavataRegistry = airavataRegistry;
+ }
+
+ @Subscribe
+ public void updateRegistry(TaskStatusChangeRequest taskStatus) {
+ TaskState state = taskStatus.getState();
+ if (state != null) {
+ try {
+ String taskID = taskStatus.getIdentity().getTaskId();
+ updateTaskStatus(taskID, state);
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ @Subscribe
+ public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){
+ WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
+ switch(taskStatus.getState()){
+ case CANCELED:
+ state=WorkflowNodeState.CANCELED; break;
+ case COMPLETED:
+ state=WorkflowNodeState.COMPLETED; break;
+ case CONFIGURING_WORKSPACE:
+ state=WorkflowNodeState.INVOKED; break;
+ case FAILED:
+ state=WorkflowNodeState.FAILED; break;
+ case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
+ state=WorkflowNodeState.EXECUTING; break;
+ case STARTED:
+ state=WorkflowNodeState.INVOKED; break;
+ case CANCELING:
+ state=WorkflowNodeState.CANCELING; break;
+ default:
+ break;
+ }
+ logger.debug("Publishing Experiment Status "+state.toString());
+ monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state));
+ }
+
+ public void updateTaskStatus(String taskId, TaskState state) throws Exception {
+ TaskDetails details = (TaskDetails)airavataRegistry.get(DataType.TASK_DETAIL, taskId);
+ if(details == null) {
+ details = new TaskDetails();
+ details.setTaskID(taskId);
+ }
+ org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus();
+ status.setExecutionState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setTaskStatus(status);
+ airavataRegistry.update(DataType.TASK_DETAIL, details, taskId);
+ }
+
+ @Override
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof Registry){
+ this.airavataRegistry=(Registry)configuration;
+ } else if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..2375d72
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+ private Registry airavataRegistry;
+
+ private MonitorPublisher monitorPublisher;
+
+ public Registry getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ public void setAiravataRegistry(Registry airavataRegistry) {
+ this.airavataRegistry = airavataRegistry;
+ }
+
+ @Subscribe
+ public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) {
+ WorkflowNodeState state = workflowNodeStatus.getState();
+ if (state != null) {
+ try {
+ String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID();
+ updateWorkflowNodeStatus(workflowNodeID, state);
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ @Subscribe
+ public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus){
+ ExperimentState state=ExperimentState.UNKNOWN;
+ switch(nodeStatus.getState()){
+ case CANCELED:
+ state=ExperimentState.CANCELED; break;
+ case COMPLETED:
+ state=ExperimentState.COMPLETED; break;
+ case INVOKED:
+ state=ExperimentState.LAUNCHED; break;
+ case FAILED:
+ state=ExperimentState.FAILED; break;
+ case EXECUTING:
+ state=ExperimentState.EXECUTING; break;
+ case CANCELING:
+ state=ExperimentState.CANCELING; break;
+ default:
+ break;
+ }
+ logger.debug("Publishing Experiment Status "+state.toString());
+ monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(),state));
+ }
+
+ public void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception {
+ WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(DataType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+ if(details == null) {
+ details = new WorkflowNodeDetails();
+ details.setNodeInstanceId(workflowNodeId);
+ }
+ WorkflowNodeStatus status = new WorkflowNodeStatus();
+ status.setWorkflowNodeState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setWorkflowNodeStatus(status);
+ airavataRegistry.update(DataType.WORKFLOW_NODE_DETAIL, details, workflowNodeId);
+ }
+
+ @Override
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof Registry){
+ this.airavataRegistry=(Registry)configuration;
+ } else if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
new file mode 100644
index 0000000..ba8efeb
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class ExperimentIdentity {
+ private String experimentID;
+ public ExperimentIdentity(String experimentId) {
+ setExperimentID(experimentId);
+ }
+ public String getExperimentID() {
+ return experimentID;
+ }
+
+ public void setExperimentID(String experimentID) {
+ this.experimentID = experimentID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
new file mode 100644
index 0000000..e57087d
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HostMonitorData {
+ private HostDescription host;
+
+ private List<MonitorID> monitorIDs;
+
+ public HostMonitorData(HostDescription host) {
+ this.host = host;
+ monitorIDs = new ArrayList<MonitorID>();
+ }
+
+ public HostMonitorData(HostDescription host, List<MonitorID> monitorIDs) {
+ this.host = host;
+ this.monitorIDs = monitorIDs;
+ }
+
+ public HostDescription getHost() {
+ return host;
+ }
+
+ public void setHost(HostDescription host) {
+ this.host = host;
+ }
+
+ public List<MonitorID> getMonitorIDs() {
+ return monitorIDs;
+ }
+
+ public void setMonitorIDs(List<MonitorID> monitorIDs) {
+ this.monitorIDs = monitorIDs;
+ }
+
+ /**
+ * this method get called by CommonUtils and it will check the right place before adding
+ * so there will not be a mismatch between this.host and monitorID.host
+ * @param monitorID
+ * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+ */
+ public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException {
+ monitorIDs.add(monitorID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
new file mode 100644
index 0000000..84c4a55
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class JobIdentity extends TaskIdentity {
+ private String jobId;
+
+ public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
+ super(experimentId,workflowNodeId,taskId);
+ setJobId(jobId);
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
new file mode 100644
index 0000000..c022bef
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Map;
+
+/*
+This is the object which contains the data to identify a particular
+Job to start the monitoring
+*/
+public class MonitorID {
+ private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
+
+ private String userName;
+
+ private Timestamp jobStartedTime;
+
+ private Timestamp lastMonitored;
+
+ private HostDescription host;
+
+ private AuthenticationInfo authenticationInfo = null;
+
+ private Map<String, Object> parameters;
+
+ private String experimentID;
+
+ private String workflowNodeID;
+
+ private String taskID;
+
+ private String jobID;
+
+ private int failedCount = 0;
+
+ private JobState state;
+
+ public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
+ this.host = host;
+ this.jobStartedTime = new Timestamp((new Date()).getTime());
+ this.userName = userName;
+ this.jobID = jobID;
+ this.taskID = taskID;
+ this.experimentID = experimentID;
+ }
+
+ public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
+ this.host = host;
+ this.jobStartedTime = new Timestamp((new Date()).getTime());
+ this.authenticationInfo = authenticationInfo;
+ this.userName = userName;
+ // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user
+ if(this.authenticationInfo != null){
+ if(this.authenticationInfo instanceof MyProxyAuthenticationInfo){
+ this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName();
+ }
+ }
+ this.jobID = jobID;
+ this.taskID = taskID;
+ this.experimentID = experimentID;
+ }
+ public HostDescription getHost() {
+ return host;
+ }
+
+ public void setHost(HostDescription host) {
+ this.host = host;
+ }
+
+ public Timestamp getLastMonitored() {
+ return lastMonitored;
+ }
+
+ public void setLastMonitored(Timestamp lastMonitored) {
+ this.lastMonitored = lastMonitored;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getJobID() {
+ return jobID;
+ }
+
+ public void setJobID(String jobID) {
+ this.jobID = jobID;
+ }
+
+ public Timestamp getJobStartedTime() {
+ return jobStartedTime;
+ }
+
+ public void setJobStartedTime(Timestamp jobStartedTime) {
+ this.jobStartedTime = jobStartedTime;
+ }
+
+ public AuthenticationInfo getAuthenticationInfo() {
+ return authenticationInfo;
+ }
+
+ public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+ this.authenticationInfo = authenticationInfo;
+ }
+
+ public void addParameter(String key,Object value) {
+ this.parameters.put(key, value);
+ }
+
+ public Object getParameter(String key) {
+ return this.parameters.get(key);
+ }
+
+ public Map<String, Object> getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Map<String, Object> parameters) {
+ this.parameters = parameters;
+ }
+
+ public String getExperimentID() {
+ return experimentID;
+ }
+
+ public void setExperimentID(String experimentID) {
+ this.experimentID = experimentID;
+ }
+
+ public String getTaskID() {
+ return taskID;
+ }
+
+ public void setTaskID(String taskID) {
+ this.taskID = taskID;
+ }
+
+ public int getFailedCount() {
+ return failedCount;
+ }
+
+ public void setFailedCount(int failedCount) {
+ this.failedCount = failedCount;
+ }
+
+ public JobState getStatus() {
+ return state;
+ }
+
+ public void setStatus(JobState status) {
+ // this logic is going to be useful for fast finishing jobs
+ // because in some machines job state vanishes quicckly when the job is done
+ // during that case job state comes as unknown.so we handle it here.
+ if (this.state != null && status.equals(JobState.UNKNOWN)) {
+ if (getFailedCount() > 2) {
+ switch (this.state) {
+ case ACTIVE:
+ this.state = JobState.COMPLETE;
+ break;
+ case QUEUED:
+ this.state = JobState.COMPLETE;
+ break;
+ }
+ } else {
+ try {
+ // when state becomes unknown we sleep for a while
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ setFailedCount(getFailedCount() + 1);
+ }
+ } else {
+ // normal scenario
+ this.state = status;
+ }
+ }
+
+ public String getWorkflowNodeID() {
+ return workflowNodeID;
+ }
+
+ public void setWorkflowNodeID(String workflowNodeID) {
+ this.workflowNodeID = workflowNodeID;
+ }
+
+// public String getWorkflowNodeID() {
+// return workflowNodeID;
+// }
+//
+// public void setWorkflowNodeID(String workflowNodeID) {
+// this.workflowNodeID = workflowNodeID;
+// }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
new file mode 100644
index 0000000..b703a0a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.monitor.core.Monitor;
+import org.apache.airavata.gfac.monitor.core.PullMonitor;
+import org.apache.airavata.gfac.monitor.core.PushMonitor;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.LocalJobMonitor;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/*
+this is the manager class for monitoring system of airavata,
+This simply handle the monitoring flow of the system.
+Keeps available jobs to monitor in a queue and once they are done
+remove them from the queue, this will be done by AiravataJobUpdator.
+ */
+public class MonitorManager {
+ private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
+
+ private final static String ACTIVITY_LISTENERS = "activity.listeners";
+
+ private List<PullMonitor> pullMonitors; //todo though we have a List we only support one at a time
+
+ private List<PushMonitor> pushMonitors; //todo we need to support multiple monitors dynamically
+
+ private BlockingQueue<UserMonitorData> pullQueue;
+
+ private BlockingQueue<MonitorID> pushQueue;
+
+ private BlockingQueue<MonitorID> localJobQueue;
+
+ private BlockingQueue<MonitorID> finishQueue;
+
+ private MonitorPublisher monitorPublisher;
+
+ private Monitor localJobMonitor;
+
+ private Registry registry;
+
+ /**
+ * This will initialize the major monitoring system.
+ */
+ public MonitorManager() {
+ this(new RegistryImpl());
+ }
+
+ public MonitorManager(Registry registry) {
+ pullMonitors = new ArrayList<PullMonitor>();
+ pushMonitors = new ArrayList<PushMonitor>();
+ pullQueue = new LinkedBlockingQueue<UserMonitorData>();
+ pushQueue = new LinkedBlockingQueue<MonitorID>();
+ finishQueue = new LinkedBlockingQueue<MonitorID>();
+ localJobQueue = new LinkedBlockingQueue<MonitorID>();
+ monitorPublisher = new MonitorPublisher(new EventBus());
+ this.registry = registry;
+ loadActivityMonitors();
+ }
+
+ private void loadActivityMonitors(){
+ try {
+ String activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS);
+ if (activityListenersString!=null){
+ String[] activityListenerClasses = activityListenersString.split(",");
+ for (String activityListenerClassName : activityListenerClasses) {
+ try {
+ activityListenerClassName=activityListenerClassName.trim();
+ Class<?> classInstance = MonitorManager.class
+ .getClassLoader().loadClass(activityListenerClassName);
+ AbstractActivityListener monitor=(AbstractActivityListener)classInstance.newInstance();
+ registerListener(monitor);
+ } catch (ClassNotFoundException e) {
+ logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e);
+ } catch (InstantiationException e) {
+ logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
+ } catch (IllegalAccessException e) {
+ logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
+ } catch (ClassCastException e){
+ logger.error("Invalid activity monitor \""+activityListenerClassName+"\"!!!",e);
+ }
+ }
+ }
+ } catch (ApplicationSettingsException e1) {
+ logger.warn("Error in reading activity monitors!!!", e1);
+ }
+
+ }
+ /**
+ * This can be use to add an empty AMQPMonitor object to the monitor system
+ * and tihs method will take care of the initialization
+ * todo may be we need to move this to some other class
+ * @param monitor
+ */
+ public void addAMQPMonitor(AMQPMonitor monitor) {
+ monitor.setPublisher(this.getMonitorPublisher());
+ monitor.setFinishQueue(this.getFinishQueue());
+ monitor.setRunningQueue(this.getPushQueue());
+ addPushMonitor(monitor);
+ }
+
+
+ /**
+ * This can be use to add an empty AMQPMonitor object to the monitor system
+ * and tihs method will take care of the initialization
+ * todo may be we need to move this to some other class
+ * @param monitor
+ */
+ public void addLocalMonitor(LocalJobMonitor monitor) {
+ monitor.setPublisher(this.getMonitorPublisher());
+ monitor.setJobQueue(this.getLocalJobQueue());
+ localJobMonitor = monitor;
+ }
+
+ /**
+ * This can be used to adda a QstatMonitor and it will take care of
+ * the initialization of QstatMonitor
+ * //todo may be we need to move this to some other class
+ * @param qstatMonitor
+ */
+ public void addQstatMonitor(QstatMonitor qstatMonitor) {
+ qstatMonitor.setPublisher(this.getMonitorPublisher());
+ qstatMonitor.setQueue(this.getPullQueue());
+ addPullMonitor(qstatMonitor);
+
+ }
+
+ /**
+ * To deal with the statuses users can write their own listener and implement their own logic
+ *
+ * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
+ * have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
+ */
+ public void registerListener(Object listener) {
+ monitorPublisher.registerListener(listener);
+ if (listener instanceof AbstractActivityListener){
+ ((AbstractActivityListener)listener).setup(registry, getFinishQueue(), getMonitorPublisher(), this);
+ }
+ }
+
+ public void registerListener(AbstractActivityListener listener) {
+ registerListener((Object)listener);
+ }
+
+ /**
+ * To remove listeners of changing statuses
+ *
+ * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
+ * have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
+ */
+ public void unregisterListener(Object listener) {
+ monitorPublisher.unregisterListener(listener);
+ }
+
+ /**
+ * todo write
+ *
+ * @param monitor
+ */
+ public void addPushMonitor(PushMonitor monitor) {
+ pushMonitors.add(monitor);
+ }
+
+ /**
+ * todo write
+ *
+ * @param monitor
+ */
+ public void addPullMonitor(PullMonitor monitor) {
+ pullMonitors.add(monitor);
+ }
+
+
+ /**
+ * Adding this method will trigger the thread in launchMonitor and notify it
+ * This is going to be useful during the startup of the launching process
+ * @param monitorID
+ * @throws AiravataMonitorException
+ * @throws InterruptedException
+ */
+ public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException, InterruptedException {
+
+ if (monitorID.getHost().getType() instanceof GsisshHostType) {
+ GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
+ if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
+ || Constants.PULL.equals(host.getMonitorMode())) {
+ CommonUtils.addMonitortoQueue(pullQueue, monitorID);
+ } else if (Constants.PUSH.equals(host.getMonitorMode())) {
+ pushQueue.put(monitorID);
+ finishQueue.put(monitorID);
+ }
+ } else if(monitorID.getHost().getType() instanceof GlobusHostType){
+ logger.error("Monitoring does not support GlubusHostType resources");
+ } else if(monitorID.getHost().getType() instanceof SSHHostType) {
+ logger.error("Monitoring does not support GlubusHostType resources");
+ localJobQueue.add(monitorID);
+ } else {
+ // we assume this is a type of localJobtype
+ localJobQueue.add(monitorID);
+ }
+ }
+
+ /**
+ * This method should be invoked before adding any elements to monitorQueue
+ * In this method we assume that we give higher preference to Push
+ * Monitorig mechanism if there's any configured, otherwise Pull
+ * monitoring will be launched.
+ * Ex: If there's a reasource which doesn't support Push, we have
+ * to live with Pull MOnitoring.
+ *
+ * @throws AiravataMonitorException
+ */
+ public void launchMonitor() throws AiravataMonitorException {
+ //no push monitor is configured so we launch pull monitor
+ if(localJobMonitor != null){
+ (new Thread(localJobMonitor)).start();
+ }
+
+ for (PullMonitor monitor : pullMonitors) {
+ (new Thread(monitor)).start();
+ }
+
+ //todo fix this
+ for (PushMonitor monitor : pushMonitors) {
+ (new Thread(monitor)).start();
+ }
+ }
+
+ /**
+ * This method should be invoked before adding any elements to monitorQueue
+ * In this method we assume that we give higher preference to Push
+ * Monitorig mechanism if there's any configured, otherwise Pull
+ * monitoring will be launched.
+ * Ex: If there's a reasource which doesn't support Push, we have
+ * to live with Pull MOnitoring.
+ *
+ * @throws AiravataMonitorException
+ */
+ public void stopMonitor() throws AiravataMonitorException {
+ //no push monitor is configured so we launch pull monitor
+ if(localJobMonitor != null){
+ (new Thread(localJobMonitor)).interrupt();
+ }
+
+ for (PullMonitor monitor : pullMonitors) {
+ (new Thread(monitor)).interrupt();
+ }
+
+ //todo fix this
+ for (PushMonitor monitor : pushMonitors) {
+ (new Thread(monitor)).interrupt();
+ }
+ }
+ /* getter setters for the private variables */
+
+ public List<PullMonitor> getPullMonitors() {
+ return pullMonitors;
+ }
+
+ public void setPullMonitors(List<PullMonitor> pullMonitors) {
+ this.pullMonitors = pullMonitors;
+ }
+
+ public List<PushMonitor> getPushMonitors() {
+ return pushMonitors;
+ }
+
+ public void setPushMonitors(List<PushMonitor> pushMonitors) {
+ this.pushMonitors = pushMonitors;
+ }
+
+ public BlockingQueue<UserMonitorData> getPullQueue() {
+ return pullQueue;
+ }
+
+ public void setPullQueue(BlockingQueue<UserMonitorData> pullQueue) {
+ this.pullQueue = pullQueue;
+ }
+
+ public MonitorPublisher getMonitorPublisher() {
+ return monitorPublisher;
+ }
+
+ public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
+ this.monitorPublisher = monitorPublisher;
+ }
+
+ public BlockingQueue<MonitorID> getFinishQueue() {
+ return finishQueue;
+ }
+
+ public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
+ this.finishQueue = finishQueue;
+ }
+
+ public BlockingQueue<MonitorID> getPushQueue() {
+ return pushQueue;
+ }
+
+ public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
+ this.pushQueue = pushQueue;
+ }
+
+ public BlockingQueue<MonitorID> getLocalJobQueue() {
+ return localJobQueue;
+ }
+
+ public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) {
+ this.localJobQueue = localJobQueue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
new file mode 100644
index 0000000..c6d386e
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class TaskIdentity extends WorkflowNodeIdentity {
+ private String taskId;
+
+ public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
+ super(experimentId,workflowNodeId);
+ setTaskId(taskId);
+ }
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
new file mode 100644
index 0000000..022d17c
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is the datastructure to keep the user centric job data, rather keeping
+ * the individual jobs we keep the jobs based on the each user
+ */
+public class UserMonitorData {
+ private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+ private String userName;
+
+ private List<HostMonitorData> hostMonitorData;
+
+
+ public UserMonitorData(String userName) {
+ this.userName = userName;
+ hostMonitorData = new ArrayList<HostMonitorData>();
+ }
+
+ public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+ this.hostMonitorData = hostMonitorDataList;
+ this.userName = userName;
+ }
+
+ public List<HostMonitorData> getHostMonitorData() {
+ return hostMonitorData;
+ }
+
+ public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+ this.hostMonitorData = hostMonitorData;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ /*
+ This method will add element to the MonitorID list, user should not
+ duplicate it, we do not check it because its going to be used by airavata
+ so we have to use carefully and this method will add a host if its a new host
+ */
+ public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+ this.hostMonitorData.add(hostMonitorData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
new file mode 100644
index 0000000..e569c52
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class WorkflowNodeIdentity extends ExperimentIdentity {
+ private String workflowNodeID;
+ public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
+ super(experimentId);
+ setWorkflowNodeID(workflowNodeId);
+ }
+ public String getWorkflowNodeID() {
+ return workflowNodeID;
+ }
+
+ public void setWorkflowNodeID(String workflowNodeID) {
+ this.workflowNodeID = workflowNodeID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..f19decf
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class ExperimentCancelRequest {
+ private String experimentId;
+
+ public ExperimentCancelRequest(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public String getExperimentId() {
+ return experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..b45e01c
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class TaskCancelRequest {
+ private String experimentId;
+ private String nodeId;
+ private String taskId;
+
+ public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+ this.experimentId = experimentId;
+ this.setNodeId(nodeId);
+ this.taskId = taskId;
+ }
+ public String getExperimentId() {
+ return experimentId;
+ }
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+ public String getTaskId() {
+ return taskId;
+ }
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+ public String getNodeId() {
+ return nodeId;
+ }
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
new file mode 100644
index 0000000..1f85921
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the abstract Monitor which needs to be used by
+ * any Monitoring implementation which expect to consume
+ * to store the status to registry. Because they have to
+ * use the MonitorPublisher to publish the monitoring statuses
+ * to the Event Bus. All the Monitor statuses publish to the eventbus
+ * will be saved to the Registry.
+ */
+public abstract class AiravataAbstractMonitor implements Monitor {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
+ protected MonitorPublisher publisher;
+
+ public MonitorPublisher getPublisher() {
+ return publisher;
+ }
+
+ public void setPublisher(MonitorPublisher publisher) {
+ this.publisher = publisher;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
new file mode 100644
index 0000000..a003f55
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * This is an interface to implement messageparser, it could be
+ * pull based or push based still monitor has to parse the content of
+ * the message it gets from remote monitoring system and finalize
+ * them to internal job state, Ex: JSON parser for AMQP and Qstat reader
+ * for pull based monitor.
+ */
+public interface MessageParser {
+ /**
+ * This method is to implement how to parse the incoming message
+ * and implement a logic to finalize the status of the job,
+ * we have to makesure the correct message is given to the messageparser
+ * parse method, it will not do any filtering
+ * @param message content of the message
+ * @return
+ */
+ JobState parseMessage(String message)throws AiravataMonitorException;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
new file mode 100644
index 0000000..d2fede5
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+
+/**
+ * This is the primary interface for Monitors,
+ * This can be used to implement different methods of monitoring
+ */
+public interface Monitor extends Runnable {
+
+}