You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2015/01/22 15:35:31 UTC
airavata git commit: adding RabbitMQDatacat Support Beans,
to be tested
Repository: airavata
Updated Branches:
refs/heads/master 32fff9444 -> 714c2048d
adding RabbitMQDatacat Support Beans, to be tested
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/714c2048
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/714c2048
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/714c2048
Branch: refs/heads/master
Commit: 714c2048da87e70544ebdeabab739f8158b134ce
Parents: 32fff94
Author: Sachith Withana <sw...@Sachiths-MacBook-Pro.local>
Authored: Thu Jan 22 20:10:20 2015 +0530
Committer: Sachith Withana <sw...@Sachiths-MacBook-Pro.local>
Committed: Thu Jan 22 20:10:20 2015 +0530
----------------------------------------------------------------------
.../event/ExperimentOutputCreatedEvent.java | 573 +++++++++++++++++++
.../event/ExperimentOutputParsedEvent.java | 573 +++++++++++++++++++
.../model/messaging/event/MessageType.java | 27 +-
.../messagingEvents.thrift | 15 +-
.../airavata/common/utils/ServerSettings.java | 5 +
.../main/resources/airavata-server.properties | 5 +
.../gfac/core/handler/AbstractHandler.java | 13 +-
.../gfac/ssh/handler/SSHOutputHandler.java | 21 +
.../impl/BetterRabbitMQDatacatConsumer.java | 100 ++++
.../core/impl/RabbitMQDatacatConsumer.java | 238 ++++++++
.../core/impl/RabbitMQDatacatPublisher.java | 85 +++
.../core/RabbitMQDatacatConsumerTest.java | 37 ++
.../core/RabbitMQDatacatPublisherTest.java | 48 ++
13 files changed, 1719 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java
new file mode 100644
index 0000000..6c3c6a6
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputCreatedEvent.java
@@ -0,0 +1,573 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thrift struct {@code ExperimentOutputCreatedEvent} with three required string fields:
+ * {@code experimentId} (field id 1), {@code filename} (field id 2) and {@code filepath}
+ * (field id 3).
+ *
+ * <p>A field counts as "set" exactly when it is non-null; there is no separate isset bitmap.
+ * Instances are mutable and the fields are public, so callers must not mutate an instance
+ * while it is used as a key in a hashed or sorted collection.
+ *
+ * <p>NOTE(review): this file is produced by the Thrift compiler (0.9.1). Hand edits are lost
+ * when the IDL is regenerated -- confirm whether the generator options should be changed
+ * instead.
+ */
+public class ExperimentOutputCreatedEvent implements org.apache.thrift.TBase<ExperimentOutputCreatedEvent, ExperimentOutputCreatedEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ExperimentOutputCreatedEvent> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ExperimentOutputCreatedEvent");
+
+  // Wire descriptors: field name, wire type and numeric field id.
+  private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField FILENAME_FIELD_DESC = new org.apache.thrift.protocol.TField("filename", org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField FILEPATH_FIELD_DESC = new org.apache.thrift.protocol.TField("filepath", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+  // Maps the scheme class reported by a protocol to the factory that builds the matching codec.
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new ExperimentOutputCreatedEventStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new ExperimentOutputCreatedEventTupleSchemeFactory());
+  }
+
+  public String experimentId; // required
+  public String filename; // required
+  public String filepath; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    EXPERIMENT_ID((short)1, "experimentId"),
+    FILENAME((short)2, "filename"),
+    FILEPATH((short)3, "filepath");
+
+    // Lookup table from Thrift field name to enum constant, filled once at class load.
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it is not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // EXPERIMENT_ID
+          return EXPERIMENT_ID;
+        case 2: // FILENAME
+          return FILENAME;
+        case 3: // FILEPATH
+          return FILEPATH;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     *
+     * @throws IllegalArgumentException if no field has the given id
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it is not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    /** Returns the numeric Thrift field id of this field. */
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    /** Returns the Thrift field name of this field. */
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  /** Unmodifiable per-field metadata (name, requirement level, value type), also registered with FieldMetaData. */
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.FILENAME, new org.apache.thrift.meta_data.FieldMetaData("filename", org.apache.thrift.TFieldRequirementType.REQUIRED,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.FILEPATH, new org.apache.thrift.meta_data.FieldMetaData("filepath", org.apache.thrift.TFieldRequirementType.REQUIRED,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExperimentOutputCreatedEvent.class, metaDataMap);
+  }
+
+  /** Creates an instance with every field unset (null). */
+  public ExperimentOutputCreatedEvent() {
+  }
+
+  /** Creates an instance with all three fields assigned; null arguments leave the field unset. */
+  public ExperimentOutputCreatedEvent(
+    String experimentId,
+    String filename,
+    String filepath)
+  {
+    this();
+    this.experimentId = experimentId;
+    this.filename = filename;
+    this.filepath = filepath;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public ExperimentOutputCreatedEvent(ExperimentOutputCreatedEvent other) {
+    // Strings are immutable, so copying the references is a full deep copy.
+    if (other.isSetExperimentId()) {
+      this.experimentId = other.experimentId;
+    }
+    if (other.isSetFilename()) {
+      this.filename = other.filename;
+    }
+    if (other.isSetFilepath()) {
+      this.filepath = other.filepath;
+    }
+  }
+
+  /** Returns a new instance holding the same field values as this one. */
+  public ExperimentOutputCreatedEvent deepCopy() {
+    return new ExperimentOutputCreatedEvent(this);
+  }
+
+  /** Resets every field to the unset (null) state. */
+  @Override
+  public void clear() {
+    this.experimentId = null;
+    this.filename = null;
+    this.filepath = null;
+  }
+
+  public String getExperimentId() {
+    return this.experimentId;
+  }
+
+  /** Assigns experimentId and returns this instance so calls can be chained. */
+  public ExperimentOutputCreatedEvent setExperimentId(String experimentId) {
+    this.experimentId = experimentId;
+    return this;
+  }
+
+  public void unsetExperimentId() {
+    this.experimentId = null;
+  }
+
+  /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */
+  public boolean isSetExperimentId() {
+    return this.experimentId != null;
+  }
+
+  /** Passing false clears experimentId; passing true does nothing because "set" is defined as non-null. */
+  public void setExperimentIdIsSet(boolean value) {
+    if (!value) {
+      this.experimentId = null;
+    }
+  }
+
+  public String getFilename() {
+    return this.filename;
+  }
+
+  /** Assigns filename and returns this instance so calls can be chained. */
+  public ExperimentOutputCreatedEvent setFilename(String filename) {
+    this.filename = filename;
+    return this;
+  }
+
+  public void unsetFilename() {
+    this.filename = null;
+  }
+
+  /** Returns true if field filename is set (has been assigned a value) and false otherwise */
+  public boolean isSetFilename() {
+    return this.filename != null;
+  }
+
+  /** Passing false clears filename; passing true does nothing because "set" is defined as non-null. */
+  public void setFilenameIsSet(boolean value) {
+    if (!value) {
+      this.filename = null;
+    }
+  }
+
+  public String getFilepath() {
+    return this.filepath;
+  }
+
+  /** Assigns filepath and returns this instance so calls can be chained. */
+  public ExperimentOutputCreatedEvent setFilepath(String filepath) {
+    this.filepath = filepath;
+    return this;
+  }
+
+  public void unsetFilepath() {
+    this.filepath = null;
+  }
+
+  /** Returns true if field filepath is set (has been assigned a value) and false otherwise */
+  public boolean isSetFilepath() {
+    return this.filepath != null;
+  }
+
+  /** Passing false clears filepath; passing true does nothing because "set" is defined as non-null. */
+  public void setFilepathIsSet(boolean value) {
+    if (!value) {
+      this.filepath = null;
+    }
+  }
+
+  /**
+   * Generic setter: assigns {@code value} to {@code field}, or unsets the field when
+   * {@code value} is null.
+   *
+   * @throws ClassCastException if a non-null {@code value} is not a String
+   */
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case EXPERIMENT_ID:
+      if (value == null) {
+        unsetExperimentId();
+      } else {
+        setExperimentId((String)value);
+      }
+      break;
+
+    case FILENAME:
+      if (value == null) {
+        unsetFilename();
+      } else {
+        setFilename((String)value);
+      }
+      break;
+
+    case FILEPATH:
+      if (value == null) {
+        unsetFilepath();
+      } else {
+        setFilepath((String)value);
+      }
+      break;
+
+    }
+  }
+
+  /** Generic getter: returns the current value of {@code field}, or null when it is unset. */
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case EXPERIMENT_ID:
+      return getExperimentId();
+
+    case FILENAME:
+      return getFilename();
+
+    case FILEPATH:
+      return getFilepath();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case EXPERIMENT_ID:
+      return isSetExperimentId();
+    case FILENAME:
+      return isSetFilename();
+    case FILEPATH:
+      return isSetFilepath();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    // instanceof is false for null, so no separate null check is needed.
+    return that instanceof ExperimentOutputCreatedEvent
+        && this.equals((ExperimentOutputCreatedEvent)that);
+  }
+
+  /**
+   * Field-by-field equality: two structs are equal when each field is either unset in both
+   * or set to equal strings in both.
+   */
+  public boolean equals(ExperimentOutputCreatedEvent that) {
+    if (that == null)
+      return false;
+
+    return sameField(this.experimentId, that.experimentId)
+        && sameField(this.filename, that.filename)
+        && sameField(this.filepath, that.filepath);
+  }
+
+  /** Null-safe string equality; two nulls (both unset) compare equal. */
+  private static boolean sameField(String mine, String theirs) {
+    return (mine == null) ? (theirs == null) : mine.equals(theirs);
+  }
+
+  /**
+   * Hash derived from the same three fields that {@link #equals(ExperimentOutputCreatedEvent)}
+   * compares, so equal structs hash equally while distinct structs spread across buckets.
+   * An unset (null) field is tolerated and contributes 0 to the hash.
+   */
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(new Object[] {experimentId, filename, filepath});
+  }
+
+  /**
+   * Orders by experimentId, then filename, then filepath. For each field an unset value
+   * sorts before a set one; two set values are compared with TBaseHelper.compareTo.
+   */
+  @Override
+  public int compareTo(ExperimentOutputCreatedEvent other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetExperimentId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetFilename()).compareTo(other.isSetFilename());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFilename()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.filename, other.filename);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetFilepath()).compareTo(other.isSetFilepath());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFilepath()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.filepath, other.filepath);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  /** Returns the field with the given Thrift id, or null if there is none. */
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  /** Populates this struct from {@code iprot} using the codec registered for the protocol's scheme. */
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  /** Writes this struct to {@code oprot} using the codec registered for the protocol's scheme. */
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  /** Renders as {@code ExperimentOutputCreatedEvent(experimentId:..., filename:..., filepath:...)}; unset fields print as "null". */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("ExperimentOutputCreatedEvent(");
+    // StringBuilder.append(String) renders a null reference as the text "null".
+    sb.append("experimentId:").append(this.experimentId);
+    sb.append(", ");
+    sb.append("filename:").append(this.filename);
+    sb.append(", ");
+    sb.append("filepath:").append(this.filepath);
+    sb.append(")");
+    return sb.toString();
+  }
+
+  /**
+   * Verifies that every required field is set.
+   *
+   * @throws org.apache.thrift.TException (a TProtocolException) naming the first missing field
+   */
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (experimentId == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' was not present! Struct: " + toString());
+    }
+    if (filename == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'filename' was not present! Struct: " + toString());
+    }
+    if (filepath == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'filepath' was not present! Struct: " + toString());
+    }
+    // check for sub-struct validity
+  }
+
+  // Java serialization hook: encodes this struct with TCompactProtocol onto the object stream.
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  // Java serialization hook: decodes this struct with TCompactProtocol from the object stream.
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class ExperimentOutputCreatedEventStandardSchemeFactory implements SchemeFactory {
+    public ExperimentOutputCreatedEventStandardScheme getScheme() {
+      return new ExperimentOutputCreatedEventStandardScheme();
+    }
+  }
+
+  /** Field-tagged codec: each field is written with its descriptor, and unknown or mistyped fields are skipped on read. */
+  private static class ExperimentOutputCreatedEventStandardScheme extends StandardScheme<ExperimentOutputCreatedEvent> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // EXPERIMENT_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.experimentId = iprot.readString();
+              struct.setExperimentIdIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // FILENAME
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.filename = iprot.readString();
+              struct.setFilenameIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // FILEPATH
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.filepath = iprot.readString();
+              struct.setFilepathIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      // Refuse to serialize a struct that is missing a required field.
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.experimentId != null) {
+        oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC);
+        oprot.writeString(struct.experimentId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.filename != null) {
+        oprot.writeFieldBegin(FILENAME_FIELD_DESC);
+        oprot.writeString(struct.filename);
+        oprot.writeFieldEnd();
+      }
+      if (struct.filepath != null) {
+        oprot.writeFieldBegin(FILEPATH_FIELD_DESC);
+        oprot.writeString(struct.filepath);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class ExperimentOutputCreatedEventTupleSchemeFactory implements SchemeFactory {
+    public ExperimentOutputCreatedEventTupleScheme getScheme() {
+      return new ExperimentOutputCreatedEventTupleScheme();
+    }
+  }
+
+  /** Positional codec: the three strings are written and read back in declaration order, without field tags. */
+  private static class ExperimentOutputCreatedEventTupleScheme extends TupleScheme<ExperimentOutputCreatedEvent> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      // NOTE(review): unlike the standard scheme, validate() is not called here, so an unset
+      // field reaches writeString as null -- confirm callers always populate all fields.
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      oprot.writeString(struct.experimentId);
+      oprot.writeString(struct.filename);
+      oprot.writeString(struct.filepath);
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputCreatedEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.experimentId = iprot.readString();
+      struct.setExperimentIdIsSet(true);
+      struct.filename = iprot.readString();
+      struct.setFilenameIsSet(true);
+      struct.filepath = iprot.readString();
+      struct.setFilepathIsSet(true);
+    }
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java
new file mode 100644
index 0000000..72310f3
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentOutputParsedEvent.java
@@ -0,0 +1,573 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExperimentOutputParsedEvent implements org.apache.thrift.TBase<ExperimentOutputParsedEvent, ExperimentOutputParsedEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ExperimentOutputParsedEvent> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ExperimentOutputParsedEvent");
+
+ private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField DOCUMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("documentID", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new ExperimentOutputParsedEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new ExperimentOutputParsedEventTupleSchemeFactory());
+ }
+
+ public String experimentId; // required
+ public String documentID; // required
+ public String status; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ EXPERIMENT_ID((short)1, "experimentId"),
+ DOCUMENT_ID((short)2, "documentID"),
+ STATUS((short)3, "status");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // EXPERIMENT_ID
+ return EXPERIMENT_ID;
+ case 2: // DOCUMENT_ID
+ return DOCUMENT_ID;
+ case 3: // STATUS
+ return STATUS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.DOCUMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("documentID", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.STATUS, new org.apache.thrift.meta_data.FieldMetaData("status", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExperimentOutputParsedEvent.class, metaDataMap);
+ }
+
+ public ExperimentOutputParsedEvent() {
+ }
+
+ public ExperimentOutputParsedEvent(
+ String experimentId,
+ String documentID,
+ String status)
+ {
+ this();
+ this.experimentId = experimentId;
+ this.documentID = documentID;
+ this.status = status;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public ExperimentOutputParsedEvent(ExperimentOutputParsedEvent other) {
+ if (other.isSetExperimentId()) {
+ this.experimentId = other.experimentId;
+ }
+ if (other.isSetDocumentID()) {
+ this.documentID = other.documentID;
+ }
+ if (other.isSetStatus()) {
+ this.status = other.status;
+ }
+ }
+
+ public ExperimentOutputParsedEvent deepCopy() {
+ return new ExperimentOutputParsedEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ this.experimentId = null;
+ this.documentID = null;
+ this.status = null;
+ }
+
+ public String getExperimentId() {
+ return this.experimentId;
+ }
+
+ public ExperimentOutputParsedEvent setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ return this;
+ }
+
+ public void unsetExperimentId() {
+ this.experimentId = null;
+ }
+
+ /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */
+ public boolean isSetExperimentId() {
+ return this.experimentId != null;
+ }
+
+ public void setExperimentIdIsSet(boolean value) {
+ if (!value) {
+ this.experimentId = null;
+ }
+ }
+
+ public String getDocumentID() {
+ return this.documentID;
+ }
+
+ public ExperimentOutputParsedEvent setDocumentID(String documentID) {
+ this.documentID = documentID;
+ return this;
+ }
+
+ public void unsetDocumentID() {
+ this.documentID = null;
+ }
+
+ /** Returns true if field documentID is set (has been assigned a value) and false otherwise */
+ public boolean isSetDocumentID() {
+ return this.documentID != null;
+ }
+
+ public void setDocumentIDIsSet(boolean value) {
+ if (!value) {
+ this.documentID = null;
+ }
+ }
+
+ public String getStatus() {
+ return this.status;
+ }
+
+ public ExperimentOutputParsedEvent setStatus(String status) {
+ this.status = status;
+ return this;
+ }
+
+ public void unsetStatus() {
+ this.status = null;
+ }
+
+ /** Returns true if field status is set (has been assigned a value) and false otherwise */
+ public boolean isSetStatus() {
+ return this.status != null;
+ }
+
+ public void setStatusIsSet(boolean value) {
+ if (!value) {
+ this.status = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ if (value == null) {
+ unsetExperimentId();
+ } else {
+ setExperimentId((String)value);
+ }
+ break;
+
+ case DOCUMENT_ID:
+ if (value == null) {
+ unsetDocumentID();
+ } else {
+ setDocumentID((String)value);
+ }
+ break;
+
+ case STATUS:
+ if (value == null) {
+ unsetStatus();
+ } else {
+ setStatus((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case EXPERIMENT_ID:
+ return getExperimentId();
+
+ case DOCUMENT_ID:
+ return getDocumentID();
+
+ case STATUS:
+ return getStatus();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case EXPERIMENT_ID:
+ return isSetExperimentId();
+ case DOCUMENT_ID:
+ return isSetDocumentID();
+ case STATUS:
+ return isSetStatus();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof ExperimentOutputParsedEvent)
+ return this.equals((ExperimentOutputParsedEvent)that);
+ return false;
+ }
+
+ public boolean equals(ExperimentOutputParsedEvent that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_experimentId = true && this.isSetExperimentId();
+ boolean that_present_experimentId = true && that.isSetExperimentId();
+ if (this_present_experimentId || that_present_experimentId) {
+ if (!(this_present_experimentId && that_present_experimentId))
+ return false;
+ if (!this.experimentId.equals(that.experimentId))
+ return false;
+ }
+
+ boolean this_present_documentID = true && this.isSetDocumentID();
+ boolean that_present_documentID = true && that.isSetDocumentID();
+ if (this_present_documentID || that_present_documentID) {
+ if (!(this_present_documentID && that_present_documentID))
+ return false;
+ if (!this.documentID.equals(that.documentID))
+ return false;
+ }
+
+ boolean this_present_status = true && this.isSetStatus();
+ boolean that_present_status = true && that.isSetStatus();
+ if (this_present_status || that_present_status) {
+ if (!(this_present_status && that_present_status))
+ return false;
+ if (!this.status.equals(that.status))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+  * Hash derived from experimentId, documentID and status -- the same three fields the
+  * equals methods of this class compare -- so equal structs hash equally while distinct
+  * structs spread across buckets. An unset (null) field is tolerated and contributes 0.
+  */
+ @Override
+ public int hashCode() {
+   return Arrays.hashCode(new Object[] {experimentId, documentID, status});
+ }
+
+ /**
+  * Orders this event against another one. Instances of different classes are
+  * ordered by class name; otherwise experimentId, documentID and status are
+  * compared in that order, with "is the field set" compared before the value.
+  */
+ @Override
+ public int compareTo(ExperimentOutputParsedEvent other) {
+   if (!getClass().equals(other.getClass())) {
+     return getClass().getName().compareTo(other.getClass().getName());
+   }
+
+   // experimentId: presence first, then value
+   int order = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId());
+   if (order == 0 && isSetExperimentId()) {
+     order = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId);
+   }
+   if (order != 0) {
+     return order;
+   }
+
+   // documentID: presence first, then value
+   order = Boolean.valueOf(isSetDocumentID()).compareTo(other.isSetDocumentID());
+   if (order == 0 && isSetDocumentID()) {
+     order = org.apache.thrift.TBaseHelper.compareTo(this.documentID, other.documentID);
+   }
+   if (order != 0) {
+     return order;
+   }
+
+   // status: presence first, then value; this is the last key, so its result is final
+   order = Boolean.valueOf(isSetStatus()).compareTo(other.isSetStatus());
+   if (order == 0 && isSetStatus()) {
+     order = org.apache.thrift.TBaseHelper.compareTo(this.status, other.status);
+   }
+   return order;
+ }
+
+ /**
+  * Resolves a Thrift field id to its _Fields constant by delegating to
+  * _Fields.findByThriftId.
+  */
+ public _Fields fieldForId(int fieldId) {
+   final _Fields match = _Fields.findByThriftId(fieldId);
+   return match;
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ /**
+  * Renders the struct as
+  * "ExperimentOutputParsedEvent(experimentId:..., documentID:..., status:...)".
+  * A null field is rendered as the literal text "null".
+  */
+ @Override
+ public String toString() {
+   // String concatenation already prints a null reference as "null",
+   // so no explicit null branches are needed.
+   return "ExperimentOutputParsedEvent("
+       + "experimentId:" + this.experimentId
+       + ", " + "documentID:" + this.documentID
+       + ", " + "status:" + this.status
+       + ")";
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (experimentId == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' was not present! Struct: " + toString());
+ }
+ if (documentID == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'documentID' was not present! Struct: " + toString());
+ }
+ if (status == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'status' was not present! Struct: " + toString());
+ }
+ // check for sub-struct validity
+ }
+
+ /**
+  * Java serialization hook: writes this struct to the object stream using the
+  * Thrift compact protocol. A Thrift failure is rethrown as an IOException
+  * with the original exception as its cause.
+  */
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+   org.apache.thrift.transport.TIOStreamTransport transport =
+       new org.apache.thrift.transport.TIOStreamTransport(out);
+   org.apache.thrift.protocol.TCompactProtocol protocol =
+       new org.apache.thrift.protocol.TCompactProtocol(transport);
+   try {
+     write(protocol);
+   } catch (org.apache.thrift.TException thriftFailure) {
+     throw new java.io.IOException(thriftFailure);
+   }
+ }
+
+ /**
+  * Java deserialization hook: reads this struct from the object stream using
+  * the Thrift compact protocol. A Thrift failure is rethrown as an
+  * IOException with the original exception as its cause.
+  */
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+   org.apache.thrift.transport.TIOStreamTransport transport =
+       new org.apache.thrift.transport.TIOStreamTransport(in);
+   org.apache.thrift.protocol.TCompactProtocol protocol =
+       new org.apache.thrift.protocol.TCompactProtocol(transport);
+   try {
+     read(protocol);
+   } catch (org.apache.thrift.TException thriftFailure) {
+     throw new java.io.IOException(thriftFailure);
+   }
+ }
+
+ /** Factory that hands out a new standard (field-tagged) scheme on every call. */
+ private static class ExperimentOutputParsedEventStandardSchemeFactory implements SchemeFactory {
+   public ExperimentOutputParsedEventStandardScheme getScheme() {
+     final ExperimentOutputParsedEventStandardScheme scheme = new ExperimentOutputParsedEventStandardScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Field-tagged (de)serialization of ExperimentOutputParsedEvent: every field
+  * is written with its field header, and unknown or mistyped fields are
+  * skipped on read.
+  */
+ private static class ExperimentOutputParsedEventStandardScheme extends StandardScheme<ExperimentOutputParsedEvent> {
+
+   /**
+    * Reads fields until the STOP marker, then validates the struct.
+    * Field ids 1, 2 and 3 are accepted only when they carry a STRING;
+    * anything else is skipped.
+    */
+   public void read(org.apache.thrift.protocol.TProtocol iprot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+     iprot.readStructBegin();
+     for (org.apache.thrift.protocol.TField field = iprot.readFieldBegin();
+          field.type != org.apache.thrift.protocol.TType.STOP;
+          field = iprot.readFieldBegin()) {
+       final boolean isString = field.type == org.apache.thrift.protocol.TType.STRING;
+       if (isString && field.id == 1) { // EXPERIMENT_ID
+         struct.experimentId = iprot.readString();
+         struct.setExperimentIdIsSet(true);
+       } else if (isString && field.id == 2) { // DOCUMENT_ID
+         struct.documentID = iprot.readString();
+         struct.setDocumentIDIsSet(true);
+       } else if (isString && field.id == 3) { // STATUS
+         struct.status = iprot.readString();
+         struct.setStatusIsSet(true);
+       } else {
+         // unknown field id, or a known id with an unexpected wire type
+         org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+       }
+       iprot.readFieldEnd();
+     }
+     iprot.readStructEnd();
+
+     // required fields are only checked once the whole struct has been read
+     struct.validate();
+   }
+
+   /**
+    * Validates the struct, then writes each non-null field followed by the
+    * STOP marker.
+    */
+   public void write(org.apache.thrift.protocol.TProtocol oprot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+     struct.validate();
+
+     oprot.writeStructBegin(STRUCT_DESC);
+
+     final String experimentId = struct.experimentId;
+     if (experimentId != null) {
+       oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC);
+       oprot.writeString(experimentId);
+       oprot.writeFieldEnd();
+     }
+
+     final String documentID = struct.documentID;
+     if (documentID != null) {
+       oprot.writeFieldBegin(DOCUMENT_ID_FIELD_DESC);
+       oprot.writeString(documentID);
+       oprot.writeFieldEnd();
+     }
+
+     final String status = struct.status;
+     if (status != null) {
+       oprot.writeFieldBegin(STATUS_FIELD_DESC);
+       oprot.writeString(status);
+       oprot.writeFieldEnd();
+     }
+
+     oprot.writeFieldStop();
+     oprot.writeStructEnd();
+   }
+
+ }
+
+ /** Factory that hands out a new tuple (positional) scheme on every call. */
+ private static class ExperimentOutputParsedEventTupleSchemeFactory implements SchemeFactory {
+   public ExperimentOutputParsedEventTupleScheme getScheme() {
+     final ExperimentOutputParsedEventTupleScheme scheme = new ExperimentOutputParsedEventTupleScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Positional (de)serialization of ExperimentOutputParsedEvent: the three
+  * strings are written and read back in the fixed order experimentId,
+  * documentID, status, without field headers.
+  */
+ private static class ExperimentOutputParsedEventTupleScheme extends TupleScheme<ExperimentOutputParsedEvent> {
+
+   /** Writes the three string fields in declaration order. */
+   @Override
+   public void write(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+     tuple.writeString(struct.experimentId);
+     tuple.writeString(struct.documentID);
+     tuple.writeString(struct.status);
+   }
+
+   /** Reads the three string fields in declaration order and marks each one as set. */
+   @Override
+   public void read(org.apache.thrift.protocol.TProtocol prot, ExperimentOutputParsedEvent struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+
+     struct.experimentId = tuple.readString();
+     struct.setExperimentIdIsSet(true);
+
+     struct.documentID = tuple.readString();
+     struct.setDocumentIDIsSet(true);
+
+     struct.status = tuple.readString();
+     struct.setStatusIsSet(true);
+   }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
index d00f404..0295101 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
@@ -1,21 +1,4 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
* Autogenerated by Thrift Compiler (0.9.1)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
@@ -28,11 +11,13 @@ import java.util.Map;
import java.util.HashMap;
import org.apache.thrift.TEnum;
-@SuppressWarnings("all") public enum MessageType implements org.apache.thrift.TEnum {
+public enum MessageType implements org.apache.thrift.TEnum {
EXPERIMENT(0),
TASK(1),
WORKFLOWNODE(2),
- JOB(3);
+ JOB(3),
+ EXPERIMENT_OUTPUT(4),
+ OUTPUT_PARSED(5);
private final int value;
@@ -61,6 +46,10 @@ import org.apache.thrift.TEnum;
return WORKFLOWNODE;
case 3:
return JOB;
+ case 4:
+ return EXPERIMENT_OUTPUT;
+ case 5:
+ return OUTPUT_PARSED;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index c9f3808..670c5fe 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -38,7 +38,9 @@ enum MessageType {
EXPERIMENT,
TASK,
WORKFLOWNODE,
- JOB
+ JOB,
+ EXPERIMENT_OUTPUT,
+ OUTPUT_PARSED
}
struct ExperimentStatusChangeEvent {
@@ -118,7 +120,18 @@ struct Message {
5: optional MessageLevel messageLevel;
}
+struct ExperimentOutputCreatedEvent {
+ 1: required string experimentId;
+ 2: required string filename;
+ 3: required string filepath;
+}
+
+struct ExperimentOutputParsedEvent {
+ 1: required string experimentId;
+ 2: required string documentID;
+ 3: required string status;
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 39261e2..e24fdaa 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -55,6 +55,8 @@ public class ServerSettings extends ApplicationSettings {
private static final String ACTIVITY_LISTENERS = "activity.listeners";
public static final String PUBLISH_RABBITMQ = "publish.rabbitmq";
+ public static final String DATACAT_EXCHANGE = "datacat.exchange";
+
private static boolean stopAllThreads = false;
public static String getDefaultUser() throws ApplicationSettingsException {
@@ -64,6 +66,9 @@ public class ServerSettings extends ApplicationSettings {
public static String getDefaultUserPassword() throws ApplicationSettingsException {
return getSetting(DEFAULT_USER_PASSWORD);
}
+ public static String getDatacatExchange() throws ApplicationSettingsException {
+ return getSetting(DATACAT_EXCHANGE);
+ }
public static String getDefaultUserGateway() throws ApplicationSettingsException {
return getSetting(DEFAULT_USER_GATEWAY);
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 22d0a65..9f7235d 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -240,3 +240,8 @@ gfac-experiments=/gfac-experiments
gfac-server-name=gfac-node0
orchestrator-server-name=orch-node0
airavata-server-name=api-node0
+
+###########################################################################
+# datacat module Configuration
+###########################################################################
+datacat.exchange=datacat
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index f4f5d7c..da8fb64 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -23,17 +23,28 @@ package org.apache.airavata.gfac.core.handler;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatPublisher;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractHandler implements GFacHandler {
- protected Registry registry = null;
+ private static final Logger log = LoggerFactory.getLogger(AbstractHandler.class);
+
+ protected Registry registry = null;
protected MonitorPublisher publisher = null;
+ protected RabbitMQDatacatPublisher datacatPublisher;
+ /**
+ * Obtains the shared monitor publisher and tries to create the Datacat
+ * publisher. Creation is best-effort: if it fails, the failure is logged
+ * with its stack trace and datacatPublisher stays null, so code that uses
+ * datacatPublisher must be prepared for a null value.
+ */
protected AbstractHandler() {
publisher = BetterGfacImpl.getMonitorPublisher(); // This will not be null because this will be initialize in GFacIml
+ try {
+ datacatPublisher = new RabbitMQDatacatPublisher();
+ } catch (Exception e) {
+ // pass the exception itself so the stack trace is not lost
+ log.error("Failed to initialize the Datacat RabbitMQ publisher; datacatPublisher is left null", e);
+ }
}
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
index b574540..0524703 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.airavata.gfac.ssh.handler;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.AbstractHandler;
@@ -30,8 +32,11 @@ import org.apache.airavata.gfac.core.utils.OutputUtils;
import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.model.appcatalog.appinterface.DataType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.ExperimentOutputCreatedEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.DataTransferDetails;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
@@ -222,6 +227,22 @@ public class SSHOutputHandler extends AbstractHandler {
registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
+ //Sending the message to the Datacat server
+ if (ServerSettings.isRabbitMqPublishEnabled()) {
+ String gatewayId = ServerSettings.getDefaultUserGateway();
+ for (String outputFileName : jobExecutionContext.getOutputFiles()) {
+ String outputPath = jobExecutionContext.getOutputDir();
+ ExperimentOutputCreatedEvent event = new ExperimentOutputCreatedEvent(
+ jobExecutionContext.getExperimentID(),
+ outputFileName, outputPath + File.separatorChar + outputFileName);
+ String messageId = AiravataUtils.getId("EXPERIMENT");
+ MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_OUTPUT
+ , messageId, gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ datacatPublisher.publish(messageContext);
+ }
+ }
+
} catch (Exception e) {
try {
status.setTransferState(TransferState.FAILED);
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java
new file mode 100644
index 0000000..1380ebd
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/BetterRabbitMQDatacatConsumer.java
@@ -0,0 +1,100 @@
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.model.messaging.event.ExperimentOutputParsedEvent;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.log4j.LogManager;
+import org.apache.thrift.TBase;
+
+public class BetterRabbitMQDatacatConsumer {
+ private final org.apache.log4j.Logger logger = LogManager.getLogger(BetterRabbitMQDatacatConsumer.class);
+
+ private String BINDING_KEY;
+ private String RABBITMQ_HOST;
+ private String EXCHANGE_NAME;
+ private boolean runFileUpdateListener = false;
+
+ public BetterRabbitMQDatacatConsumer() {
+ RABBITMQ_HOST = "localhost";
+ EXCHANGE_NAME = "datacat";
+ runFileUpdateListener = true;
+ }
+
+ public void startBroker() {
+ (new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(RABBITMQ_HOST);
+
+ Connection connection = factory.newConnection();
+ Channel channel = connection.createChannel();
+
+ channel.exchangeDeclare(EXCHANGE_NAME, "topic");
+ String queueName = channel.queueDeclare().getQueue();
+
+ channel.basicQos(1);
+ channel.queueBind(queueName, EXCHANGE_NAME, "*");
+
+ logger.debug("Waiting for messages. To exit press CTRL+C");
+
+ QueueingConsumer consumer = new QueueingConsumer(channel);
+ channel.basicConsume(queueName, true, consumer);
+
+ while (runFileUpdateListener) {
+ QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+
+ Message message = new Message();
+ ThriftUtils.createThriftFromBytes(delivery.getBody(), message);
+ TBase event = null;
+
+ if (message.getMessageType().equals(MessageType.OUTPUT_PARSED)) {
+
+ ExperimentOutputParsedEvent experimentOutputParsedEvent = new ExperimentOutputParsedEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), experimentOutputParsedEvent);
+
+ logger.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' with filename " +
+ experimentOutputParsedEvent.getDocumentID());
+
+ event = experimentOutputParsedEvent;
+
+ logger.debug(" [x] Received FileInfo Message'");
+ process(experimentOutputParsedEvent, message.getUpdatedTime());
+ logger.debug(" [x] Done Processing FileInfo Message");
+ } else {
+ logger.debug("Recieved message of type ..." +message.getMessageType());
+ }
+ //FIXME Debug the basicAck
+ //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+ }
+ } catch (Exception e) {
+ logger.error(e);
+ }
+ }
+
+
+ })).start();
+
+
+ }
+
+ private void process(ExperimentOutputParsedEvent experimentOutputParsedEvent, long updatedTime) {
+ logger.info("Processing the event!!!");
+ logger.info(experimentOutputParsedEvent.getExperimentId()+" ----- "+ experimentOutputParsedEvent.getDocumentID());
+ }
+
+ public void stopBroker() {
+ runFileUpdateListener = false;
+ logger.info("Shutting down FileUpdateListener...");
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java
new file mode 100644
index 0000000..d00286a
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatConsumer.java
@@ -0,0 +1,238 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RabbitMQDatacatConsumer implements Consumer {
+ private static Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class);
+
+ private String exchangeName;
+ private String url;
+ private Connection connection;
+ private Channel channel;
+ private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+
+ public RabbitMQDatacatConsumer() throws AiravataException {
+ try {
+ url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ exchangeName = ServerSettings.getDatacatExchange();
+
+ url = "amqp://localhost:5672";
+ exchangeName = "datacat";
+
+ createConnection();
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ }
+
+ public RabbitMQDatacatConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+ this.exchangeName = exchangeName;
+ this.url = brokerUrl;
+
+ createConnection();
+ }
+
+ private void createConnection() throws AiravataException {
+ try {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setUri(url);
+ connection = connectionFactory.newConnection();
+ connection.addShutdownListener(new ShutdownListener() {
+ public void shutdownCompleted(ShutdownSignalException cause) {
+ }
+ });
+ log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchangeName, "topic", false);
+
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + exchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public String listen(final MessageHandler handler) throws AiravataException {
+ try {
+ Map<String, Object> props = handler.getProperties();
+ final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+ if (routing == null) {
+ throw new IllegalArgumentException("The routing key must be present");
+ }
+
+ List<String> keys = new ArrayList<String>();
+ if (routing instanceof List) {
+ for (Object o : (List)routing) {
+ keys.add(o.toString());
+ }
+ } else if (routing instanceof String) {
+ keys.add((String) routing);
+ }
+
+ String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+ String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+ if (queueName == null) {
+ if (!channel.isOpen()) {
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchangeName, "topic", false);
+ }
+ queueName = channel.queueDeclare().getQueue();
+ } else {
+ channel.queueDeclare(queueName, true, false, false, null);
+ }
+
+ final String id = getId(keys, queueName);
+ if (queueDetailsMap.containsKey(id)) {
+ throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+ "cannot define the same subscriber twice");
+ }
+
+ if (consumerTag == null) {
+ consumerTag = "default";
+ }
+
+ // bind all the routing keys
+ for (String routingKey : keys) {
+ channel.queueBind(queueName, exchangeName, routingKey);
+ }
+
+ channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+ Message message = new Message();
+
+ try {
+ ThriftUtils.createThriftFromBytes(body, message);
+ TBase event = null;
+ String gatewayId = null;
+ if (message.getMessageType().equals(MessageType.OUTPUT_PARSED)) {
+ ExperimentOutputParsedEvent experimentOutputParsedEvent = new ExperimentOutputParsedEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), experimentOutputParsedEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType());
+ event = experimentOutputParsedEvent;
+ //FIXME : transfer the gatewayID
+ gatewayId = "temp";
+ }
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ handler.onMessage(messageContext);
+ } catch (TException e) {
+ String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+ log.warn(msg, e);
+ }
+ }
+ });
+ // save the name for deleting the queue
+ queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+ return id;
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + exchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public void stopListen(final String id) throws AiravataException {
+ QueueDetails details = queueDetailsMap.get(id);
+ if (details != null) {
+ try {
+ for (String key : details.getRoutingKeys()) {
+ channel.queueUnbind(details.getQueueName(), exchangeName, key);
+ }
+ channel.queueDelete(details.getQueueName(), true, true);
+ } catch (IOException e) {
+ String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+ }
+
+ /**
+ * Private class for holding some information about the consumers registered
+ */
+ private class QueueDetails {
+ String queueName;
+
+ List<String> routingKeys;
+
+ private QueueDetails(String queueName, List<String> routingKeys) {
+ this.queueName = queueName;
+ this.routingKeys = routingKeys;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public List<String> getRoutingKeys() {
+ return routingKeys;
+ }
+ }
+
+ private String getId(List<String> routingKeys, String queueName) {
+ String id = "";
+ for (String key : routingKeys) {
+ id = id + "_" + key;
+ }
+ return id + "_" + queueName;
+ }
+
+ public void close() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java
new file mode 100644
index 0000000..96a2650
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQDatacatPublisher.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMQDatacatPublisher implements Publisher {
+
+ private static Logger log = LoggerFactory.getLogger(RabbitMQDatacatPublisher.class);
+
+ private RabbitMQProducer rabbitMQProducer;
+
+
+ public RabbitMQDatacatPublisher() throws Exception {
+ String brokerUrl;
+ String exchangeName;
+ try {
+ brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ exchangeName = ServerSettings.getDatacatExchange();
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+
+ rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+ rabbitMQProducer.open();
+ }
+
+ public void publish(MessageContext msgCtx) throws AiravataException {
+ try {
+ log.info("Publishing status to datacat rabbitmq...");
+ byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+ Message message = new Message();
+ message.setEvent(body);
+ message.setMessageId(msgCtx.getMessageId());
+ message.setMessageType(msgCtx.getType());
+ message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+ String routingKey = null;
+ if (msgCtx.getType().equals(MessageType.EXPERIMENT_OUTPUT)) {
+ ExperimentOutputCreatedEvent outputCreatedEvent = (ExperimentOutputCreatedEvent) msgCtx.getEvent();
+ routingKey = outputCreatedEvent.getExperimentId();
+ }
+ byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+ rabbitMQProducer.send(messageBody, routingKey);
+ } catch (TException e) {
+ String msg = "Error while deserializing the object";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ } catch (Exception e) {
+ String msg = "Error while sending to rabbitmq";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java
new file mode 100644
index 0000000..b6ae372
--- /dev/null
+++ b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatConsumerTest.java
@@ -0,0 +1,37 @@
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.impl.BetterRabbitMQDatacatConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatPublisher;
+import org.apache.airavata.model.messaging.event.ExperimentOutputCreatedEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class RabbitMQDatacatConsumerTest {
+ private static Logger log = LoggerFactory.getLogger(RabbitMQDatacatPublisherTest.class);
+
+ private BetterRabbitMQDatacatConsumer rabbitMQDatacatConsumer;
+
+ @Before
+ public void setup() {
+ try {
+ rabbitMQDatacatConsumer = new BetterRabbitMQDatacatConsumer();
+ } catch (Exception e) {
+ log.error(e.toString());
+ }
+ }
+
+ @Test
+ public void testDatacatConsumer() throws AiravataException, InterruptedException {
+ rabbitMQDatacatConsumer.startBroker();
+ Thread.sleep(20000);
+ rabbitMQDatacatConsumer.stopBroker();
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/714c2048/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java
new file mode 100644
index 0000000..e60b855
--- /dev/null
+++ b/modules/messaging/core/src/test/java/org/apache/airavata/messaging/core/RabbitMQDatacatPublisherTest.java
@@ -0,0 +1,48 @@
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.impl.RabbitMQDatacatPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
+import org.apache.airavata.model.messaging.event.ExperimentOutputCreatedEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class RabbitMQDatacatPublisherTest {
+
+ private static Logger log = LoggerFactory.getLogger(RabbitMQDatacatPublisherTest.class);
+
+ private RabbitMQDatacatPublisher rabbitMQDatacatPublisher;
+
+ @Before
+ public void setup() {
+ try {
+ rabbitMQDatacatPublisher = new RabbitMQDatacatPublisher();
+ } catch (Exception e) {
+ log.error(e.toString());
+ }
+ }
+
+ @Test
+ public void testDatacatPublisher() throws AiravataException {
+ String outputFile = "gauss.out";
+ String outputPath = "/home/swithana";
+ String messageId = "tesaeqwe";
+ String gatewayID = "gridChem";
+ String expID = "230u34jnr0813";
+
+ ExperimentOutputCreatedEvent event = new ExperimentOutputCreatedEvent(expID,
+ outputFile, outputPath+ File.separatorChar+outputFile);
+
+ MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_OUTPUT
+ , messageId, gatewayID);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+
+ rabbitMQDatacatPublisher.publish(messageContext);
+ }
+}