You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2015/01/22 15:35:31 UTC

airavata git commit: adding RabbitMQDatacat Support Beans, to be tested

Repository: airavata
Updated Branches:
  refs/heads/master 32fff9444 -> 714c2048d


adding RabbitMQDatacat Support Beans, to be tested


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/714c2048
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/714c2048
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/714c2048

Branch: refs/heads/master
Commit: 714c2048da87e70544ebdeabab739f8158b134ce
Parents: 32fff94
Author: Sachith Withana <sw...@Sachiths-MacBook-Pro.local>
Authored: Thu Jan 22 20:10:20 2015 +0530
Committer: Sachith Withana <sw...@Sachiths-MacBook-Pro.local>
Committed: Thu Jan 22 20:10:20 2015 +0530

----------------------------------------------------------------------
 .../event/ExperimentOutputCreatedEvent.java     | 573 +++++++++++++++++++
 .../event/ExperimentOutputParsedEvent.java      | 573 +++++++++++++++++++
 .../model/messaging/event/MessageType.java      |  27 +-
 .../messagingEvents.thrift                      |  15 +-
 .../airavata/common/utils/ServerSettings.java   |   5 +
 .../main/resources/airavata-server.properties   |   5 +
 .../gfac/core/handler/AbstractHandler.java      |  13 +-
 .../gfac/ssh/handler/SSHOutputHandler.java      |  21 +
 .../impl/BetterRabbitMQDatacatConsumer.java     | 100 ++++
 .../core/impl/RabbitMQDatacatConsumer.java      | 238 ++++++++
 .../core/impl/RabbitMQDatacatPublisher.java     |  85 +++
 .../core/RabbitMQDatacatConsumerTest.java       |  37 ++
 .../core/RabbitMQDatacatPublisherTest.java      |  48 ++
 13 files changed, 1719 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java
new file mode 100644
index 0000000..6c3c6a6
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java
@@ -0,0 +1,573 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thrift struct with three required string fields: experimentId, filename and filepath.
+ * A field counts as "set" when it is non-null; validate() rejects structs with any field unset.
+ */
+public class ExperimentOutputCreatedEvent implements org.apache.thrift.TBase<ExperimentOutputCreatedEvent, ExperimentOutputCreatedEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ExperimentOutputCreatedEvent> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ExperimentOutputCreatedEvent");
+
+  private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField FILENAME_FIELD_DESC = new org.apache.thrift.protocol.TField("filename", org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField FILEPATH_FIELD_DESC = new org.apache.thrift.protocol.TField("filepath", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+  // Serialization strategies, looked up by the scheme class the protocol reports in read()/write().
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new ExperimentOutputCreatedEventStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new ExperimentOutputCreatedEventTupleSchemeFactory());
+  }
+
+  public String experimentId; // required
+  public String filename; // required
+  public String filepath; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    EXPERIMENT_ID((short)1, "experimentId"),
+    FILENAME((short)2, "filename"),
+    FILEPATH((short)3, "filepath");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it is not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // EXPERIMENT_ID
+          return EXPERIMENT_ID;
+        case 2: // FILENAME
+          return FILENAME;
+        case 3: // FILEPATH
+          return FILEPATH;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an
+     * IllegalArgumentException if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it is not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.FILENAME, new org.apache.thrift.meta_data.FieldMetaData("filename", org.apache.thrift.TFieldRequirementType.REQUIRED,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.FILEPATH, new org.apache.thrift.meta_data.FieldMetaData("filepath", org.apache.thrift.TFieldRequirementType.REQUIRED,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExperimentOutputCreatedEvent.class, metaDataMap);
+  }
+
+  public ExperimentOutputCreatedEvent() {
+  }
+
+  public ExperimentOutputCreatedEvent(
+    String experimentId,
+    String filename,
+    String filepath)
+  {
+    this();
+    this.experimentId = experimentId;
+    this.filename = filename;
+    this.filepath = filepath;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public ExperimentOutputCreatedEvent(ExperimentOutputCreatedEvent other) {
+    if (other.isSetExperimentId()) {
+      this.experimentId = other.experimentId;
+    }
+    if (other.isSetFilename()) {
+      this.filename = other.filename;
+    }
+    if (other.isSetFilepath()) {
+      this.filepath = other.filepath;
+    }
+  }
+
+  public ExperimentOutputCreatedEvent deepCopy() {
+    return new ExperimentOutputCreatedEvent(this);
+  }
+
+  @Override
+  public void clear() {
+    this.experimentId = null;
+    this.filename = null;
+    this.filepath = null;
+  }
+
+  public String getExperimentId() {
+    return this.experimentId;
+  }
+
+  public ExperimentOutputCreatedEvent setExperimentId(String experimentId) {
+    this.experimentId = experimentId;
+    return this;
+  }
+
+  public void unsetExperimentId() {
+    this.experimentId = null;
+  }
+
+  /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */
+  public boolean isSetExperimentId() {
+    return this.experimentId != null;
+  }
+
+  public void setExperimentIdIsSet(boolean value) {
+    if (!value) { // passing true is a no-op: "set" is tracked by the field being non-null
+      this.experimentId = null;
+    }
+  }
+
+  public String getFilename() {
+    return this.filename;
+  }
+
+  public ExperimentOutputCreatedEvent setFilename(String filename) {
+    this.filename = filename;
+    return this;
+  }
+
+  public void unsetFilename() {
+    this.filename = null;
+  }
+
+  /** Returns true if field filename is set (has been assigned a value) and false otherwise */
+  public boolean isSetFilename() {
+    return this.filename != null;
+  }
+
+  public void setFilenameIsSet(boolean value) {
+    if (!value) { // passing true is a no-op: "set" is tracked by the field being non-null
+      this.filename = null;
+    }
+  }
+
+  public String getFilepath() {
+    return this.filepath;
+  }
+
+  public ExperimentOutputCreatedEvent setFilepath(String filepath) {
+    this.filepath = filepath;
+    return this;
+  }
+
+  public void unsetFilepath() {
+    this.filepath = null;
+  }
+
+  /** Returns true if field filepath is set (has been assigned a value) and false otherwise */
+  public boolean isSetFilepath() {
+    return this.filepath != null;
+  }
+
+  public void setFilepathIsSet(boolean value) {
+    if (!value) { // passing true is a no-op: "set" is tracked by the field being non-null
+      this.filepath = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case EXPERIMENT_ID:
+      if (value == null) {
+        unsetExperimentId();
+      } else {
+        setExperimentId((String)value);
+      }
+      break;
+
+    case FILENAME:
+      if (value == null) {
+        unsetFilename();
+      } else {
+        setFilename((String)value);
+      }
+      break;
+
+    case FILEPATH:
+      if (value == null) {
+        unsetFilepath();
+      } else {
+        setFilepath((String)value);
+      }
+      break;
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case EXPERIMENT_ID:
+      return getExperimentId();
+
+    case FILENAME:
+      return getFilename();
+
+    case FILEPATH:
+      return getFilepath();
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case EXPERIMENT_ID:
+      return isSetExperimentId();
+    case FILENAME:
+      return isSetFilename();
+    case FILEPATH:
+      return isSetFilepath();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof ExperimentOutputCreatedEvent)
+      return this.equals((ExperimentOutputCreatedEvent)that);
+    return false;
+  }
+
+  public boolean equals(ExperimentOutputCreatedEvent that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_experimentId = this.isSetExperimentId();
+    boolean that_present_experimentId = that.isSetExperimentId();
+    if (this_present_experimentId || that_present_experimentId) {
+      if (!(this_present_experimentId && that_present_experimentId))
+        return false;
+      if (!this.experimentId.equals(that.experimentId))
+        return false;
+    }
+
+    boolean this_present_filename = this.isSetFilename();
+    boolean that_present_filename = that.isSetFilename();
+    if (this_present_filename || that_present_filename) {
+      if (!(this_present_filename && that_present_filename))
+        return false;
+      if (!this.filename.equals(that.filename))
+        return false;
+    }
+
+    boolean this_present_filepath = this.isSetFilepath();
+    boolean that_present_filepath = that.isSetFilepath();
+    if (this_present_filepath || that_present_filepath) {
+      if (!(this_present_filepath && that_present_filepath))
+        return false;
+      if (!this.filepath.equals(that.filepath))
+        return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Hashes the same three fields equals() compares, so equal structs share a hash; unset (null) fields contribute 0.
+   */
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(new Object[] {experimentId, filename, filepath});
+  }
+
+  @Override
+  public int compareTo(ExperimentOutputCreatedEvent other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetExperimentId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetFilename()).compareTo(other.isSetFilename());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFilename()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.filename, other.filename);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetFilepath()).compareTo(other.isSetFilepath());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFilepath()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.filepath, other.filepath);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("ExperimentOutputCreatedEvent(");
+
+    sb.append("experimentId:");
+    if (this.experimentId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.experimentId);
+    }
+    sb.append(", ");
+    sb.append("filename:");
+    if (this.filename == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.filename);
+    }
+    sb.append(", ");
+    sb.append("filepath:");
+    if (this.filepath == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.filepath);
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (experimentId == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' was not present! Struct: " + toString());
+    }
+    if (filename == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'filename' was not present! Struct: " + toString());
+    }
+    if (filepath == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'filepath' was not present! Struct: " + toString());
+    }
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class ExperimentOutputCreatedEventStandardSchemeFactory implements SchemeFactory {
+    public ExperimentOutputCreatedEventStandardScheme getScheme() {
+      return new ExperimentOutputCreatedEventStandardScheme();
+    }
+  }
+
+  private static class ExperimentOutputCreatedEventStandardScheme extends StandardScheme<ExperimentOutputCreatedEvent> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // EXPERIMENT_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.experimentId = iprot.readString();
+              struct.setExperimentIdIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // FILENAME
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.filename = iprot.readString();
+              struct.setFilenameIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // FILEPATH
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.filepath = iprot.readString();
+              struct.setFilepathIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // every required field here is a String, so validate() catches any that were missing from the input
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.experimentId != null) {
+        oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC);
+        oprot.writeString(struct.experimentId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.filename != null) {
+        oprot.writeFieldBegin(FILENAME_FIELD_DESC);
+        oprot.writeString(struct.filename);
+        oprot.writeFieldEnd();
+      }
+      if (struct.filepath != null) {
+        oprot.writeFieldBegin(FILEPATH_FIELD_DESC);
+        oprot.writeString(struct.filepath);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+  }
+
+  private static class ExperimentOutputCreatedEventTupleSchemeFactory implements SchemeFactory {
+    public ExperimentOutputCreatedEventTupleScheme getScheme() {
+      return new ExperimentOutputCreatedEventTupleScheme();
+    }
+  }
+
+  private static class ExperimentOutputCreatedEventTupleScheme extends TupleScheme<ExperimentOutputCreatedEvent> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      oprot.writeString(struct.experimentId);
+      oprot.writeString(struct.filename);
+      oprot.writeString(struct.filepath);
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.experimentId = iprot.readString();
+      struct.setExperimentIdIsSet(true);
+      struct.filename = iprot.readString();
+      struct.setFilenameIsSet(true);
+      struct.filepath = iprot.readString();
+      struct.setFilepathIsSet(true);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java
new file mode 100644
index 0000000..72310f3
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java
@@ -0,0 +1,573 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExperimentOutputParsedEvent implements org.apache.thrift.TBase<ExperimentOutputParsedEvent, ExperimentOutputParsedEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ExperimentOutputParsedEvent> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ExperimentOutputParsedEvent");
+
+  private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField DOCUMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("documentID", org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new ExperimentOutputParsedEventStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new ExperimentOutputParsedEventTupleSchemeFactory());
+  }
+
+  public String experimentId; // required
+  public String documentID; // required
+  public String status; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    EXPERIMENT_ID((short)1, "experimentId"),
+    DOCUMENT_ID((short)2, "documentID"),
+    STATUS((short)3, "status");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // EXPERIMENT_ID
+          return EXPERIMENT_ID;
+        case 2: // DOCUMENT_ID
+          return DOCUMENT_ID;
+        case 3: // STATUS
+          return STATUS;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.DOCUMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("documentID", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.STATUS, new org.apache.thrift.meta_data.FieldMetaData("status", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExperimentOutputParsedEvent.class, metaDataMap);
+  }
+
+  public ExperimentOutputParsedEvent() {
+  }
+
+  public ExperimentOutputParsedEvent(
+    String experimentId,
+    String documentID,
+    String status)
+  {
+    this();
+    this.experimentId = experimentId;
+    this.documentID = documentID;
+    this.status = status;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public ExperimentOutputParsedEvent(ExperimentOutputParsedEvent other) {
+    if (other.isSetExperimentId()) {
+      this.experimentId = other.experimentId;
+    }
+    if (other.isSetDocumentID()) {
+      this.documentID = other.documentID;
+    }
+    if (other.isSetStatus()) {
+      this.status = other.status;
+    }
+  }
+
+  public ExperimentOutputParsedEvent deepCopy() {
+    return new ExperimentOutputParsedEvent(this);
+  }
+
+  @Override
+  public void clear() {
+    this.experimentId = null;
+    this.documentID = null;
+    this.status = null;
+  }
+
+  public String getExperimentId() {
+    return this.experimentId;
+  }
+
+  public ExperimentOutputParsedEvent setExperimentId(String experimentId) {
+    this.experimentId = experimentId;
+    return this;
+  }
+
+  public void unsetExperimentId() {
+    this.experimentId = null;
+  }
+
+  /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */
+  public boolean isSetExperimentId() {
+    return this.experimentId != null;
+  }
+
+  public void setExperimentIdIsSet(boolean value) {
+    if (!value) {
+      this.experimentId = null;
+    }
+  }
+
+  public String getDocumentID() {
+    return this.documentID;
+  }
+
+  public ExperimentOutputParsedEvent setDocumentID(String documentID) {
+    this.documentID = documentID;
+    return this;
+  }
+
+  public void unsetDocumentID() {
+    this.documentID = null;
+  }
+
+  /** Returns true if field documentID is set (has been assigned a value) and false otherwise */
+  public boolean isSetDocumentID() {
+    return this.documentID != null;
+  }
+
+  public void setDocumentIDIsSet(boolean value) {
+    if (!value) {
+      this.documentID = null;
+    }
+  }
+
+  public String getStatus() {
+    return this.status;
+  }
+
+  public ExperimentOutputParsedEvent setStatus(String status) {
+    this.status = status;
+    return this;
+  }
+
+  public void unsetStatus() {
+    this.status = null;
+  }
+
+  /** Returns true if field status is set (has been assigned a value) and false otherwise */
+  public boolean isSetStatus() {
+    return this.status != null;
+  }
+
+  public void setStatusIsSet(boolean value) {
+    if (!value) {
+      this.status = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case EXPERIMENT_ID:
+      if (value == null) {
+        unsetExperimentId();
+      } else {
+        setExperimentId((String)value);
+      }
+      break;
+
+    case DOCUMENT_ID:
+      if (value == null) {
+        unsetDocumentID();
+      } else {
+        setDocumentID((String)value);
+      }
+      break;
+
+    case STATUS:
+      if (value == null) {
+        unsetStatus();
+      } else {
+        setStatus((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case EXPERIMENT_ID:
+      return getExperimentId();
+
+    case DOCUMENT_ID:
+      return getDocumentID();
+
+    case STATUS:
+      return getStatus();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /**
+   * Reports whether {@code field} has been assigned a value.
+   * A null {@code field} is rejected with IllegalArgumentException.
+   */
+  public boolean isSet(_Fields field) {
+    if (null == field) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case EXPERIMENT_ID:
+      return isSetExperimentId();
+    case DOCUMENT_ID:
+      return isSetDocumentID();
+    case STATUS:
+      return isSetStatus();
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Delegates to the typed overload when {@code that} is an ExperimentOutputParsedEvent;
+   * null and every other type compare unequal.
+   */
+  @Override
+  public boolean equals(Object that) {
+    // instanceof is already false for null, so no separate null test is needed.
+    return (that instanceof ExperimentOutputParsedEvent)
+        && this.equals((ExperimentOutputParsedEvent) that);
+  }
+
+  /**
+   * Field-by-field equality: for each of experimentId, documentID and status the two
+   * events must agree on whether the field is set and, when it is set, on its value.
+   */
+  public boolean equals(ExperimentOutputParsedEvent that) {
+    if (that == null) {
+      return false;
+    }
+
+    final boolean mineHasExperimentId = this.isSetExperimentId();
+    final boolean theirsHasExperimentId = that.isSetExperimentId();
+    if (mineHasExperimentId != theirsHasExperimentId) {
+      return false;
+    }
+    if (mineHasExperimentId && !this.experimentId.equals(that.experimentId)) {
+      return false;
+    }
+
+    final boolean mineHasDocumentID = this.isSetDocumentID();
+    final boolean theirsHasDocumentID = that.isSetDocumentID();
+    if (mineHasDocumentID != theirsHasDocumentID) {
+      return false;
+    }
+    if (mineHasDocumentID && !this.documentID.equals(that.documentID)) {
+      return false;
+    }
+
+    final boolean mineHasStatus = this.isSetStatus();
+    final boolean theirsHasStatus = that.isSetStatus();
+    if (mineHasStatus != theirsHasStatus) {
+      return false;
+    }
+    if (mineHasStatus && !this.status.equals(that.status)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Hash derived from the same three fields that equals() compares, so equal events
+   * always hash alike while differing events spread across buckets. The previous
+   * constant 0 was legal but put every instance in a single bucket of any hash
+   * collection.
+   *
+   * The value changes when a field is reassigned, so an event should not be mutated
+   * while it is stored in a HashSet or used as a HashMap key.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    // An unset field contributes 0, mirroring equals(), which treats two unset fields as equal.
+    result = 31 * result + (isSetExperimentId() ? experimentId.hashCode() : 0);
+    result = 31 * result + (isSetDocumentID() ? documentID.hashCode() : 0);
+    result = 31 * result + (isSetStatus() ? status.hashCode() : 0);
+    return result;
+  }
+
+  /**
+   * Orders events by experimentId, then documentID, then status. Within each field an
+   * unset value sorts before a set one. Instances of different runtime classes are
+   * ordered by class name instead.
+   */
+  @Override
+  public int compareTo(ExperimentOutputParsedEvent other) {
+    final Class<?> thisType = getClass();
+    final Class<?> otherType = other.getClass();
+    if (!thisType.equals(otherType)) {
+      return thisType.getName().compareTo(otherType.getName());
+    }
+
+    int cmp = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId());
+    if (cmp == 0 && isSetExperimentId()) {
+      cmp = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId);
+    }
+    if (cmp != 0) {
+      return cmp;
+    }
+
+    cmp = Boolean.valueOf(isSetDocumentID()).compareTo(other.isSetDocumentID());
+    if (cmp == 0 && isSetDocumentID()) {
+      cmp = org.apache.thrift.TBaseHelper.compareTo(this.documentID, other.documentID);
+    }
+    if (cmp != 0) {
+      return cmp;
+    }
+
+    cmp = Boolean.valueOf(isSetStatus()).compareTo(other.isSetStatus());
+    if (cmp == 0 && isSetStatus()) {
+      cmp = org.apache.thrift.TBaseHelper.compareTo(this.status, other.status);
+    }
+    return cmp;
+  }
+
+  /** Looks up the field constant for a Thrift field id by delegating to _Fields.findByThriftId. */
+  public _Fields fieldForId(int fieldId) {
+    final _Fields match = _Fields.findByThriftId(fieldId);
+    return match;
+  }
+
+  /**
+   * Populates this event from {@code iprot}, using the scheme that is registered in
+   * {@code schemes} for the protocol's scheme class.
+   */
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    final SchemeFactory factory = schemes.get(iprot.getScheme());
+    factory.getScheme().read(iprot, this);
+  }
+
+  /**
+   * Serialises this event to {@code oprot}, using the scheme that is registered in
+   * {@code schemes} for the protocol's scheme class.
+   */
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    final SchemeFactory factory = schemes.get(oprot.getScheme());
+    factory.getScheme().write(oprot, this);
+  }
+
+  /**
+   * Renders the event as
+   * {@code ExperimentOutputParsedEvent(experimentId:..., documentID:..., status:...)},
+   * printing the text {@code null} for any field that is unset.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("ExperimentOutputParsedEvent(");
+
+    text.append("experimentId:");
+    text.append(this.experimentId == null ? "null" : this.experimentId);
+    text.append(", ");
+
+    text.append("documentID:");
+    text.append(this.documentID == null ? "null" : this.documentID);
+    text.append(", ");
+
+    text.append("status:");
+    text.append(this.status == null ? "null" : this.status);
+
+    text.append(")");
+    return text.toString();
+  }
+
+  /**
+   * Verifies that experimentId, documentID and status are all present.
+   *
+   * @throws org.apache.thrift.TException a TProtocolException naming the first missing
+   *         field, checked in the order experimentId, documentID, status
+   */
+  public void validate() throws org.apache.thrift.TException {
+    if (null == experimentId) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' was not present! Struct: " + toString());
+    } else if (null == documentID) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'documentID' was not present! Struct: " + toString());
+    } else if (null == status) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'status' was not present! Struct: " + toString());
+    }
+    // This struct has no nested structs, so there is nothing further to validate.
+  }
+
+  /**
+   * Java serialization hook: writes this event to {@code out} in the Thrift compact
+   * protocol. Any Thrift failure is rethrown as an IOException carrying the cause.
+   */
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    final org.apache.thrift.transport.TIOStreamTransport transport =
+        new org.apache.thrift.transport.TIOStreamTransport(out);
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(transport));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /**
+   * Java serialization hook: restores this event from {@code in}, which is read with
+   * the Thrift compact protocol. Any Thrift failure is rethrown as an IOException
+   * carrying the cause.
+   */
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    final org.apache.thrift.transport.TIOStreamTransport transport =
+        new org.apache.thrift.transport.TIOStreamTransport(in);
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(transport));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /** Factory that supplies a new standard (field-tagged) scheme on every call. */
+  private static class ExperimentOutputParsedEventStandardSchemeFactory implements SchemeFactory {
+    public ExperimentOutputParsedEventStandardScheme getScheme() {
+      final ExperimentOutputParsedEventStandardScheme scheme = new ExperimentOutputParsedEventStandardScheme();
+      return scheme;
+    }
+  }
+
+  /**
+   * Field-tagged encoding of ExperimentOutputParsedEvent: each value is preceded by a
+   * field header, and fields with an unexpected id or wire type are skipped on read.
+   */
+  private static class ExperimentOutputParsedEventStandardScheme extends StandardScheme<ExperimentOutputParsedEvent> {
+
+    /** Reads fields until the STOP marker, then validates that every required field arrived. */
+    public void read(org.apache.thrift.protocol.TProtocol iprot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+      iprot.readStructBegin();
+      for (org.apache.thrift.protocol.TField schemeField = iprot.readFieldBegin();
+           schemeField.type != org.apache.thrift.protocol.TType.STOP;
+           schemeField = iprot.readFieldBegin()) {
+        final boolean isString = (schemeField.type == org.apache.thrift.protocol.TType.STRING);
+        switch (schemeField.id) {
+          case 1: // EXPERIMENT_ID
+            if (!isString) {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              break;
+            }
+            struct.experimentId = iprot.readString();
+            struct.setExperimentIdIsSet(true);
+            break;
+          case 2: // DOCUMENT_ID
+            if (!isString) {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              break;
+            }
+            struct.documentID = iprot.readString();
+            struct.setDocumentIDIsSet(true);
+            break;
+          case 3: // STATUS
+            if (!isString) {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              break;
+            }
+            struct.status = iprot.readString();
+            struct.setStatusIsSet(true);
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // Required-field check happens only after the whole struct has been consumed.
+      struct.validate();
+    }
+
+    /** Validates the struct first, then emits each non-null field followed by the STOP marker. */
+    public void write(org.apache.thrift.protocol.TProtocol oprot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (null != struct.experimentId) {
+        oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC);
+        oprot.writeString(struct.experimentId);
+        oprot.writeFieldEnd();
+      }
+      if (null != struct.documentID) {
+        oprot.writeFieldBegin(DOCUMENT_ID_FIELD_DESC);
+        oprot.writeString(struct.documentID);
+        oprot.writeFieldEnd();
+      }
+      if (null != struct.status) {
+        oprot.writeFieldBegin(STATUS_FIELD_DESC);
+        oprot.writeString(struct.status);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  /** Factory that supplies a new tuple scheme on every call. */
+  private static class ExperimentOutputParsedEventTupleSchemeFactory implements SchemeFactory {
+    public ExperimentOutputParsedEventTupleScheme getScheme() {
+      final ExperimentOutputParsedEventTupleScheme scheme = new ExperimentOutputParsedEventTupleScheme();
+      return scheme;
+    }
+  }
+
+  /**
+   * Positional encoding of ExperimentOutputParsedEvent: the three strings are written and
+   * read back-to-back in the fixed order experimentId, documentID, status.
+   */
+  private static class ExperimentOutputParsedEventTupleScheme extends TupleScheme<ExperimentOutputParsedEvent> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+      final TTupleProtocol tuple = (TTupleProtocol) prot;
+      tuple.writeString(struct.experimentId);
+      tuple.writeString(struct.documentID);
+      tuple.writeString(struct.status);
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+      final TTupleProtocol tuple = (TTupleProtocol) prot;
+
+      struct.experimentId = tuple.readString();
+      struct.setExperimentIdIsSet(true);
+
+      struct.documentID = tuple.readString();
+      struct.setDocumentIDIsSet(true);
+
+      struct.status = tuple.readString();
+      struct.setStatusIsSet(true);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
index d00f404..0295101 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
@@ -1,21 +1,4 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
  * Autogenerated by Thrift Compiler (0.9.1)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
@@ -28,11 +11,13 @@ import java.util.Map;
 import java.util.HashMap;
 import org.apache.thrift.TEnum;
 
-@SuppressWarnings("all") public enum MessageType implements org.apache.thrift.TEnum {
+public enum MessageType implements org.apache.thrift.TEnum {
   EXPERIMENT(0),
   TASK(1),
   WORKFLOWNODE(2),
-  JOB(3);
+  JOB(3),
+  EXPERIMENT_OUTPUT(4),
+  OUTPUT_PARSED(5);
 
   private final int value;
 
@@ -61,6 +46,10 @@ import org.apache.thrift.TEnum;
         return WORKFLOWNODE;
       case 3:
         return JOB;
+      case 4:
+        return EXPERIMENT_OUTPUT;
+      case 5:
+        return OUTPUT_PARSED;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index c9f3808..670c5fe 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -38,7 +38,9 @@ enum MessageType {
     EXPERIMENT,
     TASK,
     WORKFLOWNODE,
-    JOB
+    JOB,
+    EXPERIMENT_OUTPUT,
+    OUTPUT_PARSED
 }
 
 struct ExperimentStatusChangeEvent {
@@ -118,7 +120,18 @@ struct Message {
     5: optional MessageLevel messageLevel;
 }
 
+struct ExperimentOutputCreatedEvent {  // event payload describing one output file of an experiment
+    1: required string experimentId;  // id of the experiment the output belongs to
+    2: required string filename;  // name of the output file
+    3: required string filepath;  // location of the output file; presumably the output directory joined with filename - confirm against the publisher
 
+}
+
+struct ExperimentOutputParsedEvent {  // event payload carried by messages of type OUTPUT_PARSED
+    1: required string experimentId;  // id of the experiment the event refers to
+    2: required string documentID;  // presumably the id of the document produced by parsing - TODO confirm
+    3: required string status;  // free-form status text; allowed values are not defined here
+}
 
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 39261e2..e24fdaa 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -55,6 +55,8 @@ public class ServerSettings extends ApplicationSettings {
     private static final String ACTIVITY_LISTENERS = "activity.listeners";
     public static final String PUBLISH_RABBITMQ = "publish.rabbitmq";
 
+    public static final String DATACAT_EXCHANGE = "datacat.exchange";
+
     private static boolean stopAllThreads = false;
 
     public static String getDefaultUser() throws ApplicationSettingsException {
@@ -64,6 +66,9 @@ public class ServerSettings extends ApplicationSettings {
     public static String getDefaultUserPassword() throws ApplicationSettingsException {
         return getSetting(DEFAULT_USER_PASSWORD);
     }
+    /**
+     * Reads the {@code datacat.exchange} setting.
+     *
+     * @return the value configured under {@link #DATACAT_EXCHANGE}
+     * @throws ApplicationSettingsException propagated from {@code getSetting}
+     */
+    public static String getDatacatExchange() throws ApplicationSettingsException {
+        final String exchange = getSetting(DATACAT_EXCHANGE);
+        return exchange;
+    }
 
     public static String getDefaultUserGateway() throws ApplicationSettingsException {
         return getSetting(DEFAULT_USER_GATEWAY);

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 22d0a65..9f7235d 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -240,3 +240,8 @@ gfac-experiments=/gfac-experiments
 gfac-server-name=gfac-node0
 orchestrator-server-name=orch-node0
 airavata-server-name=api-node0
+
+###########################################################################
+# datacat module Configuration
+###########################################################################
+datacat.exchange=datacat

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index f4f5d7c..da8fb64 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -23,17 +23,28 @@ package org.apache.airavata.gfac.core.handler;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatPublisher;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractHandler implements GFacHandler {
-	protected Registry registry = null;
+    private static final Logger log = LoggerFactory.getLogger(AbstractHandler.class);
+
+    protected Registry registry = null;
 
     protected MonitorPublisher publisher = null;
+    protected RabbitMQDatacatPublisher datacatPublisher;
 
     protected AbstractHandler() {
         publisher = BetterGfacImpl.getMonitorPublisher();   // This will not be null because this will be initialize in GFacIml
+        try {
+            datacatPublisher = new RabbitMQDatacatPublisher();
+        } catch (Exception e) {
+            // Best effort: the handler is still constructed, but datacatPublisher stays null,
+            // so subclasses must not assume it is available. Passing the exception itself
+            // (not just e.toString()) keeps the stack trace in the log.
+            log.error("Could not create the RabbitMQ Datacat publisher; datacatPublisher is left unset", e);
+        }
     }
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
index b574540..0524703 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -20,6 +20,8 @@
 */
 package org.apache.airavata.gfac.ssh.handler;
 
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.handler.AbstractHandler;
@@ -30,8 +32,11 @@ import org.apache.airavata.gfac.core.utils.OutputUtils;
 import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
 import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.model.appcatalog.appinterface.DataType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.ExperimentOutputCreatedEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
 import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
 import org.apache.airavata.model.workspace.experiment.ErrorCategory;
@@ -222,6 +227,22 @@ public class SSHOutputHandler extends AbstractHandler {
             registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
             registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
 
+            //Sending the message to the Datacat server
+            if (ServerSettings.isRabbitMqPublishEnabled()) {
+                String gatewayId = ServerSettings.getDefaultUserGateway();
+                for (String outputFileName : jobExecutionContext.getOutputFiles()) {
+                    String outputPath = jobExecutionContext.getOutputDir();
+                    ExperimentOutputCreatedEvent event = new ExperimentOutputCreatedEvent(
+                            jobExecutionContext.getExperimentID(),
+                            outputFileName, outputPath + File.separatorChar + outputFileName);
+                    String messageId = AiravataUtils.getId("EXPERIMENT");
+                    MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_OUTPUT
+                            , messageId, gatewayId);
+                    messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                    datacatPublisher.publish(messageContext);
+                }
+            }
+
         } catch (Exception e) {
             try {
                 status.setTransferState(TransferState.FAILED);

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java
new file mode 100644
index 0000000..1380ebd
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java
@@ -0,0 +1,100 @@
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.model.messaging.event.ExperimentOutputParsedEvent;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.log4j.LogManager;
+import org.apache.thrift.TBase;
+
+/**
+ * Background listener for Datacat messages delivered over RabbitMQ.
+ *
+ * {@link #startBroker()} spawns a thread that connects to the broker on
+ * {@code localhost}, binds an auto-named queue to the {@code datacat} topic exchange
+ * with the binding key {@code "*"}, and hands every {@link ExperimentOutputParsedEvent}
+ * it receives to {@code process}. {@link #stopBroker()} asks that thread to finish.
+ */
+public class BetterRabbitMQDatacatConsumer {
+    /** Longest single wait for a delivery, in milliseconds, before the stop flag is re-checked. */
+    private static final long DELIVERY_POLL_TIMEOUT_MS = 1000L;
+
+    private final org.apache.log4j.Logger logger = LogManager.getLogger(BetterRabbitMQDatacatConsumer.class);
+
+    private final String RABBITMQ_HOST;
+    private final String EXCHANGE_NAME;
+    // volatile: stopBroker() clears the flag on the caller's thread while the listener thread polls it.
+    private volatile boolean runFileUpdateListener = false;
+
+    /** Creates a consumer for the {@code datacat} exchange of the broker on {@code localhost}. */
+    public BetterRabbitMQDatacatConsumer() {
+        RABBITMQ_HOST = "localhost";
+        EXCHANGE_NAME = "datacat";
+        runFileUpdateListener = true;
+    }
+
+    /**
+     * Starts the listener thread. The thread keeps consuming until {@link #stopBroker()}
+     * is called, it is interrupted, or the broker connection fails; in every case the
+     * connection is closed before the thread ends.
+     */
+    public void startBroker() {
+        (new Thread(new Runnable() {
+            @Override
+            public void run() {
+                Connection connection = null;
+                try {
+                    ConnectionFactory factory = new ConnectionFactory();
+                    factory.setHost(RABBITMQ_HOST);
+
+                    connection = factory.newConnection();
+                    Channel channel = connection.createChannel();
+
+                    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
+                    String queueName = channel.queueDeclare().getQueue();
+
+                    channel.basicQos(1);
+                    channel.queueBind(queueName, EXCHANGE_NAME, "*");
+
+                    logger.debug("Waiting for messages. To exit press CTRL+C");
+
+                    QueueingConsumer consumer = new QueueingConsumer(channel);
+                    // autoAck is true, so deliveries are never acknowledged explicitly.
+                    channel.basicConsume(queueName, true, consumer);
+
+                    while (runFileUpdateListener) {
+                        // A bounded wait lets the loop notice stopBroker() even when no
+                        // message arrives; the timed nextDelivery returns null on timeout.
+                        QueueingConsumer.Delivery delivery = consumer.nextDelivery(DELIVERY_POLL_TIMEOUT_MS);
+                        if (delivery == null) {
+                            continue;
+                        }
+                        try {
+                            handleDelivery(delivery);
+                        } catch (Exception e) {
+                            // One undecodable message must not terminate the listener.
+                            logger.error("Failed to handle a Datacat message; skipping it", e);
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status for whoever owns this thread.
+                    Thread.currentThread().interrupt();
+                    logger.info("Datacat listener thread interrupted; stopping");
+                } catch (Exception e) {
+                    logger.error("Datacat listener stopped because of an unexpected error", e);
+                } finally {
+                    closeQuietly(connection);
+                }
+            }
+        })).start();
+    }
+
+    /** Decodes one delivery and dispatches it when it carries an OUTPUT_PARSED event. */
+    private void handleDelivery(QueueingConsumer.Delivery delivery) throws Exception {
+        Message message = new Message();
+        ThriftUtils.createThriftFromBytes(delivery.getBody(), message);
+
+        // Enum identity comparison is safe even if the message type is absent.
+        if (MessageType.OUTPUT_PARSED == message.getMessageType()) {
+            ExperimentOutputParsedEvent experimentOutputParsedEvent = new ExperimentOutputParsedEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentOutputParsedEvent);
+
+            logger.debug(" Message Received with message id '" + message.getMessageId()
+                    + "' and with message type '" + message.getMessageType() + "'  with filename " +
+                    experimentOutputParsedEvent.getDocumentID());
+
+            logger.debug(" [x] Received FileInfo Message'");
+            process(experimentOutputParsedEvent, message.getUpdatedTime());
+            logger.debug(" [x] Done Processing FileInfo Message");
+        } else {
+            logger.debug("Recieved message of type ..." +message.getMessageType());
+        }
+    }
+
+    /** Closes the broker connection if one was opened, logging rather than propagating any failure. */
+    private void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (Exception e) {
+            logger.warn("Failed to close the RabbitMQ connection cleanly", e);
+        }
+    }
+
+    /** Handles a parsed-output event; currently it only logs the experiment and document ids. */
+    private void process(ExperimentOutputParsedEvent experimentOutputParsedEvent, long updatedTime) {
+        logger.info("Processing the event!!!");
+        logger.info(experimentOutputParsedEvent.getExperimentId()+" ----- "+ experimentOutputParsedEvent.getDocumentID());
+    }
+
+    /**
+     * Requests the listener thread to stop. The thread exits after its current wait or
+     * message, at most about {@code DELIVERY_POLL_TIMEOUT_MS} later when the queue is idle.
+     */
+    public void stopBroker() {
+        runFileUpdateListener = false;
+        logger.info("Shutting down FileUpdateListener...");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java
new file mode 100644
index 0000000..d00286a
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java
@@ -0,0 +1,238 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RabbitMQDatacatConsumer implements Consumer {
+    // Named after this class (not RabbitMQConsumer) so log output is attributed correctly.
+    private static Logger log = LoggerFactory.getLogger(RabbitMQDatacatConsumer.class);
+
+    private String exchangeName;
+    private String url;
+    private Connection connection;
+    private Channel channel;
+    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+
+    /**
+     * Creates a consumer from the server settings: the broker URL comes from
+     * {@code MessagingConstants.RABBITMQ_BROKER_URL} and the exchange name from
+     * {@code ServerSettings.getDatacatExchange()}. The configured values are used as
+     * read; they are no longer overwritten with a hard-coded localhost URL and
+     * exchange name.
+     *
+     * @throws AiravataException if a setting cannot be read or the connection cannot be opened
+     */
+    public RabbitMQDatacatConsumer() throws AiravataException {
+        try {
+            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            exchangeName = ServerSettings.getDatacatExchange();
+
+            createConnection();
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+    }
+
+    /**
+     * Creates a consumer for an explicitly supplied broker and exchange and connects at once.
+     *
+     * @param brokerUrl    URI of the RabbitMQ broker
+     * @param exchangeName name of the topic exchange to declare
+     * @throws AiravataException if the connection or channel cannot be opened
+     */
+    public RabbitMQDatacatConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+        this.url = brokerUrl;
+        this.exchangeName = exchangeName;
+        createConnection();
+    }
+
+    /**
+     * Opens a connection to {@code url}, creates a channel on it and declares
+     * {@code exchangeName} as a non-durable topic exchange. The {@code connection} and
+     * {@code channel} fields are assigned only once every step has succeeded; if a step
+     * fails, a connection that was already opened is closed again so it does not leak.
+     *
+     * @throws AiravataException wrapping whatever failure prevented the setup
+     */
+    private void createConnection() throws AiravataException {
+        Connection opened = null;
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(url);
+            opened = connectionFactory.newConnection();
+            opened.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                    // intentionally empty: shutdown is not acted upon here
+                }
+            });
+            log.info("connected to rabbitmq: " + opened + " for " + exchangeName);
+
+            Channel openedChannel = opened.createChannel();
+            openedChannel.exchangeDeclare(exchangeName, "topic", false);
+
+            connection = opened;
+            channel = openedChannel;
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + exchangeName;
+            // Log the cause as well; the message alone does not say what went wrong.
+            log.error(msg, e);
+            if (opened != null) {
+                try {
+                    opened.close();
+                } catch (Exception closeFailure) {
+                    log.warn("could not close rabbitmq connection after failed setup", closeFailure);
+                }
+            }
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Registers the given handler as a consumer on this consumer's topic exchange.
+     *
+     * <p>The handler's properties must contain {@code MessagingConstants.RABBIT_ROUTING_KEY},
+     * either as a single {@code String} or as a {@code List} whose elements are converted with
+     * {@code toString()}. A routing value of any other type results in no bindings being made.
+     * If {@code MessagingConstants.RABBIT_QUEUE} is absent a server-named queue is declared,
+     * otherwise the named queue is declared as durable. If
+     * {@code MessagingConstants.RABBIT_CONSUMER_TAG} is absent the tag {@code "default"} is used.
+     *
+     * <p>Only messages of type {@code MessageType.OUTPUT_PARSED} are passed to the handler;
+     * messages of any other type are logged and dropped.
+     *
+     * @param handler receives a {@link MessageContext} for every supported message
+     * @return the subscription id, to be passed to {@link #stopListen(String)}
+     * @throws AiravataException wrapping any failure, including a missing routing key and an
+     *                           attempt to register the same routing keys and queue twice
+     */
+    public String listen(final MessageHandler handler) throws AiravataException {
+        try {
+            Map<String, Object> props = handler.getProperties();
+            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+            if (routing == null) {
+                throw new IllegalArgumentException("The routing key must be present");
+            }
+
+            List<String> keys = new ArrayList<String>();
+            if (routing instanceof List) {
+                for (Object o : (List<?>) routing) {
+                    keys.add(o.toString());
+                }
+            } else if (routing instanceof String) {
+                keys.add((String) routing);
+            }
+
+            // Re-create the channel before either queue declaration, so that the named-queue
+            // path is protected against a closed channel just like the anonymous-queue path.
+            if (!channel.isOpen()) {
+                channel = connection.createChannel();
+                channel.exchangeDeclare(exchangeName, "topic", false);
+            }
+
+            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+            if (queueName == null) {
+                queueName = channel.queueDeclare().getQueue();
+            } else {
+                channel.queueDeclare(queueName, true, false, false, null);
+            }
+
+            final String id = getId(keys, queueName);
+            if (queueDetailsMap.containsKey(id)) {
+                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+                        "cannot define the same subscriber twice");
+            }
+
+            if (consumerTag == null) {
+                consumerTag = "default";
+            }
+
+            // bind all the routing keys
+            for (String routingKey : keys) {
+                channel.queueBind(queueName, exchangeName, routingKey);
+            }
+
+            // The second argument enables auto-acknowledgement of delivered messages.
+            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag,
+                                           Envelope envelope,
+                                           AMQP.BasicProperties properties,
+                                           byte[] body) {
+                    Message message = new Message();
+
+                    try {
+                        ThriftUtils.createThriftFromBytes(body, message);
+                        // Constant-first comparison also covers a message without a type.
+                        if (!MessageType.OUTPUT_PARSED.equals(message.getMessageType())) {
+                            // Never hand a null event to the handler.
+                            log.warn("Ignoring message with message id '" + message.getMessageId()
+                                    + "' and unsupported message type '" + message.getMessageType() + "'");
+                            return;
+                        }
+
+                        ExperimentOutputParsedEvent experimentOutputParsedEvent = new ExperimentOutputParsedEvent();
+                        ThriftUtils.createThriftFromBytes(message.getEvent(), experimentOutputParsedEvent);
+                        log.debug(" Message Received with message id '" + message.getMessageId()
+                                + "' and with message type '" + message.getMessageType());
+                        TBase event = experimentOutputParsedEvent;
+                        //FIXME : transfer the gatewayID
+                        String gatewayId = "temp";
+
+                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+                        handler.onMessage(messageContext);
+                    } catch (TException e) {
+                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+                        log.warn(msg, e);
+                    }
+                }
+            });
+            // save the name for deleting the queue
+            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+            return id;
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + exchangeName;
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Tears down a subscription previously created by {@code listen}: unbinds every routing
+     * key from the queue, deletes the queue and forgets the subscription id so that the same
+     * routing keys and queue can be registered again later.
+     *
+     * <p>An unknown id is ignored.
+     *
+     * @param id the subscription id returned by {@code listen}
+     * @throws AiravataException if un-binding or deleting the queue fails; the subscription
+     *                           stays registered in that case
+     */
+    public void stopListen(final String id) throws AiravataException {
+        QueueDetails details = queueDetailsMap.get(id);
+        if (details == null) {
+            return;
+        }
+        try {
+            for (String key : details.getRoutingKeys()) {
+                channel.queueUnbind(details.getQueueName(), exchangeName, key);
+            }
+            // ifUnused = true, ifEmpty = true
+            channel.queueDelete(details.getQueueName(), true, true);
+            // Drop the bookkeeping entry, otherwise a later listen() with the same
+            // routing keys and queue is rejected as a duplicate subscriber.
+            queueDetailsMap.remove(id);
+        } catch (IOException e) {
+            String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Immutable holder for the information kept about a registered consumer: the queue it
+     * consumes from and the routing keys bound to that queue.
+     *
+     * <p>Declared static because it never uses the enclosing instance.
+     */
+    private static class QueueDetails {
+        private final String queueName;
+
+        private final List<String> routingKeys;
+
+        private QueueDetails(String queueName, List<String> routingKeys) {
+            this.queueName = queueName;
+            this.routingKeys = routingKeys;
+        }
+
+        /**
+         * @return the name of the queue the consumer reads from
+         */
+        public String getQueueName() {
+            return queueName;
+        }
+
+        /**
+         * @return the routing keys bound to the queue
+         */
+        public List<String> getRoutingKeys() {
+            return routingKeys;
+        }
+    }
+
+    /**
+     * Builds the subscription id: every routing key prefixed with an underscore, in list
+     * order, followed by an underscore and the queue name.
+     *
+     * @param routingKeys routing keys of the subscription
+     * @param queueName   queue the subscription consumes from
+     * @return the concatenated id, e.g. {@code _key1_key2_queue}
+     */
+    private String getId(List<String> routingKeys, String queueName) {
+        StringBuilder builder = new StringBuilder();
+        for (String routingKey : routingKeys) {
+            builder.append('_').append(routingKey);
+        }
+        builder.append('_').append(queueName);
+        return builder.toString();
+    }
+
+    /**
+     * Closes the broker connection if one exists.
+     *
+     * <p>Closing is best-effort: an {@link IOException} raised by the connection is logged
+     * as a warning and not propagated to the caller.
+     */
+    public void close() {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (IOException e) {
+            // Nothing useful the caller can do at shutdown, but do not hide the failure.
+            log.warn("Failed to close the connection for exchange " + exchangeName, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java
new file mode 100644
index 0000000..96a2650
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMQDatacatPublisher implements Publisher {
+
+    private static Logger log = LoggerFactory.getLogger(RabbitMQDatacatPublisher.class);
+
+    private RabbitMQProducer rabbitMQProducer;
+
+
+    public RabbitMQDatacatPublisher() throws Exception {
+        String brokerUrl;
+        String exchangeName;
+        try {
+            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            exchangeName = ServerSettings.getDatacatExchange();
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+        rabbitMQProducer.open();
+    }
+
+    public void publish(MessageContext msgCtx) throws AiravataException {
+        try {
+            log.info("Publishing status to datacat rabbitmq...");
+            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+            Message message = new Message();
+            message.setEvent(body);
+            message.setMessageId(msgCtx.getMessageId());
+            message.setMessageType(msgCtx.getType());
+            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+            String routingKey = null;
+            if (msgCtx.getType().equals(MessageType.EXPERIMENT_OUTPUT)) {
+                ExperimentOutputCreatedEvent outputCreatedEvent = (ExperimentOutputCreatedEvent) msgCtx.getEvent();
+                routingKey = outputCreatedEvent.getExperimentId();
+            }
+            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+            rabbitMQProducer.send(messageBody, routingKey);
+        } catch (TException e) {
+            String msg = "Error while deserializing the object";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (Exception e) {
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java
new file mode 100644
index 0000000..b6ae372
--- /dev/null
+++ b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java
@@ -0,0 +1,37 @@
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.impl.BetterRabbitMQDatacatConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatPublisher;
+import org.apache.airavata.model.messaging.event.ExperimentOutputCreatedEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class RabbitMQDatacatConsumerTest {
+    private static Logger log = LoggerFactory.getLogger(RabbitMQDatacatPublisherTest.class);
+
+    private BetterRabbitMQDatacatConsumer rabbitMQDatacatConsumer;
+
+    @Before
+    public void setup() {
+        try {
+            rabbitMQDatacatConsumer = new BetterRabbitMQDatacatConsumer();
+        } catch (Exception e) {
+            log.error(e.toString());
+        }
+    }
+
+    @Test
+    public void testDatacatConsumer() throws AiravataException, InterruptedException {
+        rabbitMQDatacatConsumer.startBroker();
+        Thread.sleep(20000);
+        rabbitMQDatacatConsumer.stopBroker();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java
new file mode 100644
index 0000000..e60b855
--- /dev/null
+++ b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java
@@ -0,0 +1,48 @@
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
+import org.apache.airavata.model.messaging.event.ExperimentOutputCreatedEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class RabbitMQDatacatPublisherTest {
+
+    private static Logger log = LoggerFactory.getLogger(RabbitMQDatacatPublisherTest.class);
+
+    private RabbitMQDatacatPublisher rabbitMQDatacatPublisher;
+
+    @Before
+    public void setup() {
+        try {
+            rabbitMQDatacatPublisher = new RabbitMQDatacatPublisher();
+        } catch (Exception e) {
+            log.error(e.toString());
+        }
+    }
+
+    @Test
+    public void testDatacatPublisher() throws AiravataException {
+        String outputFile = "gauss.out";
+        String outputPath = "/home/swithana";
+        String messageId = "tesaeqwe";
+        String gatewayID = "gridChem";
+        String expID = "230u34jnr0813";
+
+        ExperimentOutputCreatedEvent event = new ExperimentOutputCreatedEvent(expID,
+                outputFile, outputPath+ File.separatorChar+outputFile);
+
+        MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_OUTPUT
+                , messageId, gatewayID);
+        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+
+        rabbitMQDatacatPublisher.publish(messageContext);
+    }
+}