You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/01 20:29:27 UTC
[9/9] git commit: Separating gfac-monitoring implementation
Separating gfac-monitoring implementation
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/553caa08
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/553caa08
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/553caa08
Branch: refs/heads/master
Commit: 553caa0878fd8c16816583549d573b0c9257cee7
Parents: 0eabb93
Author: lahiru <la...@apache.org>
Authored: Thu May 1 14:28:39 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Thu May 1 14:28:39 2014 -0400
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 2 +-
.../server/src/main/resources/gfac-config.xml | 5 +-
modules/distribution/server/pom.xml | 5 +
.../server/src/main/assembly/bin-assembly.xml | 1 +
.../gfac/provider/impl/BESProvider.java | 7 +-
modules/gfac/gfac-core/pom.xml | 36 --
.../org/apache/airavata/gfac/Constants.java | 2 +
.../org/apache/airavata/gfac/ExecutionMode.java | 39 +++
.../apache/airavata/gfac/GFacConfiguration.java | 21 +-
.../org/apache/airavata/gfac/Scheduler.java | 42 ++-
.../gfac/context/JobExecutionContext.java | 31 +-
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 141 +++++---
.../airavata/gfac/handler/AbstractHandler.java | 3 +-
.../gfac/handler/AppDescriptorCheckHandler.java | 3 +-
.../airavata/gfac/handler/GFacHandler.java | 4 +-
.../gfac/handler/GFacHandlerException.java | 4 +
.../airavata/gfac/handler/ThreadedHandler.java | 31 ++
.../gfac/monitor/AbstractActivityListener.java | 27 --
.../AiravataExperimentStatusUpdator.java | 81 -----
.../gfac/monitor/AiravataJobStatusUpdator.java | 145 --------
.../gfac/monitor/AiravataTaskStatusUpdator.java | 113 ------
.../AiravataWorkflowNodeStatusUpdator.java | 112 ------
.../gfac/monitor/ExperimentIdentity.java | 36 --
.../airavata/gfac/monitor/HostMonitorData.java | 69 ----
.../airavata/gfac/monitor/JobIdentity.java | 39 ---
.../apache/airavata/gfac/monitor/MonitorID.java | 238 -------------
.../airavata/gfac/monitor/MonitorManager.java | 347 -------------------
.../airavata/gfac/monitor/TaskIdentity.java | 38 --
.../airavata/gfac/monitor/UserMonitorData.java | 76 ----
.../gfac/monitor/WorkflowNodeIdentity.java | 37 --
.../command/ExperimentCancelRequest.java | 38 --
.../gfac/monitor/command/TaskCancelRequest.java | 52 ---
.../monitor/core/AiravataAbstractMonitor.java | 46 ---
.../gfac/monitor/core/MessageParser.java | 43 ---
.../airavata/gfac/monitor/core/Monitor.java | 30 --
.../airavata/gfac/monitor/core/PullMonitor.java | 64 ----
.../airavata/gfac/monitor/core/PushMonitor.java | 60 ----
.../gfac/monitor/event/MonitorPublisher.java | 47 ---
.../exception/AiravataMonitorException.java | 37 --
.../gfac/monitor/impl/LocalJobMonitor.java | 59 ----
.../monitor/impl/pull/qstat/QstatMonitor.java | 262 --------------
.../impl/pull/qstat/ResourceConnection.java | 151 --------
.../monitor/impl/push/amqp/AMQPMonitor.java | 263 --------------
.../monitor/impl/push/amqp/BasicConsumer.java | 86 -----
.../impl/push/amqp/JSONMessageParser.java | 78 -----
.../impl/push/amqp/UnRegisterWorker.java | 68 ----
.../state/AbstractStateChangeRequest.java | 27 --
.../state/ExperimentStatusChangeRequest.java | 63 ----
.../monitor/state/JobStatusChangeRequest.java | 80 -----
.../gfac/monitor/state/JobStatusInfo.java | 48 ---
.../gfac/monitor/state/PublisherMessage.java | 26 --
.../monitor/state/TaskStatusChangeRequest.java | 61 ----
.../state/WorkflowNodeStatusChangeRequest.java | 63 ----
.../monitor/state/impl/AmazonJobStatusInfo.java | 39 ---
.../monitor/state/impl/GridJobStatusInfo.java | 40 ---
.../gfac/monitor/util/AMQPConnectionUtil.java | 77 ----
.../airavata/gfac/monitor/util/CommonUtils.java | 172 ---------
.../airavata/gfac/monitor/util/X509Helper.java | 161 ---------
.../listeners/GSISSHJobSubmissionListener.java | 55 ---
.../gfac/provider/AbstractProvider.java | 50 +++
.../airavata/gfac/provider/GFacProvider.java | 9 +-
.../gfac/provider/impl/AbstractProvider.java | 92 -----
.../apache/airavata/gfac/utils/GFacUtils.java | 129 ++-----
.../apache/airavata/job/AMQPMonitorTest.java | 175 ----------
.../apache/airavata/job/GFacConfigXmlTest.java | 113 ++++++
.../job/QstatMonitorTestWithMyProxyAuth.java | 167 ---------
.../apache/airavata/job/TestGlobalHandler.java | 32 ++
.../org/apache/airavata/job/TestInHandler.java | 32 ++
.../org/apache/airavata/job/TestOutHandler.java | 32 ++
.../org/apache/airavata/job/TestProvider.java | 47 +++
.../airavata/job/TestThreadedHandler.java | 39 +++
.../src/test/resources/gfac-config.xml | 101 ++----
modules/gfac/gfac-ec2/pom.xml | 4 +-
.../apache/airavata/gfac/ec2/EC2Provider.java | 12 +-
.../gfac-ec2/src/test/resources/gfac-config.xml | 64 ----
.../context/security/GSISecurityContext.java | 13 -
.../gfac/handler/GramDirectorySetupHandler.java | 10 +-
.../gfac/handler/GridFTPInputHandler.java | 6 +-
.../gfac/handler/GridFTPOutputHandler.java | 6 +-
.../gfac/provider/impl/GramProvider.java | 21 +-
.../gfac/util/GramJobSubmissionListener.java | 2 +-
modules/gfac/gfac-gsissh/pom.xml | 7 +-
.../handler/GSISSHDirectorySetupHandler.java | 29 +-
.../gfac/handler/GSISSHInputHandler.java | 47 ++-
.../gfac/handler/GSISSHOutputHandler.java | 19 +-
.../gfac/provider/impl/GSISSHProvider.java | 42 ++-
.../airavata/gfac/util/GFACGSISSHUtils.java | 94 +++++
.../gfac/handler/HDFSDataMovementHandler.java | 3 +-
.../gfac/handler/HadoopDeploymentHandler.java | 4 +-
.../gfac/provider/impl/HadoopProvider.java | 3 +-
.../src/test/resources/gfac-config.xml | 50 ---
.../handler/LocalDirectorySetupHandler.java | 3 +-
.../gfac/provider/impl/LocalProvider.java | 11 +-
.../gfac/services/impl/LocalProviderTest.java | 44 ++-
.../src/test/resources/gfac-config.xml | 63 ----
modules/gfac/gfac-monitor/pom.xml | 181 ++++++++++
.../gfac/monitor/AbstractActivityListener.java | 27 ++
.../AiravataExperimentStatusUpdator.java | 81 +++++
.../gfac/monitor/AiravataJobStatusUpdator.java | 145 ++++++++
.../gfac/monitor/AiravataTaskStatusUpdator.java | 113 ++++++
.../AiravataWorkflowNodeStatusUpdator.java | 112 ++++++
.../gfac/monitor/ExperimentIdentity.java | 36 ++
.../airavata/gfac/monitor/HostMonitorData.java | 69 ++++
.../airavata/gfac/monitor/JobIdentity.java | 39 +++
.../apache/airavata/gfac/monitor/MonitorID.java | 251 ++++++++++++++
.../airavata/gfac/monitor/TaskIdentity.java | 38 ++
.../airavata/gfac/monitor/UserMonitorData.java | 76 ++++
.../gfac/monitor/WorkflowNodeIdentity.java | 37 ++
.../command/ExperimentCancelRequest.java | 38 ++
.../gfac/monitor/command/TaskCancelRequest.java | 52 +++
.../monitor/core/AiravataAbstractMonitor.java | 46 +++
.../gfac/monitor/core/MessageParser.java | 43 +++
.../airavata/gfac/monitor/core/Monitor.java | 30 ++
.../airavata/gfac/monitor/core/PullMonitor.java | 64 ++++
.../airavata/gfac/monitor/core/PushMonitor.java | 60 ++++
.../gfac/monitor/event/MonitorPublisher.java | 47 +++
.../exception/AiravataMonitorException.java | 37 ++
.../handlers/GridPullMonitorHandler.java | 96 +++++
.../gfac/monitor/impl/LocalJobMonitor.java | 59 ++++
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 284 +++++++++++++++
.../impl/pull/qstat/ResourceConnection.java | 151 ++++++++
.../monitor/impl/push/amqp/AMQPMonitor.java | 263 ++++++++++++++
.../monitor/impl/push/amqp/BasicConsumer.java | 86 +++++
.../impl/push/amqp/JSONMessageParser.java | 78 +++++
.../impl/push/amqp/UnRegisterWorker.java | 68 ++++
.../state/AbstractStateChangeRequest.java | 27 ++
.../state/ExperimentStatusChangeRequest.java | 63 ++++
.../monitor/state/JobStatusChangeRequest.java | 80 +++++
.../gfac/monitor/state/JobStatusInfo.java | 48 +++
.../gfac/monitor/state/PublisherMessage.java | 26 ++
.../monitor/state/TaskStatusChangeRequest.java | 61 ++++
.../state/WorkflowNodeStatusChangeRequest.java | 63 ++++
.../monitor/state/impl/AmazonJobStatusInfo.java | 39 +++
.../monitor/state/impl/GridJobStatusInfo.java | 40 +++
.../gfac/monitor/util/AMQPConnectionUtil.java | 77 ++++
.../airavata/gfac/monitor/util/CommonUtils.java | 172 +++++++++
.../airavata/gfac/monitor/util/X509Helper.java | 161 +++++++++
.../src/main/resources/errors.properties | 197 +++++++++++
.../src/main/resources/schema/AccessPolicy.json | 13 +
.../src/main/resources/schema/Activity.json | 31 ++
.../src/main/resources/schema/AdminDomain.json | 51 +++
.../schema/ApplicationEnvironment.json | 86 +++++
.../resources/schema/ApplicationHandle.json | 21 ++
.../src/main/resources/schema/Benchmark.json | 21 ++
.../resources/schema/ComputingActivity.json | 165 +++++++++
.../resources/schema/ComputingEndpoint.json | 44 +++
.../main/resources/schema/ComputingManager.json | 117 +++++++
.../main/resources/schema/ComputingService.json | 32 ++
.../main/resources/schema/ComputingShare.json | 182 ++++++++++
.../src/main/resources/schema/Contact.json | 32 ++
.../src/main/resources/schema/DataStore.json | 30 ++
.../src/main/resources/schema/Domain.json | 30 ++
.../src/main/resources/schema/Endpoint.json | 147 ++++++++
.../src/main/resources/schema/Entity.json | 35 ++
.../resources/schema/ExecutionEnvironment.json | 115 ++++++
.../src/main/resources/schema/Glue2.json | 246 +++++++++++++
.../src/main/resources/schema/Location.json | 47 +++
.../src/main/resources/schema/Manager.json | 28 ++
.../main/resources/schema/MappingPolicy.json | 13 +
.../src/main/resources/schema/Policy.json | 27 ++
.../src/main/resources/schema/Resource.json | 27 ++
.../src/main/resources/schema/Service.json | 75 ++++
.../src/main/resources/schema/Share.json | 45 +++
.../resources/schema/StorageAccessProtocol.json | 32 ++
.../main/resources/schema/StorageEndpoint.json | 8 +
.../main/resources/schema/StorageManager.json | 8 +
.../main/resources/schema/StorageService.json | 22 ++
.../schema/StorageServiceCapacity.json | 33 ++
.../src/main/resources/schema/StorageShare.json | 65 ++++
.../resources/schema/StorageShareCapacity.json | 33 ++
.../resources/schema/ToComputingService.json | 32 ++
.../main/resources/schema/ToStorageService.json | 25 ++
.../src/main/resources/schema/UserDomain.json | 58 ++++
.../src/main/resources/service.properties | 58 ++++
.../apache/airavata/job/AMQPMonitorTest.java | 178 ++++++++++
.../job/QstatMonitorTestWithMyProxyAuth.java | 170 +++++++++
.../src/test/resources/PBSTemplate.xslt | 73 ++++
.../gfac-monitor/src/test/resources/echo.bat | 22 ++
.../src/test/resources/gfac-config.xml | 65 ++++
.../src/test/resources/logging.properties | 42 +++
.../gfac/handler/AdvancedSCPInputHandler.java | 22 +-
.../gfac/handler/AdvancedSCPOutputHandler.java | 54 +--
.../gfac/handler/SSHDirectorySetupHandler.java | 29 +-
.../airavata/gfac/handler/SSHInputHandler.java | 43 +--
.../airavata/gfac/handler/SSHOutputHandler.java | 22 +-
.../gfac/provider/impl/SSHProvider.java | 21 +-
.../apache/airavata/gfac/util/GFACSSHUtils.java | 94 ++++-
modules/gfac/pom.xml | 1 +
.../server/OrchestratorServerHandler.java | 17 -
.../core/context/OrchestratorContext.java | 11 -
.../core/impl/EmbeddedGFACJobSubmitter.java | 3 -
.../cpi/impl/SimpleOrchestratorImpl.java | 169 +--------
.../orchestrator/core/NewOrchestratorTest.java | 2 +
193 files changed, 7501 insertions(+), 4938 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 89bab76..eac1350 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,7 +61,7 @@ public class CreateLaunchExperiment {
AiravataUtils.setExecutionAsClient();
final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
System.out.println("API version is " + airavata.GetAPIVersion());
-// addDescriptors();
+ addDescriptors();
// final String expId = createExperimentForSSHHost(airavata);
// final String expId = createExperimentForSSHHost(airavata);
final String expId = createExperimentForTrestles(airavata);
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/configuration/server/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml
index 1651205..f44fbec 100644
--- a/modules/configuration/server/src/main/resources/gfac-config.xml
+++ b/modules/configuration/server/src/main/resources/gfac-config.xml
@@ -11,6 +11,9 @@
~ under the License. -->
<GFac>
+ <DaemonHandlers>
+ <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler"/>
+ </DaemonHandlers>
<GlobalHandlers>
<InHandlers>
<Handler class="org.apache.airavata.gfac.handler.AppDescriptorCheckHandler">
@@ -54,7 +57,7 @@
<Application name="UltraScan">
<InHandlers>
- <Handler class="org.apache.airavata.gfac.handler.GramDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GfacDirectorySetupHandler"/>
<Handler class="org.apache.airavata.gfac.handler.GridFTPInputHandler"/>
</InHandlers>
<OutHandlers>
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 202db63..32880bc 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -306,6 +306,11 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-gfac-hpc-monitor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
<artifactId>airavata-gfac-local</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index b5faaa5..3435565 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -199,6 +199,7 @@
<include>org.apache.airavata:airavata-gfac-ssh:jar</include>
<include>org.apache.airavata:airavata-gfac-local:jar</include>
<include>org.apache.airavata:airavata-gfac-gsissh:jar</include>
+ <include>org.apache.airavata:airavata-gfac-hpc-monitor:jar</include>
<include>org.apache.airavata:airavata-gfac-hadoop:jar</include>
<include>org.apache.airavata:airavata-gfac-bes:jar</include>
<include>org.apache.airavata:airavata-gfac-gram:jar</include>
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
index 2ace533..4a531a6 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
@@ -48,6 +48,7 @@ import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
import org.apache.airavata.gfac.notification.events.UnicoreJobIDEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.DataTransferrer;
import org.apache.airavata.gfac.utils.JSDLGenerator;
@@ -91,7 +92,7 @@ import eu.unicore.util.httpclient.DefaultClientConfiguration;
-public class BESProvider extends AbstractProvider{
+public class BESProvider extends AbstractProvider {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
private DefaultClientConfiguration secProperties;
@@ -199,7 +200,7 @@ public class BESProvider extends AbstractProvider{
String jobStatusMessage = "Status of job " + jobId + "is " + jobStatus;
jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
details.setJobID(jobId);
- GFacUtils.updateJobStatus(details, jobStatus);
+ GFacUtils.updateJobStatus(jobExecutionContext, details, jobStatus);
} catch (UnknownActivityIdentifierFault e) {
throw new GFacProviderException(e.getMessage(), e.getCause());
}catch (GFacException e) {
@@ -239,7 +240,7 @@ public class BESProvider extends AbstractProvider{
jobExecutionContext.getNotifier().publish(new StatusChangeEvent(jobStatusMessage));
details.setJobID(jobId);
try {
- GFacUtils.saveJobStatus(details, jobStatus, jobExecutionContext.getTaskData().getTaskID());
+ GFacUtils.saveJobStatus(jobExecutionContext,details, jobStatus);
} catch (GFacException e) {
throw new GFacProviderException(e.getLocalizedMessage(),e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index db8d3f5..33758e2 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -112,35 +112,15 @@
<!-- gsi-ssh api dependencies -->
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>gsissh</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
<artifactId>airavata-data-models</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.jcraft</groupId>
- <artifactId>jsch</artifactId>
- <version>0.1.50</version>
- </dependency>
- <dependency>
- <groupId>org.ogce</groupId>
- <artifactId>bcgss</artifactId>
- <version>146</version>
- </dependency>
- <dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans</artifactId>
<version>${xmlbeans.version}</version>
</dependency>
<!-- this is the dependency for amqp implementation -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.0.0</version>
- </dependency>
</dependencies>
<build>
@@ -154,22 +134,6 @@
<failIfNoTests>false</failIfNoTests>
</configuration>
</plugin>
- <plugin>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-maven-plugin</artifactId>
- <version>0.4.0</version>
- <configuration>
- <sourceDirectory>${basedir}/src/main/resources/schema</sourceDirectory>
- <targetPackage>org.apache.airavata</targetPackage>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>generate</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
index bda40b1..b9ecdbe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java
@@ -24,6 +24,7 @@ package org.apache.airavata.gfac;
public class Constants {
public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler";
public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler";
+ public static final String XPATH_EXPR_DAEMON_HANDLERS = "/GFac/DaemonHandlers/Handler";
public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='";
public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler";
@@ -37,6 +38,7 @@ public class Constants {
public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler";
public static final String GFAC_CONFIG_CLASS_ATTRIBUTE = "class";
+ public static final String GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE = "executionMode";
public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class";
public static final String NEWLINE = System.getProperty("line.separator");
public static final String INPUT_DATA_DIR_VAR_NAME = "input";
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/ExecutionMode.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/ExecutionMode.java
new file mode 100644
index 0000000..63f2523
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/ExecutionMode.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac;
+
+
+/**
+ * These are the different modes of execution chains in gfac
+ * if the mode is SYNCHRONOUS then gfac will waits until the provider invoke mthod returns and then
+ * invoke the out handlers explicitly, otherwise gfac will not invoke out hanlders, implementation
+ * has to handler when to invoke out handlers, and default execution mode is synchronous.
+ */
+public enum ExecutionMode {
+ SYNCHRONOUS,ASYNCHRONOUS;
+
+ public static ExecutionMode fromString(String mode){
+ if("async".equals(mode) || "asynchronous".equals(mode)){
+ return ExecutionMode.ASYNCHRONOUS;
+ }
+ return ExecutionMode.SYNCHRONOUS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java
index 3ae29d6..cd716e5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java
@@ -38,6 +38,7 @@ import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.exception.UnspecifiedApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.provider.GFacProviderConfig;
import org.slf4j.Logger;
@@ -60,6 +61,7 @@ public class GFacConfiguration {
// the provider
private List<GFacHandlerConfig> outHandlers = new ArrayList<GFacHandlerConfig>();
+ public ExecutionMode executionMode = ExecutionMode.SYNCHRONOUS; // default execution mode is SYNCHRONOUS
public GFacConfiguration(AiravataAPI airavataAPI) {
this.airavataAPI = airavataAPI;
@@ -232,13 +234,13 @@ public class GFacConfiguration {
return gFacProviderConfigs;
}
- public static String getProviderClassName(Document doc, String expression, String attribute) throws XPathExpressionException {
+ public static String getAttributeValue(Document doc, String expression, String attribute) throws XPathExpressionException {
XPathFactory xPathFactory = XPathFactory.newInstance();
XPath xPath = xPathFactory.newXPath();
XPathExpression expr = xPath.compile(expression);
NodeList nl = (NodeList) expr.evaluate(doc, XPathConstants.NODESET);
- String className = "";
+ String className = null;
for (int i = 0; i < nl.getLength(); i++) {
className = ((Element) nl.item(i)).getAttribute(attribute);
break;
@@ -262,8 +264,21 @@ public class GFacConfiguration {
arlList.addAll(newList);
return arlList;
}
-
+ public static List<GFacHandlerConfig> getDaemonHandlers(File configFile)throws ParserConfigurationException, IOException, SAXException, XPathExpressionException{
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
+ handlerDoc = docBuilder.parse(configFile);
+ return getHandlerConfig(handlerDoc, Constants.XPATH_EXPR_DAEMON_HANDLERS, Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ }
public static Document getHandlerDoc() {
return handlerDoc;
}
+
+ public ExecutionMode getExecutionMode() {
+ return executionMode;
+ }
+
+ public void setExecutionMode(ExecutionMode executionMode) {
+ this.executionMode = executionMode;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
index 1b30240..273013b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
@@ -64,6 +64,11 @@ public class Scheduler {
// Current implementation only support static handler sequence.
jobExecutionContext.setProvider(getProvider(jobExecutionContext));
// TODO: Selecting the provider based on application description.
+ jobExecutionContext.getGFacConfiguration().setInHandlers(jobExecutionContext.getProvider().getClass().getName(),
+ jobExecutionContext.getServiceName());
+ jobExecutionContext.getGFacConfiguration().setOutHandlers(jobExecutionContext.getProvider().getClass().getName(),
+ jobExecutionContext.getServiceName());
+ jobExecutionContext.getGFacConfiguration().setExecutionMode(getExecutionMode(jobExecutionContext));
}
/**
@@ -109,7 +114,7 @@ public class Scheduler {
// We give higher preference to applications specific provider if configured
if (provider == null) {
String hostClass = hostDescription.getType().getClass().getName();
- providerClassName = GFacConfiguration.getProviderClassName(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
+ providerClassName = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", Constants.GFAC_CONFIG_CLASS_ATTRIBUTE);
Class<? extends GFacProvider> aClass1 = Class.forName(providerClassName).asSubclass(GFacProvider.class);
provider = aClass1.newInstance();
//loading the provider properties
@@ -137,7 +142,42 @@ public class Scheduler {
}
return provider;
}
+ public static ExecutionMode getExecutionMode(JobExecutionContext jobExecutionContext)throws GFacException{
+ HostDescription hostDescription = jobExecutionContext.getApplicationContext().getHostDescription();
+ String applicationName = jobExecutionContext.getServiceName();
+ URL resource = Scheduler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder docBuilder = null;
+ Document handlerDoc = null;
+ try {
+ docBuilder = docBuilderFactory.newDocumentBuilder();
+ handlerDoc = docBuilder.parse(new File(resource.getPath()));
+ } catch (ParserConfigurationException e) {
+ throw new GFacException(e);
+ } catch (SAXException e) {
+ throw new GFacException(e);
+ } catch (IOException e) {
+ throw new GFacException(e);
+ }
+ GFacProviderConfig s = null;
+ String executionMode = "sync";
+ try {
+ executionMode = GFacConfiguration.getAttributeValue(handlerDoc,
+ Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + "']", Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
+ // This should be have a single element only.
+
+ if (executionMode == null || "".equals(executionMode)) {
+ String hostClass = hostDescription.getType().getClass().getName();
+ executionMode = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE);
+ }
+ } catch (XPathExpressionException e) {
+ log.error("Error evaluating XPath expression"); //To change body of catch statement use File | Settings | File Templates.
+ throw new GFacException("Error evaluating XPath expression", e);
+ }
+
+ return ExecutionMode.fromString(executionMode);
+ }
public static HostDescription pickaHost(AiravataAPI api, String serviceName) throws AiravataAPIInvocationException {
List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
Map<String, ApplicationDescription> applicationDescriptors = api.getApplicationManager().getApplicationDescriptors(serviceName);
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
index 1f99057..2de8f48 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
@@ -26,13 +26,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.airavata.gfac.ExecutionMode;
import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.notification.GFacNotifier;
import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.registry.cpi.Registry;
public class JobExecutionContext extends AbstractContext{
@@ -46,11 +49,15 @@ public class JobExecutionContext extends AbstractContext{
private MessageContext outMessageContext;
private GFacNotifier notifier;
-
+
+ private Experiment experiment;
+
private TaskDetails taskData;
private JobDetails jobDetails;
+ private WorkflowNodeDetails workflowNodeDetails;
+
// private ContextHeaderDocument.ContextHeader contextHeader;
// Keep track of the current path of the message. Before hitting provider its in-path.
@@ -83,6 +90,7 @@ public class JobExecutionContext extends AbstractContext{
private List<String> outputFileList;
private Registry registry;
+
/**
* Security context is used to handle authentication for input handlers and providers.
* There can be multiple security requirement for a single job so this allows you to add multiple security types
@@ -97,6 +105,7 @@ public class JobExecutionContext extends AbstractContext{
outputFileList = new ArrayList<String>();
}
+
public String getExperimentID() {
return experimentID;
}
@@ -142,10 +151,6 @@ public class JobExecutionContext extends AbstractContext{
}
public void setProvider(GFacProvider provider) {
- this.gfacConfiguration.setInHandlers(provider.getClass().getName(),
- this.getServiceName());
- this.gfacConfiguration.setOutHandlers(provider.getClass().getName(),
- this.getServiceName());
this.provider = provider;
}
@@ -253,4 +258,20 @@ public class JobExecutionContext extends AbstractContext{
public void setRegistry(Registry registry) {
this.registry = registry;
}
+
+ public Experiment getExperiment() {
+ return experiment;
+ }
+
+ public void setExperiment(Experiment experiment) {
+ this.experiment = experiment;
+ }
+
+ public WorkflowNodeDetails getWorkflowNodeDetails() {
+ return workflowNodeDetails;
+ }
+
+ public void setWorkflowNodeDetails(WorkflowNodeDetails workflowNodeDetails) {
+ this.workflowNodeDetails = workflowNodeDetails;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index c09f45a..642d659 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.cpi;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -42,39 +43,35 @@ import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.handler.GFacHandler;
import org.apache.airavata.gfac.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.handler.ThreadedHandler;
import org.apache.airavata.gfac.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.notification.listeners.LoggingListener;
import org.apache.airavata.gfac.notification.listeners.WorkflowTrackingListener;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.scheduler.HostScheduler;
import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.monitor.AbstractActivityListener;
-import org.apache.airavata.gfac.monitor.MonitorManager;
-import org.apache.airavata.gfac.monitor.command.ExperimentCancelRequest;
-import org.apache.airavata.gfac.monitor.command.TaskCancelRequest;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.persistance.registry.jpa.resources.AbstractResource.TaskDetailConstants;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
/**
* This is the GFac CPI class for external usage, this simply have a single method to submit a job to
* the resource, required data for the job has to be stored in registry prior to invoke this object.
*/
-public class GFacImpl implements GFac, AbstractActivityListener {
+public class GFacImpl implements GFac {
private static final Logger log = LoggerFactory.getLogger(GFacImpl.class);
public static final String ERROR_SENT = "ErrorSent";
- public static final String PBS_JOB_MANAGER = "pbs";
- public static final String SLURM_JOB_MANAGER = "slurm";
- public static final String SUN_GRID_ENGINE_JOB_MANAGER = "sge";
private Registry registry;
@@ -82,8 +79,9 @@ public class GFacImpl implements GFac, AbstractActivityListener {
private AiravataRegistry2 airavataRegistry2;
- private MonitorManager monitorManager;
+ private static List<ThreadedHandler> daemonHandlers;
+ private File gfacConfigFile;
/**
* Constructor for GFac
*
@@ -95,13 +93,59 @@ public class GFacImpl implements GFac, AbstractActivityListener {
this.registry = registry;
this.airavataAPI = airavataAPI;
this.airavataRegistry2 = airavataRegistry2;
+ daemonHandlers = new ArrayList<ThreadedHandler>();
+ startDaemonHandlers();
+ }
+
+ private void startDaemonHandlers() {
+ List<GFacHandlerConfig> daemonHandlerConfig = null;
+ URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ gfacConfigFile = new File(resource.getPath());
+ try {
+ daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
+ } catch (ParserConfigurationException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+ } catch (IOException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+ } catch (SAXException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+ } catch (XPathExpressionException e) {
+ log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+ }
+
+ for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+ String className = handlerConfig.getClassName();
+ try {
+ Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
+ ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
+ threadedHandler.initProperties(handlerConfig.getProperties());
+ daemonHandlers.add(threadedHandler);
+ }catch (ClassNotFoundException e){
+ log.error("Error initializing the handler: " + className);
+ log.error(className + " class has to implement " + ThreadedHandler.class);
+ } catch (InstantiationException e) {
+ log.error("Error initializing the handler: " + className);
+ log.error(className + " class has to implement " + ThreadedHandler.class);
+ } catch (IllegalAccessException e) {
+ log.error("Error initializing the handler: " + className);
+ log.error(className + " class has to implement " + ThreadedHandler.class);
+ } catch (GFacHandlerException e) {
+ log.error("Error initializing the handler " + className);
+ } catch (GFacException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ for(ThreadedHandler tHandler:daemonHandlers){
+ tHandler.run();
+ }
}
/**
* This can be used to submit jobs for testing purposes just by filling parameters by hand (JobExecutionContext)
*/
public GFacImpl() {
-
+ daemonHandlers = new ArrayList<ThreadedHandler>();
+ startDaemonHandlers();
}
/**
@@ -127,6 +171,7 @@ public class GFacImpl implements GFac, AbstractActivityListener {
private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
JobExecutionContext jobExecutionContext;
TaskDetails taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, taskID);
+
// this is wear our new model and old model is mapping (so serviceName in ExperimentData and service name in ServiceDescriptor
// has to be same.
@@ -146,22 +191,32 @@ public class GFacImpl implements GFac, AbstractActivityListener {
HostDescription hostDescription = hostScheduler.schedule(registeredHosts);
ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
- String hostName;
+ String hostName;
if(taskData.getTaskScheduling().getResourceHostId() != null){
- hostName = taskData.getTaskScheduling().getResourceHostId();
+ hostName = taskData.getTaskScheduling().getResourceHostId();
}else{
- hostName = hostDescription.getType().getHostName();
+ hostName = hostDescription.getType().getHostName();
}
-
- ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
+
+ ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
Properties configurationProperties = ServerSettings.getProperties();
GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
+
+ // start constructing jobexecutioncontext
jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
- jobExecutionContext.setRegistry(registry);
+ Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentID);
+ jobExecutionContext.setExperiment(experiment);
+ jobExecutionContext.setExperimentID(experimentID);
+
jobExecutionContext.setTaskData(taskData);
+
+
+
+ jobExecutionContext.setRegistry(registry);
+
ApplicationContext applicationContext = new ApplicationContext();
applicationContext.setApplicationDeploymentDescription(applicationDescription);
applicationContext.setHostDescription(hostDescription);
@@ -177,7 +232,6 @@ public class GFacImpl implements GFac, AbstractActivityListener {
serviceDescription.getType().getOutputParametersArray())));
jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
- jobExecutionContext.setExperimentID(experimentID);
return jobExecutionContext;
}
@@ -218,6 +272,9 @@ public class GFacImpl implements GFac, AbstractActivityListener {
executeProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
}
+ if(GFacUtils.isSynchronousMode(jobExecutionContext)){
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
} catch (Exception e) {
jobExecutionContext.setProperty(ERROR_SENT, "true");
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
@@ -323,29 +380,25 @@ public class GFacImpl implements GFac, AbstractActivityListener {
}
+ public AiravataAPI getAiravataAPI() {
+ return airavataAPI;
+ }
+
+ public AiravataRegistry2 getAiravataRegistry2() {
+ return airavataRegistry2;
+ }
+
+ public static List<ThreadedHandler> getDaemonHandlers() {
+ return daemonHandlers;
+ }
+
+ public static String getErrorSent() {
+ return ERROR_SENT;
+ }
+
+ public File getGfacConfigFile() {
+ return gfacConfigFile;
+ }
- public void setup(Object... configurations) {
- for (Object configuration : configurations) {
- if (configuration instanceof MonitorManager){
- monitorManager=(MonitorManager) configuration;
- }
- }
-
- }
-
- @Subscribe
- public void experimentCancelRequested(ExperimentCancelRequest request){
- try {
- List<String> nodeIds = registry.getIds(DataType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, request.getExperimentId());
- for (String nodeId : nodeIds) {
- List<String> taskIds = registry.getIds(DataType.TASK_DETAIL, TaskDetailConstants.NODE_INSTANCE_ID, nodeId);
- for (String taskId : taskIds) {
- monitorManager.getMonitorPublisher().publish(new TaskCancelRequest(request.getExperimentId(),nodeId, taskId));
- }
- }
- } catch (RegistryException e) {
- log.error("Error while attempting to publish task cancel requests!!!",e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java
index 2468273..3524cdb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AbstractHandler.java
@@ -20,7 +20,6 @@
*/
package org.apache.airavata.gfac.handler;
-import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
@@ -28,7 +27,7 @@ import org.apache.airavata.registry.cpi.Registry;
public abstract class AbstractHandler implements GFacHandler {
protected Registry registry = null;
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
registry = jobExecutionContext.getRegistry();
if(registry == null){
registry = RegistryFactory.getDefaultRegistry();
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AppDescriptorCheckHandler.java
index 9afa92f..7cc5a47 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/AppDescriptorCheckHandler.java
@@ -22,7 +22,6 @@ package org.apache.airavata.gfac.handler;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.gfac.Constants;
-import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.slf4j.Logger;
@@ -82,7 +81,7 @@ public class AppDescriptorCheckHandler implements GFacHandler {
jobExecutionContext.getApplicationContext().setApplicationDeploymentDescription(app);
}
- public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException {
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandler.java
index 3975031..8f86a48 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandler.java
@@ -28,7 +28,7 @@ import java.util.Map;
public interface GFacHandler {
- public void initProperties(Map<String, String> properties) throws GFacHandlerException,GFacException;
+ public void initProperties(Map<String, String> properties) throws GFacHandlerException;
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException,GFacException;
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandlerException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandlerException.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandlerException.java
index ad86479..775e515 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandlerException.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/GFacHandlerException.java
@@ -43,4 +43,8 @@ public class GFacHandlerException extends GFacException {
super(message, e);
log.error(message,e);
}
+ public GFacHandlerException(Exception e) {
+ super(e);
+ log.error(e.getMessage(),e);
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/ThreadedHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/ThreadedHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/ThreadedHandler.java
new file mode 100644
index 0000000..fadf120
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/handler/ThreadedHandler.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+/**
+ * This handler can be used to run in a separate thread in handler chain or as a daemon
+ * for the gfac instance you are running. If this is configured as another
+ * handler in the execution chain this will get invoked in asynchronous mode in a separate
+ * thread for each request, so you have to make sure the thread is returning properly.
+ */
+public abstract class ThreadedHandler extends AbstractHandler implements Runnable{
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
deleted file mode 100644
index 63f89df..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.monitor;
-
-
-public interface AbstractActivityListener {
- public void setup(Object... configurations);
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
deleted file mode 100644
index a70b14f..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.Registry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Calendar;
-
-public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
- private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
-
- private Registry airavataRegistry;
-
- public Registry getAiravataRegistry() {
- return airavataRegistry;
- }
-
- public void setAiravataRegistry(Registry airavataRegistry) {
- this.airavataRegistry = airavataRegistry;
- }
-
- @Subscribe
- public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
- ExperimentState state = experimentStatus.getState();
- if (state != null) {
- try {
- String experimentID = experimentStatus.getIdentity().getExperimentID();
- updateExperimentStatus(experimentID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- }
- }
-
- public void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
- Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
- if(details == null) {
- details = new Experiment();
- details.setExperimentID(experimentId);
- }
- org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
- status.setExperimentState(state);
- status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setExperimentStatus(status);
- airavataRegistry.update(DataType.EXPERIMENT, details, experimentId);
- }
-
- @Override
- public void setup(Object... configurations) {
- for (Object configuration : configurations) {
- if (configuration instanceof Registry){
- this.airavataRegistry=(Registry)configuration;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
deleted file mode 100644
index 99c8733..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.TaskState;
-import org.apache.airavata.registry.cpi.CompositeIdentifier;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.Registry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Calendar;
-import java.util.concurrent.BlockingQueue;
-
-public class AiravataJobStatusUpdator implements AbstractActivityListener {
- private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
-
- private Registry airavataRegistry;
-
- private MonitorPublisher monitorPublisher;
-
- private BlockingQueue<MonitorID> jobsToMonitor;
-
- public Registry getAiravataRegistry() {
- return airavataRegistry;
- }
-
- public void setAiravataRegistry(Registry airavataRegistry) {
- this.airavataRegistry = airavataRegistry;
- }
-
- public BlockingQueue<MonitorID> getJobsToMonitor() {
- return jobsToMonitor;
- }
-
- public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
- this.jobsToMonitor = jobsToMonitor;
- }
-
- @Subscribe
- public void updateRegistry(JobStatusChangeRequest jobStatus) {
- /* Here we need to parse the jobStatus message and update
- the registry accordingly, for now we are just printing to standard Out
- */
- JobState state = jobStatus.getState();
- if (state != null) {
- try {
- String taskID = jobStatus.getIdentity().getTaskId();
- String jobID = jobStatus.getIdentity().getJobId();
- updateJobStatus(taskID, jobID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
- switch (state) {
- case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- default:
- break;
- }
- }
- }
-
- @Subscribe
- public void setupTaskStatus(JobStatusChangeRequest jobStatus){
- TaskState state=TaskState.UNKNOWN;
- switch(jobStatus.getState()){
- case ACTIVE:
- state=TaskState.EXECUTING; break;
- case CANCELED:
- state=TaskState.CANCELED; break;
- case COMPLETE:
- state=TaskState.COMPLETED; break;
- case FAILED:
- state=TaskState.FAILED; break;
- case HELD: case SUSPENDED: case QUEUED:
- state=TaskState.WAITING; break;
- case SETUP:
- state=TaskState.PRE_PROCESSING; break;
- case SUBMITTED:
- state=TaskState.STARTED; break;
- case UN_SUBMITTED:
- state=TaskState.CANCELED; break;
- case CANCELING:
- state=TaskState.CANCELING; break;
- default:
- break;
- }
- logger.debug("Publishing Task Status "+state.toString());
- monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state));
- }
-
- public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
- CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
- JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
- if(details == null) {
- details = new JobDetails();
- }
- org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
- status.setJobState(state);
- status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setJobStatus(status);
- details.setJobID(jobID);
- airavataRegistry.update(DataType.JOB_DETAIL, details, ids);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void setup(Object... configurations) {
- for (Object configuration : configurations) {
- if (configuration instanceof Registry){
- this.airavataRegistry=(Registry)configuration;
- } else if (configuration instanceof BlockingQueue<?>){
- this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
deleted file mode 100644
index e8dd7a0..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.TaskState;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.Registry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Calendar;
-
-public class AiravataTaskStatusUpdator implements AbstractActivityListener {
- private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
-
- private Registry airavataRegistry;
-
- private MonitorPublisher monitorPublisher;
-
- public Registry getAiravataRegistry() {
- return airavataRegistry;
- }
-
- public void setAiravataRegistry(Registry airavataRegistry) {
- this.airavataRegistry = airavataRegistry;
- }
-
- @Subscribe
- public void updateRegistry(TaskStatusChangeRequest taskStatus) {
- TaskState state = taskStatus.getState();
- if (state != null) {
- try {
- String taskID = taskStatus.getIdentity().getTaskId();
- updateTaskStatus(taskID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- }
- }
-
- @Subscribe
- public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){
- WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
- switch(taskStatus.getState()){
- case CANCELED:
- state=WorkflowNodeState.CANCELED; break;
- case COMPLETED:
- state=WorkflowNodeState.COMPLETED; break;
- case CONFIGURING_WORKSPACE:
- state=WorkflowNodeState.INVOKED; break;
- case FAILED:
- state=WorkflowNodeState.FAILED; break;
- case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
- state=WorkflowNodeState.EXECUTING; break;
- case STARTED:
- state=WorkflowNodeState.INVOKED; break;
- case CANCELING:
- state=WorkflowNodeState.CANCELING; break;
- default:
- break;
- }
- logger.debug("Publishing Experiment Status "+state.toString());
- monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state));
- }
-
- public void updateTaskStatus(String taskId, TaskState state) throws Exception {
- TaskDetails details = (TaskDetails)airavataRegistry.get(DataType.TASK_DETAIL, taskId);
- if(details == null) {
- details = new TaskDetails();
- details.setTaskID(taskId);
- }
- org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus();
- status.setExecutionState(state);
- status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setTaskStatus(status);
- airavataRegistry.update(DataType.TASK_DETAIL, details, taskId);
- }
-
- @Override
- public void setup(Object... configurations) {
- for (Object configuration : configurations) {
- if (configuration instanceof Registry){
- this.airavataRegistry=(Registry)configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
deleted file mode 100644
index 2375d72..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
-import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
-import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
-import org.apache.airavata.registry.cpi.DataType;
-import org.apache.airavata.registry.cpi.Registry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Calendar;
-
-public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
- private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
-
- private Registry airavataRegistry;
-
- private MonitorPublisher monitorPublisher;
-
- public Registry getAiravataRegistry() {
- return airavataRegistry;
- }
-
- public void setAiravataRegistry(Registry airavataRegistry) {
- this.airavataRegistry = airavataRegistry;
- }
-
- @Subscribe
- public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) {
- WorkflowNodeState state = workflowNodeStatus.getState();
- if (state != null) {
- try {
- String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID();
- updateWorkflowNodeStatus(workflowNodeID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- }
- }
-
- @Subscribe
- public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus){
- ExperimentState state=ExperimentState.UNKNOWN;
- switch(nodeStatus.getState()){
- case CANCELED:
- state=ExperimentState.CANCELED; break;
- case COMPLETED:
- state=ExperimentState.COMPLETED; break;
- case INVOKED:
- state=ExperimentState.LAUNCHED; break;
- case FAILED:
- state=ExperimentState.FAILED; break;
- case EXECUTING:
- state=ExperimentState.EXECUTING; break;
- case CANCELING:
- state=ExperimentState.CANCELING; break;
- default:
- break;
- }
- logger.debug("Publishing Experiment Status "+state.toString());
- monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(),state));
- }
-
- public void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception {
- WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(DataType.WORKFLOW_NODE_DETAIL, workflowNodeId);
- if(details == null) {
- details = new WorkflowNodeDetails();
- details.setNodeInstanceId(workflowNodeId);
- }
- WorkflowNodeStatus status = new WorkflowNodeStatus();
- status.setWorkflowNodeState(state);
- status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setWorkflowNodeStatus(status);
- airavataRegistry.update(DataType.WORKFLOW_NODE_DETAIL, details, workflowNodeId);
- }
-
- @Override
- public void setup(Object... configurations) {
- for (Object configuration : configurations) {
- if (configuration instanceof Registry){
- this.airavataRegistry=(Registry)configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
deleted file mode 100644
index ba8efeb..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.monitor;
-
-public class ExperimentIdentity {
- private String experimentID;
- public ExperimentIdentity(String experimentId) {
- setExperimentID(experimentId);
- }
- public String getExperimentID() {
- return experimentID;
- }
-
- public void setExperimentID(String experimentID) {
- this.experimentID = experimentID;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
deleted file mode 100644
index e57087d..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor;
-
-import org.apache.airavata.commons.gfac.type.HostDescription;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class HostMonitorData {
- private HostDescription host;
-
- private List<MonitorID> monitorIDs;
-
- public HostMonitorData(HostDescription host) {
- this.host = host;
- monitorIDs = new ArrayList<MonitorID>();
- }
-
- public HostMonitorData(HostDescription host, List<MonitorID> monitorIDs) {
- this.host = host;
- this.monitorIDs = monitorIDs;
- }
-
- public HostDescription getHost() {
- return host;
- }
-
- public void setHost(HostDescription host) {
- this.host = host;
- }
-
- public List<MonitorID> getMonitorIDs() {
- return monitorIDs;
- }
-
- public void setMonitorIDs(List<MonitorID> monitorIDs) {
- this.monitorIDs = monitorIDs;
- }
-
- /**
- * this method get called by CommonUtils and it will check the right place before adding
- * so there will not be a mismatch between this.host and monitorID.host
- * @param monitorID
- * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
- */
- public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException {
- monitorIDs.add(monitorID);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
deleted file mode 100644
index 84c4a55..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.monitor;
-
-public class JobIdentity extends TaskIdentity {
- private String jobId;
-
- public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
- super(experimentId,workflowNodeId,taskId);
- setJobId(jobId);
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-}