You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:29 UTC
[02/15] airavata git commit: implementing passive gfac submitter
using rabbbitmq
implementing passive gfac submitter using rabbbitmq
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/88d27d95
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/88d27d95
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/88d27d95
Branch: refs/heads/master
Commit: 88d27d9574f9b077d334eabde147a0787b186899
Parents: 30aefc4
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Tue Feb 10 14:04:13 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Tue Feb 10 14:04:13 2015 -0500
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 2 +-
.../lib/airavata/messagingEvents_types.cpp | 186 +++++-
.../lib/airavata/messagingEvents_types.h | 99 +++-
.../Airavata/Model/Messaging/Event/Types.php | 208 +++++++
.../model/messaging/event/MessageType.java | 8 +-
.../model/messaging/event/TaskSubmitEvent.java | 588 +++++++++++++++++++
.../messaging/event/TaskTerminateEvent.java | 492 ++++++++++++++++
airavata-api/generate-thrift-files.sh | 22 +-
.../messagingEvents.thrift | 15 +-
.../airavata/common/utils/ServerSettings.java | 5 +
.../main/resources/airavata-server.properties | 4 +-
.../main/resources/airavata-server.properties | 4 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 2 +-
.../messaging/core/PublisherFactory.java | 21 +-
.../messaging/core/impl/RabbitMQPublisher.java | 99 ----
.../core/impl/RabbitMQStatusPublisher.java | 99 ++++
.../core/impl/RabbitMQTaskLaunchPublisher.java | 88 +++
.../server/OrchestratorServerHandler.java | 3 +-
modules/orchestrator/orchestrator-core/pom.xml | 14 +-
.../core/context/OrchestratorContext.java | 11 +
.../core/impl/GFACPassiveJobSubmitter.java | 247 ++++++++
.../core/impl/GFACRPCJobSubmitter.java | 212 +++++++
.../core/impl/GFACServiceJobSubmitter.java | 212 -------
.../workflow/engine/WorkflowEngineImpl.java | 4 +-
pom.xml | 7 +
25 files changed, 2314 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 8e2ca17..8d1cd75 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -118,7 +118,7 @@ public class AiravataServerHandler implements Airavata.Iface {
public AiravataServerHandler() {
try {
if (ServerSettings.isRabbitMqPublishEnabled()) {
- publisher = PublisherFactory.createPublisher();
+ publisher = PublisherFactory.createActivityPublisher();
}
} catch (ApplicationSettingsException e) {
logger.error("Error occured while reading airavata-server properties..", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index 1f839f3..a2e72f5 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -45,15 +45,19 @@ int _kMessageTypeValues[] = {
MessageType::EXPERIMENT,
MessageType::TASK,
MessageType::WORKFLOWNODE,
- MessageType::JOB
+ MessageType::JOB,
+ MessageType::LAUNCHTASK,
+ MessageType::TERMINATETASK
};
const char* _kMessageTypeNames[] = {
"EXPERIMENT",
"TASK",
"WORKFLOWNODE",
- "JOB"
+ "JOB",
+ "LAUNCHTASK",
+ "TERMINATETASK"
};
-const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
const char* ExperimentStatusChangeEvent::ascii_fingerprint = "38C252E94E93B69D04EB3A6EE2F9EDFB";
const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x38,0xC2,0x52,0xE9,0x4E,0x93,0xB6,0x9D,0x04,0xEB,0x3A,0x6E,0xE2,0xF9,0xED,0xFB};
@@ -835,6 +839,182 @@ void swap(JobIdentifier &a, JobIdentifier &b) {
swap(a.gatewayId, b.gatewayId);
}
+const char* TaskSubmitEvent::ascii_fingerprint = "AB879940BD15B6B25691265F7384B271";
+const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71};
+
+uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ bool isset_experimentId = false;
+ bool isset_taskId = false;
+ bool isset_gatewayId = false;
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->experimentId);
+ isset_experimentId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->taskId);
+ isset_taskId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 3:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->gatewayId);
+ isset_gatewayId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ if (!isset_experimentId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_taskId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_gatewayId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ return xfer;
+}
+
+uint32_t TaskSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ xfer += oprot->writeStructBegin("TaskSubmitEvent");
+
+ xfer += oprot->writeFieldBegin("experimentId", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString(this->experimentId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("taskId", ::apache::thrift::protocol::T_STRING, 2);
+ xfer += oprot->writeString(this->taskId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("gatewayId", ::apache::thrift::protocol::T_STRING, 3);
+ xfer += oprot->writeString(this->gatewayId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(TaskSubmitEvent &a, TaskSubmitEvent &b) {
+ using ::std::swap;
+ swap(a.experimentId, b.experimentId);
+ swap(a.taskId, b.taskId);
+ swap(a.gatewayId, b.gatewayId);
+}
+
+const char* TaskTerminateEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972";
+const uint8_t TaskTerminateEvent::binary_fingerprint[16] = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72};
+
+uint32_t TaskTerminateEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ bool isset_experimentId = false;
+ bool isset_taskId = false;
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->experimentId);
+ isset_experimentId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->taskId);
+ isset_taskId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ if (!isset_experimentId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_taskId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ return xfer;
+}
+
+uint32_t TaskTerminateEvent::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ xfer += oprot->writeStructBegin("TaskTerminateEvent");
+
+ xfer += oprot->writeFieldBegin("experimentId", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString(this->experimentId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("taskId", ::apache::thrift::protocol::T_STRING, 2);
+ xfer += oprot->writeString(this->taskId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(TaskTerminateEvent &a, TaskTerminateEvent &b) {
+ using ::std::swap;
+ swap(a.experimentId, b.experimentId);
+ swap(a.taskId, b.taskId);
+}
+
const char* JobStatusChangeEvent::ascii_fingerprint = "062775D589B60D1687103FD465B0F5E8";
const uint8_t JobStatusChangeEvent::binary_fingerprint[16] = {0x06,0x27,0x75,0xD5,0x89,0xB6,0x0D,0x16,0x87,0x10,0x3F,0xD4,0x65,0xB0,0xF5,0xE8};
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index 572a8bd..f063fc2 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -52,7 +52,9 @@ struct MessageType {
EXPERIMENT = 0,
TASK = 1,
WORKFLOWNODE = 2,
- JOB = 3
+ JOB = 3,
+ LAUNCHTASK = 4,
+ TERMINATETASK = 5
};
};
@@ -460,6 +462,101 @@ class JobIdentifier {
void swap(JobIdentifier &a, JobIdentifier &b);
+class TaskSubmitEvent {
+ public:
+
+ static const char* ascii_fingerprint; // = "AB879940BD15B6B25691265F7384B271";
+ static const uint8_t binary_fingerprint[16]; // = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71};
+
+ TaskSubmitEvent() : experimentId(), taskId(), gatewayId() {
+ }
+
+ virtual ~TaskSubmitEvent() throw() {}
+
+ std::string experimentId;
+ std::string taskId;
+ std::string gatewayId;
+
+ void __set_experimentId(const std::string& val) {
+ experimentId = val;
+ }
+
+ void __set_taskId(const std::string& val) {
+ taskId = val;
+ }
+
+ void __set_gatewayId(const std::string& val) {
+ gatewayId = val;
+ }
+
+ bool operator == (const TaskSubmitEvent & rhs) const
+ {
+ if (!(experimentId == rhs.experimentId))
+ return false;
+ if (!(taskId == rhs.taskId))
+ return false;
+ if (!(gatewayId == rhs.gatewayId))
+ return false;
+ return true;
+ }
+ bool operator != (const TaskSubmitEvent &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const TaskSubmitEvent & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(TaskSubmitEvent &a, TaskSubmitEvent &b);
+
+
+class TaskTerminateEvent {
+ public:
+
+ static const char* ascii_fingerprint; // = "07A9615F837F7D0A952B595DD3020972";
+ static const uint8_t binary_fingerprint[16]; // = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72};
+
+ TaskTerminateEvent() : experimentId(), taskId() {
+ }
+
+ virtual ~TaskTerminateEvent() throw() {}
+
+ std::string experimentId;
+ std::string taskId;
+
+ void __set_experimentId(const std::string& val) {
+ experimentId = val;
+ }
+
+ void __set_taskId(const std::string& val) {
+ taskId = val;
+ }
+
+ bool operator == (const TaskTerminateEvent & rhs) const
+ {
+ if (!(experimentId == rhs.experimentId))
+ return false;
+ if (!(taskId == rhs.taskId))
+ return false;
+ return true;
+ }
+ bool operator != (const TaskTerminateEvent &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const TaskTerminateEvent & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(TaskTerminateEvent &a, TaskTerminateEvent &b);
+
+
class JobStatusChangeEvent {
public:
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index d20392a..40810d3 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -35,11 +35,15 @@ final class MessageType {
const TASK = 1;
const WORKFLOWNODE = 2;
const JOB = 3;
+ const LAUNCHTASK = 4;
+ const TERMINATETASK = 5;
static public $__names = array(
0 => 'EXPERIMENT',
1 => 'TASK',
2 => 'WORKFLOWNODE',
3 => 'JOB',
+ 4 => 'LAUNCHTASK',
+ 5 => 'TERMINATETASK',
);
}
@@ -967,6 +971,210 @@ class JobIdentifier {
}
+class TaskSubmitEvent {
+ static $_TSPEC;
+
+ public $experimentId = null;
+ public $taskId = null;
+ public $gatewayId = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'experimentId',
+ 'type' => TType::STRING,
+ ),
+ 2 => array(
+ 'var' => 'taskId',
+ 'type' => TType::STRING,
+ ),
+ 3 => array(
+ 'var' => 'gatewayId',
+ 'type' => TType::STRING,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['experimentId'])) {
+ $this->experimentId = $vals['experimentId'];
+ }
+ if (isset($vals['taskId'])) {
+ $this->taskId = $vals['taskId'];
+ }
+ if (isset($vals['gatewayId'])) {
+ $this->gatewayId = $vals['gatewayId'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'TaskSubmitEvent';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->experimentId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->taskId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 3:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->gatewayId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('TaskSubmitEvent');
+ if ($this->experimentId !== null) {
+ $xfer += $output->writeFieldBegin('experimentId', TType::STRING, 1);
+ $xfer += $output->writeString($this->experimentId);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->taskId !== null) {
+ $xfer += $output->writeFieldBegin('taskId', TType::STRING, 2);
+ $xfer += $output->writeString($this->taskId);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->gatewayId !== null) {
+ $xfer += $output->writeFieldBegin('gatewayId', TType::STRING, 3);
+ $xfer += $output->writeString($this->gatewayId);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
+class TaskTerminateEvent {
+ static $_TSPEC;
+
+ public $experimentId = null;
+ public $taskId = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'experimentId',
+ 'type' => TType::STRING,
+ ),
+ 2 => array(
+ 'var' => 'taskId',
+ 'type' => TType::STRING,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['experimentId'])) {
+ $this->experimentId = $vals['experimentId'];
+ }
+ if (isset($vals['taskId'])) {
+ $this->taskId = $vals['taskId'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'TaskTerminateEvent';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->experimentId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->taskId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('TaskTerminateEvent');
+ if ($this->experimentId !== null) {
+ $xfer += $output->writeFieldBegin('experimentId', TType::STRING, 1);
+ $xfer += $output->writeString($this->experimentId);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->taskId !== null) {
+ $xfer += $output->writeFieldBegin('taskId', TType::STRING, 2);
+ $xfer += $output->writeString($this->taskId);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
class JobStatusChangeEvent {
static $_TSPEC;
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
index d00f404..230b87b 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
@@ -32,7 +32,9 @@ import org.apache.thrift.TEnum;
EXPERIMENT(0),
TASK(1),
WORKFLOWNODE(2),
- JOB(3);
+ JOB(3),
+ LAUNCHTASK(4),
+ TERMINATETASK(5);
private final int value;
@@ -61,6 +63,10 @@ import org.apache.thrift.TEnum;
return WORKFLOWNODE;
case 3:
return JOB;
+ case 4:
+ return LAUNCHTASK;
+ case 5:
+ return TERMINATETASK;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
new file mode 100644
index 0000000..c813c76
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class TaskSubmitEvent implements org.apache.thrift.TBase<TaskSubmitEvent, TaskSubmitEvent._Fields>, java.io.Serializable, Cloneable, Comparable<TaskSubmitEvent> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TaskSubmitEvent");
+
+ private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new TaskSubmitEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new TaskSubmitEventTupleSchemeFactory());
+ }
+
+ private String experimentId; // required
+ private String taskId; // required
+ private String gatewayId; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ EXPERIMENT_ID((short)1, "experimentId"),
+ TASK_ID((short)2, "taskId"),
+ GATEWAY_ID((short)3, "gatewayId");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // EXPERIMENT_ID
+ return EXPERIMENT_ID;
+ case 2: // TASK_ID
+ return TASK_ID;
+ case 3: // GATEWAY_ID
+ return GATEWAY_ID;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskSubmitEvent.class, metaDataMap);
+ }
+
+ public TaskSubmitEvent() {
+ }
+
+ public TaskSubmitEvent(
+ String experimentId,
+ String taskId,
+ String gatewayId)
+ {
+ this();
+ this.experimentId = experimentId;
+ this.taskId = taskId;
+ this.gatewayId = gatewayId;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public TaskSubmitEvent(TaskSubmitEvent other) {
+ if (other.isSetExperimentId()) {
+ this.experimentId = other.experimentId;
+ }
+ if (other.isSetTaskId()) {
+ this.taskId = other.taskId;
+ }
+ if (other.isSetGatewayId()) {
+ this.gatewayId = other.gatewayId;
+ }
+ }
+
+ public TaskSubmitEvent deepCopy() {
+ return new TaskSubmitEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ this.experimentId = null;
+ this.taskId = null;
+ this.gatewayId = null;
+ }
+
+ public String getExperimentId() {
+ return this.experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public void unsetExperimentId() {
+ this.experimentId = null;
+ }
+
+ /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */
+ public boolean isSetExperimentId() {
+ return this.experimentId != null;
+ }
+
+ public void setExperimentIdIsSet(boolean value) {
+ if (!value) {
+ this.experimentId = null;
+ }
+ }
+
+ public String getTaskId() {
+ return this.taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public void unsetTaskId() {
+ this.taskId = null;
+ }
+
+ /** Returns true if field taskId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTaskId() {
+ return this.taskId != null;
+ }
+
+ public void setTaskIdIsSet(boolean value) {
+ if (!value) {
+ this.taskId = null;
+ }
+ }
+
+ public String getGatewayId() {
+ return this.gatewayId;
+ }
+
+ public void setGatewayId(String gatewayId) {
+ this.gatewayId = gatewayId;
+ }
+
+ public void unsetGatewayId() {
+ this.gatewayId = null;
+ }
+
+ /** Returns true if field gatewayId is set (has been assigned a value) and false otherwise */
+ public boolean isSetGatewayId() {
+ return this.gatewayId != null;
+ }
+
+ public void setGatewayIdIsSet(boolean value) {
+ if (!value) {
+ this.gatewayId = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ if (value == null) {
+ unsetExperimentId();
+ } else {
+ setExperimentId((String)value);
+ }
+ break;
+
+ case TASK_ID:
+ if (value == null) {
+ unsetTaskId();
+ } else {
+ setTaskId((String)value);
+ }
+ break;
+
+ case GATEWAY_ID:
+ if (value == null) {
+ unsetGatewayId();
+ } else {
+ setGatewayId((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ return getExperimentId();
+
+ case TASK_ID:
+ return getTaskId();
+
+ case GATEWAY_ID:
+ return getGatewayId();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case EXPERIMENT_ID:
+ return isSetExperimentId();
+ case TASK_ID:
+ return isSetTaskId();
+ case GATEWAY_ID:
+ return isSetGatewayId();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof TaskSubmitEvent)
+ return this.equals((TaskSubmitEvent)that);
+ return false;
+ }
+
+ public boolean equals(TaskSubmitEvent that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_experimentId = true && this.isSetExperimentId();
+ boolean that_present_experimentId = true && that.isSetExperimentId();
+ if (this_present_experimentId || that_present_experimentId) {
+ if (!(this_present_experimentId && that_present_experimentId))
+ return false;
+ if (!this.experimentId.equals(that.experimentId))
+ return false;
+ }
+
+ boolean this_present_taskId = true && this.isSetTaskId();
+ boolean that_present_taskId = true && that.isSetTaskId();
+ if (this_present_taskId || that_present_taskId) {
+ if (!(this_present_taskId && that_present_taskId))
+ return false;
+ if (!this.taskId.equals(that.taskId))
+ return false;
+ }
+
+ boolean this_present_gatewayId = true && this.isSetGatewayId();
+ boolean that_present_gatewayId = true && that.isSetGatewayId();
+ if (this_present_gatewayId || that_present_gatewayId) {
+ if (!(this_present_gatewayId && that_present_gatewayId))
+ return false;
+ if (!this.gatewayId.equals(that.gatewayId))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(TaskSubmitEvent other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetExperimentId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetTaskId()).compareTo(other.isSetTaskId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTaskId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskId, other.taskId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetGatewayId()).compareTo(other.isSetGatewayId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetGatewayId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gatewayId, other.gatewayId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("TaskSubmitEvent(");
+ boolean first = true;
+
+ sb.append("experimentId:");
+ if (this.experimentId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.experimentId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("taskId:");
+ if (this.taskId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.taskId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("gatewayId:");
+ if (this.gatewayId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.gatewayId);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!isSetExperimentId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' is unset! Struct:" + toString());
+ }
+
+ if (!isSetTaskId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' is unset! Struct:" + toString());
+ }
+
+ if (!isSetGatewayId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class TaskSubmitEventStandardSchemeFactory implements SchemeFactory {
+ public TaskSubmitEventStandardScheme getScheme() {
+ return new TaskSubmitEventStandardScheme();
+ }
+ }
+
+ private static class TaskSubmitEventStandardScheme extends StandardScheme<TaskSubmitEvent> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, TaskSubmitEvent struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // EXPERIMENT_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.experimentId = iprot.readString();
+ struct.setExperimentIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // TASK_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.taskId = iprot.readString();
+ struct.setTaskIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // GATEWAY_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.gatewayId = iprot.readString();
+ struct.setGatewayIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, TaskSubmitEvent struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.experimentId != null) {
+ oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC);
+ oprot.writeString(struct.experimentId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.taskId != null) {
+ oprot.writeFieldBegin(TASK_ID_FIELD_DESC);
+ oprot.writeString(struct.taskId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.gatewayId != null) {
+ oprot.writeFieldBegin(GATEWAY_ID_FIELD_DESC);
+ oprot.writeString(struct.gatewayId);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class TaskSubmitEventTupleSchemeFactory implements SchemeFactory {
+ public TaskSubmitEventTupleScheme getScheme() {
+ return new TaskSubmitEventTupleScheme();
+ }
+ }
+
+ private static class TaskSubmitEventTupleScheme extends TupleScheme<TaskSubmitEvent> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, TaskSubmitEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.experimentId);
+ oprot.writeString(struct.taskId);
+ oprot.writeString(struct.gatewayId);
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, TaskSubmitEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.experimentId = iprot.readString();
+ struct.setExperimentIdIsSet(true);
+ struct.taskId = iprot.readString();
+ struct.setTaskIdIsSet(true);
+ struct.gatewayId = iprot.readString();
+ struct.setGatewayIdIsSet(true);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java
new file mode 100644
index 0000000..59b9f85
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class TaskTerminateEvent implements org.apache.thrift.TBase<TaskTerminateEvent, TaskTerminateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<TaskTerminateEvent> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TaskTerminateEvent");
+
+ private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new TaskTerminateEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new TaskTerminateEventTupleSchemeFactory());
+ }
+
+ private String experimentId; // required
+ private String taskId; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ EXPERIMENT_ID((short)1, "experimentId"),
+ TASK_ID((short)2, "taskId");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // EXPERIMENT_ID
+ return EXPERIMENT_ID;
+ case 2: // TASK_ID
+ return TASK_ID;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskTerminateEvent.class, metaDataMap);
+ }
+
+ public TaskTerminateEvent() {
+ }
+
+ public TaskTerminateEvent(
+ String experimentId,
+ String taskId)
+ {
+ this();
+ this.experimentId = experimentId;
+ this.taskId = taskId;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public TaskTerminateEvent(TaskTerminateEvent other) {
+ if (other.isSetExperimentId()) {
+ this.experimentId = other.experimentId;
+ }
+ if (other.isSetTaskId()) {
+ this.taskId = other.taskId;
+ }
+ }
+
+ public TaskTerminateEvent deepCopy() {
+ return new TaskTerminateEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ this.experimentId = null;
+ this.taskId = null;
+ }
+
+ public String getExperimentId() {
+ return this.experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public void unsetExperimentId() {
+ this.experimentId = null;
+ }
+
+ /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */
+ public boolean isSetExperimentId() {
+ return this.experimentId != null;
+ }
+
+ public void setExperimentIdIsSet(boolean value) {
+ if (!value) {
+ this.experimentId = null;
+ }
+ }
+
+ public String getTaskId() {
+ return this.taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public void unsetTaskId() {
+ this.taskId = null;
+ }
+
+ /** Returns true if field taskId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTaskId() {
+ return this.taskId != null;
+ }
+
+ public void setTaskIdIsSet(boolean value) {
+ if (!value) {
+ this.taskId = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ if (value == null) {
+ unsetExperimentId();
+ } else {
+ setExperimentId((String)value);
+ }
+ break;
+
+ case TASK_ID:
+ if (value == null) {
+ unsetTaskId();
+ } else {
+ setTaskId((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ return getExperimentId();
+
+ case TASK_ID:
+ return getTaskId();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case EXPERIMENT_ID:
+ return isSetExperimentId();
+ case TASK_ID:
+ return isSetTaskId();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof TaskTerminateEvent)
+ return this.equals((TaskTerminateEvent)that);
+ return false;
+ }
+
+ public boolean equals(TaskTerminateEvent that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_experimentId = true && this.isSetExperimentId();
+ boolean that_present_experimentId = true && that.isSetExperimentId();
+ if (this_present_experimentId || that_present_experimentId) {
+ if (!(this_present_experimentId && that_present_experimentId))
+ return false;
+ if (!this.experimentId.equals(that.experimentId))
+ return false;
+ }
+
+ boolean this_present_taskId = true && this.isSetTaskId();
+ boolean that_present_taskId = true && that.isSetTaskId();
+ if (this_present_taskId || that_present_taskId) {
+ if (!(this_present_taskId && that_present_taskId))
+ return false;
+ if (!this.taskId.equals(that.taskId))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(TaskTerminateEvent other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetExperimentId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetTaskId()).compareTo(other.isSetTaskId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTaskId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskId, other.taskId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("TaskTerminateEvent(");
+ boolean first = true;
+
+ sb.append("experimentId:");
+ if (this.experimentId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.experimentId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("taskId:");
+ if (this.taskId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.taskId);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!isSetExperimentId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' is unset! Struct:" + toString());
+ }
+
+ if (!isSetTaskId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class TaskTerminateEventStandardSchemeFactory implements SchemeFactory {
+ public TaskTerminateEventStandardScheme getScheme() {
+ return new TaskTerminateEventStandardScheme();
+ }
+ }
+
+ private static class TaskTerminateEventStandardScheme extends StandardScheme<TaskTerminateEvent> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, TaskTerminateEvent struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // EXPERIMENT_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.experimentId = iprot.readString();
+ struct.setExperimentIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // TASK_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.taskId = iprot.readString();
+ struct.setTaskIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, TaskTerminateEvent struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.experimentId != null) {
+ oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC);
+ oprot.writeString(struct.experimentId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.taskId != null) {
+ oprot.writeFieldBegin(TASK_ID_FIELD_DESC);
+ oprot.writeString(struct.taskId);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class TaskTerminateEventTupleSchemeFactory implements SchemeFactory {
+ public TaskTerminateEventTupleScheme getScheme() {
+ return new TaskTerminateEventTupleScheme();
+ }
+ }
+
+ private static class TaskTerminateEventTupleScheme extends TupleScheme<TaskTerminateEvent> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, TaskTerminateEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.experimentId);
+ oprot.writeString(struct.taskId);
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, TaskTerminateEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.experimentId = iprot.readString();
+ struct.setExperimentIdIsSet(true);
+ struct.taskId = iprot.readString();
+ struct.setTaskIdIsSet(true);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/generate-thrift-files.sh
----------------------------------------------------------------------
diff --git a/airavata-api/generate-thrift-files.sh b/airavata-api/generate-thrift-files.sh
index 4cd3288..bd823e4 100755
--- a/airavata-api/generate-thrift-files.sh
+++ b/airavata-api/generate-thrift-files.sh
@@ -27,7 +27,7 @@ DATAMODEL_SRC_DIR='airavata-data-models/src/main/java'
JAVA_API_SDK_DIR='airavata-api-stubs/src/main/java'
CPP_SDK_DIR='airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/'
PHP_SDK_DIR='airavata-client-sdks/airavata-php-sdk/src/main/resources/lib'
-
+THRIFT_EXEC=thrift
# The Function fail prints error messages on failure and quits the script.
fail() {
echo $@
@@ -96,7 +96,7 @@ copy_changed_files() {
# Generation of thrift files will require installing Apache Thrift. Please add thrift to your path.
# Verify is thrift is installed, is in the path is at a specified version.
-VERSION=$(thrift -version 2>/dev/null | grep -F "${REQUIRED_THRIFT_VERSION}" | wc -l)
+VERSION=$($THRIFT_EXEC -version 2>/dev/null | grep -F "${REQUIRED_THRIFT_VERSION}" | wc -l)
if [ "$VERSION" -ne 1 ] ; then
echo "****************************************************"
echo "*** thrift is not installed or is not in the path"
@@ -125,11 +125,11 @@ rm -rf ${JAVA_BEAN_GEN_DIR}
# Generate the Airavata Data Model using thrift Java Beans generator. This will take generate the classes in bean style
# with members being private and setters returning voids.
# The airavataDataModel.thrift includes rest of data models.
-thrift ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/airavataDataModel.thrift || fail unable to generate java bean thrift classes on base data model
+$THRIFT_EXEC ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/airavataDataModel.thrift || fail unable to generate java bean thrift classes on base data model
-thrift ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/appCatalogModels.thrift || fail unable to generate java bean thrift classes on app catalog data models
+$THRIFT_EXEC ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/appCatalogModels.thrift || fail unable to generate java bean thrift classes on app catalog data models
-thrift ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/workflowDataModel.thrift || fail unable to generate java bean thrift classes on app workflow data models
+$THRIFT_EXEC ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/workflowDataModel.thrift || fail unable to generate java bean thrift classes on app workflow data models
# For the generated java beans add the ASF V2 License header
add_license_header $JAVA_BEAN_GEN_DIR
@@ -150,9 +150,9 @@ rm -rf ${JAVA_GEN_DIR}
# Using thrift Java generator, generate the java classes based on Airavata API. This
# The airavataAPI.thrift includes rest of data models.
-thrift ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate java thrift classes on AiravataAPI
+$THRIFT_EXEC ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate java thrift classes on AiravataAPI
-#thrift ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate java thrift classes on WorkflowAPI
+#$THRIFT_EXEC ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate java thrift classes on WorkflowAPI
# For the generated java classes add the ASF V2 License header
add_license_header $JAVA_GEN_DIR
@@ -173,9 +173,9 @@ rm -rf ${CPP_GEN_DIR}
# Using thrift Java generator, generate the java classes based on Airavata API. This
# The airavataAPI.thrift includes rest of data models.
-thrift ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate C++ thrift classes
+/usr/local/Cellar/thrift/0.9.1/bin/thrift ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate C++ thrift classes
-#thrift ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate C++ thrift classes for WorkflowAPI
+#$THRIFT_EXEC ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate C++ thrift classes for WorkflowAPI
# For the generated CPP classes add the ASF V2 License header
add_license_header $CPP_GEN_DIR
@@ -195,9 +195,9 @@ rm -rf ${PHP_GEN_DIR}
# Using thrift Java generator, generate the java classes based on Airavata API. This
# The airavataAPI.thrift includes rest of data models.
-thrift ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate PHP thrift classes
+$THRIFT_EXEC ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate PHP thrift classes
-#thrift ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate PHP thrift classes for WorkflowAPI
+#$THRIFT_EXEC ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate PHP thrift classes for WorkflowAPI
# For the generated java classes add the ASF V2 License header
## TODO Write PHP license parser
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index c9f3808..d736701 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -38,7 +38,9 @@ enum MessageType {
EXPERIMENT,
TASK,
WORKFLOWNODE,
- JOB
+ JOB,
+ LAUNCHTASK,
+ TERMINATETASK
}
struct ExperimentStatusChangeEvent {
@@ -100,6 +102,17 @@ struct JobIdentifier {
// //8:
// }
+struct TaskSubmitEvent{
+ 1: required string experimentId,
+ 2: required string taskId,
+ 3: required string gatewayId
+}
+
+struct TaskTerminateEvent{
+ 1: required string experimentId,
+ 2: required string taskId,
+}
+
struct JobStatusChangeEvent {
1: required experimentModel.JobState state;
2: required JobIdentifier jobIdentity;
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 988ad3d..4ea0b44 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -52,6 +52,7 @@ public class ServerSettings extends ApplicationSettings {
private static final String MY_PROXY_PASSWORD = "myproxy.password";
private static final String MY_PROXY_LIFETIME = "myproxy.life";
private static final String ACTIVITY_PUBLISHER = "activity.publisher";
+ private static final String TASK_LAUNCH_PUBLISHER = "task.launch.publisher";
private static final String ACTIVITY_LISTENERS = "activity.listeners";
public static final String PUBLISH_RABBITMQ = "publish.rabbitmq";
public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable";
@@ -154,6 +155,10 @@ public class ServerSettings extends ApplicationSettings {
return getSetting(ACTIVITY_PUBLISHER);
}
+ public static String getTaskLaunchPublisher() throws ApplicationSettingsException{
+ return getSetting(TASK_LAUNCH_PUBLISHER);
+ }
+
public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{
String setting = getSetting(PUBLISH_RABBITMQ);
return Boolean.parseBoolean(setting);
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index c73e61a..c90fab1 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -216,6 +216,7 @@ connection.name=xsede
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
publish.rabbitmq=false
activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
rabbitmq.broker.url=amqp://localhost:5672
rabbitmq.exchange.name=airavata_rabbitmq_exchange
@@ -224,7 +225,8 @@ rabbitmq.exchange.name=airavata_rabbitmq_exchange
###########################################################################
#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter
job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
submitter.interval=10000
threadpool.size=10
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
index fb02901..d6be51a 100644
--- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
+++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
@@ -201,6 +201,7 @@ connection.name=xsede
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
publish.rabbitmq=false
activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
rabbitmq.broker.url=amqp://localhost:5672
rabbitmq.exchange.name=airavata_rabbitmq_exchange
@@ -209,7 +210,8 @@ rabbitmq.exchange.name=airavata_rabbitmq_exchange
###########################################################################
#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter
job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
submitter.interval=10000
threadpool.size=10
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 00d313c..bb612a6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -116,7 +116,7 @@ public class BetterGfacImpl implements GFac,Watcher {
String[] listenerClassList = ServerSettings.getActivityListeners();
Publisher rabbitMQPublisher = null;
if (ServerSettings.isRabbitMqPublishEnabled()){
- rabbitMQPublisher = PublisherFactory.createPublisher();
+ rabbitMQPublisher = PublisherFactory.createActivityPublisher();
}
for (String listenerClass : listenerClassList) {
Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
index 2080cc6..59cdbdf 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
public class PublisherFactory {
private static Logger log = LoggerFactory.getLogger(PublisherFactory.class);
- public static Publisher createPublisher() throws AiravataException {
+ public static Publisher createActivityPublisher() throws AiravataException {
String activityPublisher = ServerSettings.getActivityPublisher();
if (activityPublisher == null) {
@@ -47,4 +47,23 @@ public class PublisherFactory {
throw new AiravataException(msg, e);
}
}
+
+ public static Publisher createTaskLaunchPublisher() throws AiravataException {
+ String taskLaunchPublisher = ServerSettings.getTaskLaunchPublisher();
+
+ if (taskLaunchPublisher == null) {
+ String s = "Task launch publisher is not specified";
+ log.error(s);
+ throw new AiravataException(s);
+ }
+
+ try {
+ Class<? extends Publisher> aPublisher = Class.forName(taskLaunchPublisher).asSubclass(Publisher.class);
+ return aPublisher.newInstance();
+ } catch (Exception e) {
+ String msg = "Failed to load the publisher from the publisher class property: " + taskLaunchPublisher;
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
deleted file mode 100644
index ff14a8c..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMQPublisher implements Publisher {
-
- private static Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class);
-
- private RabbitMQProducer rabbitMQProducer;
-
-
- public RabbitMQPublisher() throws Exception {
- String brokerUrl;
- String exchangeName;
- try {
- brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
- rabbitMQProducer.open();
- }
-
- public void publish(MessageContext msgCtx) throws AiravataException {
- try {
- log.info("Publishing status to rabbitmq...");
- byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
- Message message = new Message();
- message.setEvent(body);
- message.setMessageId(msgCtx.getMessageId());
- message.setMessageType(msgCtx.getType());
- message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
- String routingKey = null;
- if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
- ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
- routingKey = event.getExperimentId();
- } else if (msgCtx.getType().equals(MessageType.TASK)) {
- TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
- routingKey = event.getTaskIdentity().getExperimentId() + "." +
- event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
- }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){
- WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
- WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity();
- routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
- }else if (msgCtx.getType().equals(MessageType.JOB)){
- JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent();
- JobIdentifier identity = event.getJobIdentity();
- routingKey = identity.getExperimentId() + "." +
- identity.getWorkflowNodeId() + "." +
- identity.getTaskId() + "." +
- identity.getJobId();
- }
- byte[] messageBody = ThriftUtils.serializeThriftObject(message);
- rabbitMQProducer.send(messageBody, routingKey);
- } catch (TException e) {
- String msg = "Error while deserializing the object";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- } catch (Exception e) {
- String msg = "Error while sending to rabbitmq";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
new file mode 100644
index 0000000..a4b4d1a
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMQStatusPublisher implements Publisher {
+
+ private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class);
+
+ private RabbitMQProducer rabbitMQProducer;
+
+
+ public RabbitMQStatusPublisher() throws Exception {
+ String brokerUrl;
+ String exchangeName;
+ try {
+ brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+ rabbitMQProducer.open();
+ }
+
+ public void publish(MessageContext msgCtx) throws AiravataException {
+ try {
+ log.info("Publishing status to rabbitmq...");
+ byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+ Message message = new Message();
+ message.setEvent(body);
+ message.setMessageId(msgCtx.getMessageId());
+ message.setMessageType(msgCtx.getType());
+ message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+ String routingKey = null;
+ if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
+ ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
+ routingKey = event.getExperimentId();
+ } else if (msgCtx.getType().equals(MessageType.TASK)) {
+ TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+ routingKey = event.getTaskIdentity().getExperimentId() + "." +
+ event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
+ }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){
+ WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
+ WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity();
+ routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
+ }else if (msgCtx.getType().equals(MessageType.JOB)){
+ JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent();
+ JobIdentifier identity = event.getJobIdentity();
+ routingKey = identity.getExperimentId() + "." +
+ identity.getWorkflowNodeId() + "." +
+ identity.getTaskId() + "." +
+ identity.getJobId();
+ }
+ byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+ rabbitMQProducer.send(messageBody, routingKey);
+ } catch (TException e) {
+ String msg = "Error while deserializing the object";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ } catch (Exception e) {
+ String msg = "Error while sending to rabbitmq";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ }
+ }
+}