You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/24 21:54:37 UTC

[1/2] git commit: Adding broker url

Repository: airavata
Updated Branches:
  refs/heads/messaging_framework a7581e747 -> 88d8638f2


Adding broker url


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/412f1e7a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/412f1e7a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/412f1e7a

Branch: refs/heads/messaging_framework
Commit: 412f1e7af00b7800b6794454b1c4265529eafbe7
Parents: a7581e7
Author: chathuriw <ka...@gmail.com>
Authored: Wed Sep 24 10:19:15 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Wed Sep 24 10:19:15 2014 -0400

----------------------------------------------------------------------
 .../server/src/main/resources/airavata-server.properties     | 1 +
 .../messaging/core/impl/AiravataRabbitMQPublisher.java       | 8 ++++++++
 2 files changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/412f1e7a/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index ebf5350..51d71e1 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -189,6 +189,7 @@ connection.name=xsede
 activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
 #publisher
 activity.publisher=org.apache.airavata.messaging.core.impl.AiravataRabbitMQPublisher
+rabbitmq.broker.url=http://localhost
 
 ###---------------------------Orchestrator module Configurations---------------------------###
 #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/412f1e7a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
index 7fc5342..eaa6158 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
@@ -28,7 +28,15 @@ import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
 
 public class AiravataRabbitMQPublisher implements Publisher {
+    private String brokerUrl;
+    private String routingKey;
+    private String exchangeName;
+    private int prefetchCount;
+    private boolean isRequeueOnFail;
+
     public AiravataRabbitMQPublisher() {
+
+        RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail);
     }
 
     public void publish(ExperimentStatusChangeEvent event) {


[2/2] git commit: new message thrift object

Posted by ch...@apache.org.
new message thrift object


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/88d8638f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/88d8638f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/88d8638f

Branch: refs/heads/messaging_framework
Commit: 88d8638f27b18e0b38c24985906142460ef92d47
Parents: 412f1e7
Author: chathuriw <ka...@gmail.com>
Authored: Wed Sep 24 15:54:05 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Wed Sep 24 15:54:05 2014 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        |  17 +-
 .../lib/airavata/messagingEvents_constants.cpp  |   2 +
 .../lib/airavata/messagingEvents_constants.h    |   1 +
 .../lib/airavata/messagingEvents_types.cpp      | 157 ++++++++++++++++
 .../lib/airavata/messagingEvents_types.h        | 100 +++++++++++
 .../Airavata/Model/Messaging/Event/Types.php    | 180 +++++++++++++++++++
 .../messagingEvents.thrift                      |  24 +++
 .../airavata/common/utils/AiravataUtils.java    |   6 +
 .../core/monitor/AiravataJobStatusUpdator.java  |  16 +-
 .../core/monitor/AiravataTaskStatusUpdator.java |  14 +-
 .../AiravataWorkflowNodeStatusUpdator.java      |  18 +-
 .../airavata/messaging/core/Publisher.java      |  10 +-
 .../core/impl/AiravataRabbitMQPublisher.java    |  19 +-
 13 files changed, 529 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index aff0e07..9c8bded 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -20,14 +20,16 @@
 */
 package org.apache.airavata.api.server.listener;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.Calendar;
 
 import org.apache.airavata.api.server.util.DataModelUtils;
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.util.ExecutionType;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
@@ -87,7 +89,16 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 			updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state);
 			logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString());
 			monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
-			publisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
+            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId());
+            Message message = new Message();
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(experimentStatusChangeEvent);
+            message.setEvent(baos.toByteArray());
+            message.setMessageType(MessageType.EXPERIMENT);
+            message.setMessageLevel(MessageLevel.INFO);
+            message.setMessageId(AiravataUtils.getId("EXP"));
+            publisher.publish(message);
 		} catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
 		}

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
index a02179a..5086851 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp
@@ -28,6 +28,8 @@ namespace apache { namespace airavata { namespace model { namespace messaging {
 const messagingEventsConstants g_messagingEvents_constants;
 
 messagingEventsConstants::messagingEventsConstants() {
+  DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS";
+
 }
 
 }}}}} // namespace

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
index c91fd31..b6d7dbc 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h
@@ -32,6 +32,7 @@ class messagingEventsConstants {
  public:
   messagingEventsConstants();
 
+  std::string DEFAULT_ID;
 };
 
 extern const messagingEventsConstants g_messagingEvents_constants;

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index a96736f..fae1f5d 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -27,6 +27,34 @@
 
 namespace apache { namespace airavata { namespace model { namespace messaging { namespace event {
 
+int _kMessageLevelValues[] = {
+  MessageLevel::INFO,
+  MessageLevel::DEBUG,
+  MessageLevel::ERROR,
+  MessageLevel::ACK
+};
+const char* _kMessageLevelNames[] = {
+  "INFO",
+  "DEBUG",
+  "ERROR",
+  "ACK"
+};
+const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageLevelValues, _kMessageLevelNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+
+int _kMessageTypeValues[] = {
+  MessageType::EXPERIMENT,
+  MessageType::TASK,
+  MessageType::WORKFLOWNODE,
+  MessageType::JOB
+};
+const char* _kMessageTypeNames[] = {
+  "EXPERIMENT",
+  "TASK",
+  "WORKFLOWNODE",
+  "JOB"
+};
+const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+
 const char* ExperimentStatusChangeEvent::ascii_fingerprint = "19B5240589E680301A7E32DF3971EFBE";
 const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x19,0xB5,0x24,0x05,0x89,0xE6,0x80,0x30,0x1A,0x7E,0x32,0xDF,0x39,0x71,0xEF,0xBE};
 
@@ -743,4 +771,133 @@ void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b) {
   swap(a.jobIdentity, b.jobIdentity);
 }
 
+const char* Message::ascii_fingerprint = "6904C391426E568AF9DEAF69860C076A";
+const uint8_t Message::binary_fingerprint[16] = {0x69,0x04,0xC3,0x91,0x42,0x6E,0x56,0x8A,0xF9,0xDE,0xAF,0x69,0x86,0x0C,0x07,0x6A};
+
+uint32_t Message::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+  uint32_t xfer = 0;
+  std::string fname;
+  ::apache::thrift::protocol::TType ftype;
+  int16_t fid;
+
+  xfer += iprot->readStructBegin(fname);
+
+  using ::apache::thrift::protocol::TProtocolException;
+
+  bool isset_event = false;
+  bool isset_messageId = false;
+  bool isset_messageType = false;
+
+  while (true)
+  {
+    xfer += iprot->readFieldBegin(fname, ftype, fid);
+    if (ftype == ::apache::thrift::protocol::T_STOP) {
+      break;
+    }
+    switch (fid)
+    {
+      case 1:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readBinary(this->event);
+          isset_event = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 2:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->messageId);
+          isset_messageId = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 3:
+        if (ftype == ::apache::thrift::protocol::T_I32) {
+          int32_t ecast10;
+          xfer += iprot->readI32(ecast10);
+          this->messageType = (MessageType::type)ecast10;
+          isset_messageType = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 4:
+        if (ftype == ::apache::thrift::protocol::T_I64) {
+          xfer += iprot->readI64(this->updatedTime);
+          this->__isset.updatedTime = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 5:
+        if (ftype == ::apache::thrift::protocol::T_I32) {
+          int32_t ecast11;
+          xfer += iprot->readI32(ecast11);
+          this->messageLevel = (MessageLevel::type)ecast11;
+          this->__isset.messageLevel = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      default:
+        xfer += iprot->skip(ftype);
+        break;
+    }
+    xfer += iprot->readFieldEnd();
+  }
+
+  xfer += iprot->readStructEnd();
+
+  if (!isset_event)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_messageId)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_messageType)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  return xfer;
+}
+
+uint32_t Message::write(::apache::thrift::protocol::TProtocol* oprot) const {
+  uint32_t xfer = 0;
+  xfer += oprot->writeStructBegin("Message");
+
+  xfer += oprot->writeFieldBegin("event", ::apache::thrift::protocol::T_STRING, 1);
+  xfer += oprot->writeBinary(this->event);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("messageId", ::apache::thrift::protocol::T_STRING, 2);
+  xfer += oprot->writeString(this->messageId);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("messageType", ::apache::thrift::protocol::T_I32, 3);
+  xfer += oprot->writeI32((int32_t)this->messageType);
+  xfer += oprot->writeFieldEnd();
+
+  if (this->__isset.updatedTime) {
+    xfer += oprot->writeFieldBegin("updatedTime", ::apache::thrift::protocol::T_I64, 4);
+    xfer += oprot->writeI64(this->updatedTime);
+    xfer += oprot->writeFieldEnd();
+  }
+  if (this->__isset.messageLevel) {
+    xfer += oprot->writeFieldBegin("messageLevel", ::apache::thrift::protocol::T_I32, 5);
+    xfer += oprot->writeI32((int32_t)this->messageLevel);
+    xfer += oprot->writeFieldEnd();
+  }
+  xfer += oprot->writeFieldStop();
+  xfer += oprot->writeStructEnd();
+  return xfer;
+}
+
+void swap(Message &a, Message &b) {
+  using ::std::swap;
+  swap(a.event, b.event);
+  swap(a.messageId, b.messageId);
+  swap(a.messageType, b.messageType);
+  swap(a.updatedTime, b.updatedTime);
+  swap(a.messageLevel, b.messageLevel);
+  swap(a.__isset, b.__isset);
+}
+
 }}}}} // namespace

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index ad3d052..fe80785 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -35,6 +35,28 @@
 
 namespace apache { namespace airavata { namespace model { namespace messaging { namespace event {
 
+struct MessageLevel {
+  enum type {
+    INFO = 0,
+    DEBUG = 1,
+    ERROR = 2,
+    ACK = 3
+  };
+};
+
+extern const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES;
+
+struct MessageType {
+  enum type {
+    EXPERIMENT = 0,
+    TASK = 1,
+    WORKFLOWNODE = 2,
+    JOB = 3
+  };
+};
+
+extern const std::map<int, const char*> _MessageType_VALUES_TO_NAMES;
+
 
 class ExperimentStatusChangeEvent {
  public:
@@ -408,6 +430,84 @@ class JobStatusChangeEvent {
 
 void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b);
 
+typedef struct _Message__isset {
+  _Message__isset() : updatedTime(false), messageLevel(false) {}
+  bool updatedTime;
+  bool messageLevel;
+} _Message__isset;
+
+class Message {
+ public:
+
+  static const char* ascii_fingerprint; // = "6904C391426E568AF9DEAF69860C076A";
+  static const uint8_t binary_fingerprint[16]; // = {0x69,0x04,0xC3,0x91,0x42,0x6E,0x56,0x8A,0xF9,0xDE,0xAF,0x69,0x86,0x0C,0x07,0x6A};
+
+  Message() : event(), messageId("DO_NOT_SET_AT_CLIENTS"), messageType((MessageType::type)0), updatedTime(0), messageLevel((MessageLevel::type)0) {
+  }
+
+  virtual ~Message() throw() {}
+
+  std::string event;
+  std::string messageId;
+  MessageType::type messageType;
+  int64_t updatedTime;
+  MessageLevel::type messageLevel;
+
+  _Message__isset __isset;
+
+  void __set_event(const std::string& val) {
+    event = val;
+  }
+
+  void __set_messageId(const std::string& val) {
+    messageId = val;
+  }
+
+  void __set_messageType(const MessageType::type val) {
+    messageType = val;
+  }
+
+  void __set_updatedTime(const int64_t val) {
+    updatedTime = val;
+    __isset.updatedTime = true;
+  }
+
+  void __set_messageLevel(const MessageLevel::type val) {
+    messageLevel = val;
+    __isset.messageLevel = true;
+  }
+
+  bool operator == (const Message & rhs) const
+  {
+    if (!(event == rhs.event))
+      return false;
+    if (!(messageId == rhs.messageId))
+      return false;
+    if (!(messageType == rhs.messageType))
+      return false;
+    if (__isset.updatedTime != rhs.__isset.updatedTime)
+      return false;
+    else if (__isset.updatedTime && !(updatedTime == rhs.updatedTime))
+      return false;
+    if (__isset.messageLevel != rhs.__isset.messageLevel)
+      return false;
+    else if (__isset.messageLevel && !(messageLevel == rhs.messageLevel))
+      return false;
+    return true;
+  }
+  bool operator != (const Message &rhs) const {
+    return !(*this == rhs);
+  }
+
+  bool operator < (const Message & ) const;
+
+  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(Message &a, Message &b);
+
 }}}}} // namespace
 
 #endif

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index 7bcf528..989e7b2 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -17,6 +17,32 @@ use Thrift\Protocol\TBinaryProtocolAccelerated;
 use Thrift\Exception\TApplicationException;
 
 
+final class MessageLevel {
+  const INFO = 0;
+  const DEBUG = 1;
+  const ERROR = 2;
+  const ACK = 3;
+  static public $__names = array(
+    0 => 'INFO',
+    1 => 'DEBUG',
+    2 => 'ERROR',
+    3 => 'ACK',
+  );
+}
+
+final class MessageType {
+  const EXPERIMENT = 0;
+  const TASK = 1;
+  const WORKFLOWNODE = 2;
+  const JOB = 3;
+  static public $__names = array(
+    0 => 'EXPERIMENT',
+    1 => 'TASK',
+    2 => 'WORKFLOWNODE',
+    3 => 'JOB',
+  );
+}
+
 class ExperimentStatusChangeEvent {
   static $_TSPEC;
 
@@ -861,4 +887,158 @@ class JobStatusChangeEvent {
 
 }
 
+class Message {
+  static $_TSPEC;
+
+  public $event = null;
+  public $messageId = "DO_NOT_SET_AT_CLIENTS";
+  public $messageType = null;
+  public $updatedTime = null;
+  public $messageLevel = null;
+
+  public function __construct($vals=null) {
+    if (!isset(self::$_TSPEC)) {
+      self::$_TSPEC = array(
+        1 => array(
+          'var' => 'event',
+          'type' => TType::STRING,
+          ),
+        2 => array(
+          'var' => 'messageId',
+          'type' => TType::STRING,
+          ),
+        3 => array(
+          'var' => 'messageType',
+          'type' => TType::I32,
+          ),
+        4 => array(
+          'var' => 'updatedTime',
+          'type' => TType::I64,
+          ),
+        5 => array(
+          'var' => 'messageLevel',
+          'type' => TType::I32,
+          ),
+        );
+    }
+    if (is_array($vals)) {
+      if (isset($vals['event'])) {
+        $this->event = $vals['event'];
+      }
+      if (isset($vals['messageId'])) {
+        $this->messageId = $vals['messageId'];
+      }
+      if (isset($vals['messageType'])) {
+        $this->messageType = $vals['messageType'];
+      }
+      if (isset($vals['updatedTime'])) {
+        $this->updatedTime = $vals['updatedTime'];
+      }
+      if (isset($vals['messageLevel'])) {
+        $this->messageLevel = $vals['messageLevel'];
+      }
+    }
+  }
+
+  public function getName() {
+    return 'Message';
+  }
+
+  public function read($input)
+  {
+    $xfer = 0;
+    $fname = null;
+    $ftype = 0;
+    $fid = 0;
+    $xfer += $input->readStructBegin($fname);
+    while (true)
+    {
+      $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+      if ($ftype == TType::STOP) {
+        break;
+      }
+      switch ($fid)
+      {
+        case 1:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->event);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 2:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->messageId);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 3:
+          if ($ftype == TType::I32) {
+            $xfer += $input->readI32($this->messageType);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 4:
+          if ($ftype == TType::I64) {
+            $xfer += $input->readI64($this->updatedTime);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 5:
+          if ($ftype == TType::I32) {
+            $xfer += $input->readI32($this->messageLevel);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        default:
+          $xfer += $input->skip($ftype);
+          break;
+      }
+      $xfer += $input->readFieldEnd();
+    }
+    $xfer += $input->readStructEnd();
+    return $xfer;
+  }
+
+  public function write($output) {
+    $xfer = 0;
+    $xfer += $output->writeStructBegin('Message');
+    if ($this->event !== null) {
+      $xfer += $output->writeFieldBegin('event', TType::STRING, 1);
+      $xfer += $output->writeString($this->event);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->messageId !== null) {
+      $xfer += $output->writeFieldBegin('messageId', TType::STRING, 2);
+      $xfer += $output->writeString($this->messageId);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->messageType !== null) {
+      $xfer += $output->writeFieldBegin('messageType', TType::I32, 3);
+      $xfer += $output->writeI32($this->messageType);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->updatedTime !== null) {
+      $xfer += $output->writeFieldBegin('updatedTime', TType::I64, 4);
+      $xfer += $output->writeI64($this->updatedTime);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->messageLevel !== null) {
+      $xfer += $output->writeFieldBegin('messageLevel', TType::I32, 5);
+      $xfer += $output->writeI32($this->messageLevel);
+      $xfer += $output->writeFieldEnd();
+    }
+    $xfer += $output->writeFieldStop();
+    $xfer += $output->writeStructEnd();
+    return $xfer;
+  }
+
+}
+
+$GLOBALS['messagingEvents_CONSTANTS']['DEFAULT_ID'] = "DO_NOT_SET_AT_CLIENTS";
+
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index 38c25a4..8eaddcb 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -24,6 +24,22 @@ namespace java org.apache.airavata.model.messaging.event
 namespace php Airavata.Model.Messaging.Event
 namespace cpp apache.airavata.model.messaging.event
 
+const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
+
+enum MessageLevel {
+    INFO,
+    DEBUG,
+    ERROR,
+    ACK
+}
+
+enum MessageType {
+    EXPERIMENT,
+    TASK,
+    WORKFLOWNODE,
+    JOB
+}
+
 struct ExperimentStatusChangeEvent {
     1: required experimentModel.ExperimentState state;
     2: required string experimentId;
@@ -80,6 +96,14 @@ struct JobStatusChangeEvent {
 //    3: required JobMonitor jobMonitor;
 }
 
+struct Message {
+    1: required binary event;
+    2: required string messageId = DEFAULT_ID;
+    3: required MessageType messageType;
+    4: optional i64 updatedTime;
+    5: optional MessageLevel messageLevel;
+}
+
 
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
index 6c29313..99b600f 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java
@@ -23,6 +23,7 @@ package org.apache.airavata.common.utils;
 
 import java.sql.Timestamp;
 import java.util.Calendar;
+import java.util.UUID;
 
 public class AiravataUtils {
 	public static final String EXECUTION_MODE="application.execution.mode";
@@ -66,4 +67,9 @@ public class AiravataUtils {
         }
         return new Timestamp(time);
     }
+
+    public static String getId (String name){
+        String id = name.replaceAll("\\s", "");
+        return id + "_" + UUID.randomUUID();
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index d142ceb..5f9e36f 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -20,12 +20,15 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.Calendar;
 
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.registry.cpi.CompositeIdentifier;
@@ -67,7 +70,16 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
                 updateJobStatus(taskID, jobID, state);
     			logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString());
             	monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
-                publisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
+                JobStatusChangeEvent changeEvent = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
+                Message message = new Message();
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                ObjectOutputStream oos = new ObjectOutputStream(baos);
+                oos.writeObject(changeEvent);
+                message.setEvent(baos.toByteArray());
+                message.setMessageType(MessageType.JOB);
+                message.setMessageLevel(MessageLevel.INFO);
+                message.setMessageId(AiravataUtils.getId("JOB"));
+                publisher.publish(message);
             } catch (Exception e) {
                 logger.error("Error persisting data" + e.getLocalizedMessage(), e);
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index f4e6241..dd91c61 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -20,8 +20,11 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.Calendar;
 
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.Publisher;
@@ -93,7 +96,16 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
                                                          jobStatus.getJobIdentity().getWorkflowNodeId(),
                                                          jobStatus.getJobIdentity().getExperimentId());
             monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
-            publisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
+            TaskStatusChangeEvent changeEvent = new TaskStatusChangeEvent(state, taskIdentity);
+            Message message = new Message();
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(changeEvent);
+            message.setEvent(baos.toByteArray());
+            message.setMessageType(MessageType.TASK);
+            message.setMessageLevel(MessageLevel.INFO);
+            message.setMessageId(AiravataUtils.getId("TASK"));
+            publisher.publish(message);
 
         } catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index fe24bd0..5b9a5ed 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -20,14 +20,15 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.Calendar;
 
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowIdentity;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
@@ -80,7 +81,16 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
 			logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString());
             WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
             monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
-            publisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
+            WorkflowNodeStatusChangeEvent changeEvent = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
+            Message message = new Message();
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(changeEvent);
+            message.setEvent(baos.toByteArray());
+            message.setMessageType(MessageType.WORKFLOWNODE);
+            message.setMessageLevel(MessageLevel.INFO);
+            message.setMessageId(AiravataUtils.getId("NODE"));
+            publisher.publish(message);
 		} catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
 		}

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index 24cfe59..24c8e2a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -1,13 +1,7 @@
 package org.apache.airavata.messaging.core;
 
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
 
 public interface Publisher {
-    public void publish(ExperimentStatusChangeEvent event);
-    public void publish(WorkflowNodeStatusChangeEvent event);
-    public void publish(TaskStatusChangeEvent event);
-    public void publish(JobStatusChangeEvent event);
+    public void publish(Message message);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
index eaa6158..0fda442 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
@@ -22,10 +22,7 @@
 package org.apache.airavata.messaging.core.impl;
 
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.*;
 
 public class AiravataRabbitMQPublisher implements Publisher {
     private String brokerUrl;
@@ -39,19 +36,7 @@ public class AiravataRabbitMQPublisher implements Publisher {
         RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail);
     }
 
-    public void publish(ExperimentStatusChangeEvent event) {
-
-    }
-
-    public void publish(WorkflowNodeStatusChangeEvent event) {
-
-    }
-
-    public void publish(TaskStatusChangeEvent event) {
-
-    }
-
-    public void publish(JobStatusChangeEvent event) {
+    public void publish(Message message) {
 
     }
 }