You are viewing a plain text version of this content; the hyperlink to the canonical version is not preserved in this copy.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/24 21:54:37 UTC
[1/2] git commit: Adding broker url
Repository: airavata
Updated Branches:
refs/heads/messaging_framework a7581e747 -> 88d8638f2
Adding broker url
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/412f1e7a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/412f1e7a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/412f1e7a
Branch: refs/heads/messaging_framework
Commit: 412f1e7af00b7800b6794454b1c4265529eafbe7
Parents: a7581e7
Author: chathuriw <ka...@gmail.com>
Authored: Wed Sep 24 10:19:15 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Wed Sep 24 10:19:15 2014 -0400
----------------------------------------------------------------------
.../server/src/main/resources/airavata-server.properties | 1 +
.../messaging/core/impl/AiravataRabbitMQPublisher.java | 8 ++++++++
2 files changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/412f1e7a/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index ebf5350..51d71e1 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -189,6 +189,7 @@ connection.name=xsede
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
#publisher
activity.publisher=org.apache.airavata.messaging.core.impl.AiravataRabbitMQPublisher
+rabbitmq.broker.url=http://localhost
###---------------------------Orchestrator module Configurations---------------------------###
#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
http://git-wip-us.apache.org/repos/asf/airavata/blob/412f1e7a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
index 7fc5342..eaa6158 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
@@ -28,7 +28,15 @@ import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
public class AiravataRabbitMQPublisher implements Publisher {
+ private String brokerUrl;
+ private String routingKey;
+ private String exchangeName;
+ private int prefetchCount;
+ private boolean isRequeueOnFail;
+
public AiravataRabbitMQPublisher() {
+
+ RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail);
}
public void publish(ExperimentStatusChangeEvent event) {
[2/2] git commit: new message thrift object
Posted by ch...@apache.org.
new message thrift object
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/88d8638f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/88d8638f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/88d8638f
Branch: refs/heads/messaging_framework
Commit: 88d8638f27b18e0b38c24985906142460ef92d47
Parents: 412f1e7
Author: chathuriw <ka...@gmail.com>
Authored: Wed Sep 24 15:54:05 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Wed Sep 24 15:54:05 2014 -0400
----------------------------------------------------------------------
.../AiravataExperimentStatusUpdator.java | 17 +-
.../lib/airavata/messagingEvents_constants.cpp | 2 +
.../lib/airavata/messagingEvents_constants.h | 1 +
.../lib/airavata/messagingEvents_types.cpp | 157 ++++++++++++++++
.../lib/airavata/messagingEvents_types.h | 100 +++++++++++
.../Airavata/Model/Messaging/Event/Types.php | 180 +++++++++++++++++++
.../messagingEvents.thrift | 24 +++
.../airavata/common/utils/AiravataUtils.java | 6 +
.../core/monitor/AiravataJobStatusUpdator.java | 16 +-
.../core/monitor/AiravataTaskStatusUpdator.java | 14 +-
.../AiravataWorkflowNodeStatusUpdator.java | 18 +-
.../airavata/messaging/core/Publisher.java | 10 +-
.../core/impl/AiravataRabbitMQPublisher.java | 19 +-
13 files changed, 529 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index aff0e07..9c8bded 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -20,14 +20,16 @@
*/
package org.apache.airavata.api.server.listener;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.util.Calendar;
import org.apache.airavata.api.server.util.DataModelUtils;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.util.ExecutionType;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
@@ -87,7 +89,16 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state);
logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString());
monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
- publisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
+ ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId());
+ Message message = new Message();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(experimentStatusChangeEvent);
+ message.setEvent(baos.toByteArray());
+ message.setMessageType(MessageType.EXPERIMENT);
+ message.setMessageLevel(MessageLevel.INFO);
+ message.setMessageId(AiravataUtils.getId("EXP"));
+ publisher.publish(message);
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
index a02179a..5086851 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
@@ -28,6 +28,8 @@ namespace apache { namespace airavata { namespace model { namespace messaging {
const messagingEventsConstants g_messagingEvents_constants;
messagingEventsConstants::messagingEventsConstants() {
+ DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS";
+
}
}}}}} // namespace
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
index c91fd31..b6d7dbc 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
@@ -32,6 +32,7 @@ class messagingEventsConstants {
public:
messagingEventsConstants();
+ std::string DEFAULT_ID;
};
extern const messagingEventsConstants g_messagingEvents_constants;
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index a96736f..fae1f5d 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -27,6 +27,34 @@
namespace apache { namespace airavata { namespace model { namespace messaging { namespace event {
+int _kMessageLevelValues[] = {
+ MessageLevel::INFO,
+ MessageLevel::DEBUG,
+ MessageLevel::ERROR,
+ MessageLevel::ACK
+};
+const char* _kMessageLevelNames[] = {
+ "INFO",
+ "DEBUG",
+ "ERROR",
+ "ACK"
+};
+const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageLevelValues, _kMessageLevelNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+
+int _kMessageTypeValues[] = {
+ MessageType::EXPERIMENT,
+ MessageType::TASK,
+ MessageType::WORKFLOWNODE,
+ MessageType::JOB
+};
+const char* _kMessageTypeNames[] = {
+ "EXPERIMENT",
+ "TASK",
+ "WORKFLOWNODE",
+ "JOB"
+};
+const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+
const char* ExperimentStatusChangeEvent::ascii_fingerprint = "19B5240589E680301A7E32DF3971EFBE";
const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x19,0xB5,0x24,0x05,0x89,0xE6,0x80,0x30,0x1A,0x7E,0x32,0xDF,0x39,0x71,0xEF,0xBE};
@@ -743,4 +771,133 @@ void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b) {
swap(a.jobIdentity, b.jobIdentity);
}
+const char* Message::ascii_fingerprint = "6904C391426E568AF9DEAF69860C076A";
+const uint8_t Message::binary_fingerprint[16] = {0x69,0x04,0xC3,0x91,0x42,0x6E,0x56,0x8A,0xF9,0xDE,0xAF,0x69,0x86,0x0C,0x07,0x6A};
+
+uint32_t Message::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ bool isset_event = false;
+ bool isset_messageId = false;
+ bool isset_messageType = false;
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readBinary(this->event);
+ isset_event = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->messageId);
+ isset_messageId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 3:
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ int32_t ecast10;
+ xfer += iprot->readI32(ecast10);
+ this->messageType = (MessageType::type)ecast10;
+ isset_messageType = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 4:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->updatedTime);
+ this->__isset.updatedTime = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 5:
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ int32_t ecast11;
+ xfer += iprot->readI32(ecast11);
+ this->messageLevel = (MessageLevel::type)ecast11;
+ this->__isset.messageLevel = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ if (!isset_event)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_messageId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_messageType)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ return xfer;
+}
+
+uint32_t Message::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ xfer += oprot->writeStructBegin("Message");
+
+ xfer += oprot->writeFieldBegin("event", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeBinary(this->event);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("messageId", ::apache::thrift::protocol::T_STRING, 2);
+ xfer += oprot->writeString(this->messageId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("messageType", ::apache::thrift::protocol::T_I32, 3);
+ xfer += oprot->writeI32((int32_t)this->messageType);
+ xfer += oprot->writeFieldEnd();
+
+ if (this->__isset.updatedTime) {
+ xfer += oprot->writeFieldBegin("updatedTime", ::apache::thrift::protocol::T_I64, 4);
+ xfer += oprot->writeI64(this->updatedTime);
+ xfer += oprot->writeFieldEnd();
+ }
+ if (this->__isset.messageLevel) {
+ xfer += oprot->writeFieldBegin("messageLevel", ::apache::thrift::protocol::T_I32, 5);
+ xfer += oprot->writeI32((int32_t)this->messageLevel);
+ xfer += oprot->writeFieldEnd();
+ }
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(Message &a, Message &b) {
+ using ::std::swap;
+ swap(a.event, b.event);
+ swap(a.messageId, b.messageId);
+ swap(a.messageType, b.messageType);
+ swap(a.updatedTime, b.updatedTime);
+ swap(a.messageLevel, b.messageLevel);
+ swap(a.__isset, b.__isset);
+}
+
}}}}} // namespace
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index ad3d052..fe80785 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -35,6 +35,28 @@
namespace apache { namespace airavata { namespace model { namespace messaging { namespace event {
+struct MessageLevel {
+ enum type {
+ INFO = 0,
+ DEBUG = 1,
+ ERROR = 2,
+ ACK = 3
+ };
+};
+
+extern const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES;
+
+struct MessageType {
+ enum type {
+ EXPERIMENT = 0,
+ TASK = 1,
+ WORKFLOWNODE = 2,
+ JOB = 3
+ };
+};
+
+extern const std::map<int, const char*> _MessageType_VALUES_TO_NAMES;
+
class ExperimentStatusChangeEvent {
public:
@@ -408,6 +430,84 @@ class JobStatusChangeEvent {
void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b);
+typedef struct _Message__isset {
+ _Message__isset() : updatedTime(false), messageLevel(false) {}
+ bool updatedTime;
+ bool messageLevel;
+} _Message__isset;
+
+class Message {
+ public:
+
+ static const char* ascii_fingerprint; // = "6904C391426E568AF9DEAF69860C076A";
+ static const uint8_t binary_fingerprint[16]; // = {0x69,0x04,0xC3,0x91,0x42,0x6E,0x56,0x8A,0xF9,0xDE,0xAF,0x69,0x86,0x0C,0x07,0x6A};
+
+ Message() : event(), messageId("DO_NOT_SET_AT_CLIENTS"), messageType((MessageType::type)0), updatedTime(0), messageLevel((MessageLevel::type)0) {
+ }
+
+ virtual ~Message() throw() {}
+
+ std::string event;
+ std::string messageId;
+ MessageType::type messageType;
+ int64_t updatedTime;
+ MessageLevel::type messageLevel;
+
+ _Message__isset __isset;
+
+ void __set_event(const std::string& val) {
+ event = val;
+ }
+
+ void __set_messageId(const std::string& val) {
+ messageId = val;
+ }
+
+ void __set_messageType(const MessageType::type val) {
+ messageType = val;
+ }
+
+ void __set_updatedTime(const int64_t val) {
+ updatedTime = val;
+ __isset.updatedTime = true;
+ }
+
+ void __set_messageLevel(const MessageLevel::type val) {
+ messageLevel = val;
+ __isset.messageLevel = true;
+ }
+
+ bool operator == (const Message & rhs) const
+ {
+ if (!(event == rhs.event))
+ return false;
+ if (!(messageId == rhs.messageId))
+ return false;
+ if (!(messageType == rhs.messageType))
+ return false;
+ if (__isset.updatedTime != rhs.__isset.updatedTime)
+ return false;
+ else if (__isset.updatedTime && !(updatedTime == rhs.updatedTime))
+ return false;
+ if (__isset.messageLevel != rhs.__isset.messageLevel)
+ return false;
+ else if (__isset.messageLevel && !(messageLevel == rhs.messageLevel))
+ return false;
+ return true;
+ }
+ bool operator != (const Message &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const Message & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(Message &a, Message &b);
+
}}}}} // namespace
#endif
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index 7bcf528..989e7b2 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -17,6 +17,32 @@ use Thrift\Protocol\TBinaryProtocolAccelerated;
use Thrift\Exception\TApplicationException;
+final class MessageLevel {
+ const INFO = 0;
+ const DEBUG = 1;
+ const ERROR = 2;
+ const ACK = 3;
+ static public $__names = array(
+ 0 => 'INFO',
+ 1 => 'DEBUG',
+ 2 => 'ERROR',
+ 3 => 'ACK',
+ );
+}
+
+final class MessageType {
+ const EXPERIMENT = 0;
+ const TASK = 1;
+ const WORKFLOWNODE = 2;
+ const JOB = 3;
+ static public $__names = array(
+ 0 => 'EXPERIMENT',
+ 1 => 'TASK',
+ 2 => 'WORKFLOWNODE',
+ 3 => 'JOB',
+ );
+}
+
class ExperimentStatusChangeEvent {
static $_TSPEC;
@@ -861,4 +887,158 @@ class JobStatusChangeEvent {
}
+class Message {
+ static $_TSPEC;
+
+ public $event = null;
+ public $messageId = "DO_NOT_SET_AT_CLIENTS";
+ public $messageType = null;
+ public $updatedTime = null;
+ public $messageLevel = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'event',
+ 'type' => TType::STRING,
+ ),
+ 2 => array(
+ 'var' => 'messageId',
+ 'type' => TType::STRING,
+ ),
+ 3 => array(
+ 'var' => 'messageType',
+ 'type' => TType::I32,
+ ),
+ 4 => array(
+ 'var' => 'updatedTime',
+ 'type' => TType::I64,
+ ),
+ 5 => array(
+ 'var' => 'messageLevel',
+ 'type' => TType::I32,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['event'])) {
+ $this->event = $vals['event'];
+ }
+ if (isset($vals['messageId'])) {
+ $this->messageId = $vals['messageId'];
+ }
+ if (isset($vals['messageType'])) {
+ $this->messageType = $vals['messageType'];
+ }
+ if (isset($vals['updatedTime'])) {
+ $this->updatedTime = $vals['updatedTime'];
+ }
+ if (isset($vals['messageLevel'])) {
+ $this->messageLevel = $vals['messageLevel'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'Message';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->event);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->messageId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 3:
+ if ($ftype == TType::I32) {
+ $xfer += $input->readI32($this->messageType);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 4:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->updatedTime);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 5:
+ if ($ftype == TType::I32) {
+ $xfer += $input->readI32($this->messageLevel);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('Message');
+ if ($this->event !== null) {
+ $xfer += $output->writeFieldBegin('event', TType::STRING, 1);
+ $xfer += $output->writeString($this->event);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->messageId !== null) {
+ $xfer += $output->writeFieldBegin('messageId', TType::STRING, 2);
+ $xfer += $output->writeString($this->messageId);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->messageType !== null) {
+ $xfer += $output->writeFieldBegin('messageType', TType::I32, 3);
+ $xfer += $output->writeI32($this->messageType);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->updatedTime !== null) {
+ $xfer += $output->writeFieldBegin('updatedTime', TType::I64, 4);
+ $xfer += $output->writeI64($this->updatedTime);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->messageLevel !== null) {
+ $xfer += $output->writeFieldBegin('messageLevel', TType::I32, 5);
+ $xfer += $output->writeI32($this->messageLevel);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
+$GLOBALS['messagingEvents_CONSTANTS']['DEFAULT_ID'] = "DO_NOT_SET_AT_CLIENTS";
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index 38c25a4..8eaddcb 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -24,6 +24,22 @@ namespace java org.apache.airavata.model.messaging.event
namespace php Airavata.Model.Messaging.Event
namespace cpp apache.airavata.model.messaging.event
+const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
+
+enum MessageLevel {
+ INFO,
+ DEBUG,
+ ERROR,
+ ACK
+}
+
+enum MessageType {
+ EXPERIMENT,
+ TASK,
+ WORKFLOWNODE,
+ JOB
+}
+
struct ExperimentStatusChangeEvent {
1: required experimentModel.ExperimentState state;
2: required string experimentId;
@@ -80,6 +96,14 @@ struct JobStatusChangeEvent {
// 3: required JobMonitor jobMonitor;
}
+struct Message {
+ 1: required binary event;
+ 2: required string messageId = DEFAULT_ID;
+ 3: required MessageType messageType;
+ 4: optional i64 updatedTime;
+ 5: optional MessageLevel messageLevel;
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
index 6c29313..99b600f 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
@@ -23,6 +23,7 @@ package org.apache.airavata.common.utils;
import java.sql.Timestamp;
import java.util.Calendar;
+import java.util.UUID;
public class AiravataUtils {
public static final String EXECUTION_MODE="application.execution.mode";
@@ -66,4 +67,9 @@ public class AiravataUtils {
}
return new Timestamp(time);
}
+
+ public static String getId (String name){
+ String id = name.replaceAll("\\s", "");
+ return id + "_" + UUID.randomUUID();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index d142ceb..5f9e36f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -20,12 +20,15 @@
*/
package org.apache.airavata.gfac.core.monitor;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.util.Calendar;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.registry.cpi.CompositeIdentifier;
@@ -67,7 +70,16 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
updateJobStatus(taskID, jobID, state);
logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString());
monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
- publisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
+ JobStatusChangeEvent changeEvent = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
+ Message message = new Message();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(changeEvent);
+ message.setEvent(baos.toByteArray());
+ message.setMessageType(MessageType.JOB);
+ message.setMessageLevel(MessageLevel.INFO);
+ message.setMessageId(AiravataUtils.getId("JOB"));
+ publisher.publish(message);
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index f4e6241..dd91c61 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -20,8 +20,11 @@
*/
package org.apache.airavata.gfac.core.monitor;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.util.Calendar;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.Publisher;
@@ -93,7 +96,16 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
jobStatus.getJobIdentity().getWorkflowNodeId(),
jobStatus.getJobIdentity().getExperimentId());
monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
- publisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
+ TaskStatusChangeEvent changeEvent = new TaskStatusChangeEvent(state, taskIdentity);
+ Message message = new Message();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(changeEvent);
+ message.setEvent(baos.toByteArray());
+ message.setMessageType(MessageType.TASK);
+ message.setMessageLevel(MessageLevel.INFO);
+ message.setMessageId(AiravataUtils.getId("TASK"));
+ publisher.publish(message);
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index fe24bd0..5b9a5ed 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -20,14 +20,15 @@
*/
package org.apache.airavata.gfac.core.monitor;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
import java.util.Calendar;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowIdentity;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
@@ -80,7 +81,16 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString());
WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
- publisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
+ WorkflowNodeStatusChangeEvent changeEvent = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
+ Message message = new Message();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(changeEvent);
+ message.setEvent(baos.toByteArray());
+ message.setMessageType(MessageType.WORKFLOWNODE);
+ message.setMessageLevel(MessageLevel.INFO);
+ message.setMessageId(AiravataUtils.getId("NODE"));
+ publisher.publish(message);
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index 24cfe59..24c8e2a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -1,13 +1,7 @@
package org.apache.airavata.messaging.core;
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
public interface Publisher {
- public void publish(ExperimentStatusChangeEvent event);
- public void publish(WorkflowNodeStatusChangeEvent event);
- public void publish(TaskStatusChangeEvent event);
- public void publish(JobStatusChangeEvent event);
+ public void publish(Message message);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
index eaa6158..0fda442 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
@@ -22,10 +22,7 @@
package org.apache.airavata.messaging.core.impl;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
public class AiravataRabbitMQPublisher implements Publisher {
private String brokerUrl;
@@ -39,19 +36,7 @@ public class AiravataRabbitMQPublisher implements Publisher {
RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail);
}
- public void publish(ExperimentStatusChangeEvent event) {
-
- }
-
- public void publish(WorkflowNodeStatusChangeEvent event) {
-
- }
-
- public void publish(TaskStatusChangeEvent event) {
-
- }
-
- public void publish(JobStatusChangeEvent event) {
+ public void publish(Message message) {
}
}