You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:20 UTC
[21/50] [abbrv] airavata git commit: Fixed AIRAVATA-1591 ,
AIRAVATA-1592 , AIRAVATA-1593
Fixed AIRAVATA-1591 ,AIRAVATA-1592 , AIRAVATA-1593
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d25441a0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d25441a0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d25441a0
Branch: refs/heads/master
Commit: d25441a017f2e443165a22c9c5966a6d9e6aad9e
Parents: 97ff3b7
Author: shamrath <sh...@gmail.com>
Authored: Wed Feb 25 17:12:18 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Wed Feb 25 17:12:18 2015 -0500
----------------------------------------------------------------------
.../lib/airavata/messagingEvents_types.cpp | 88 +++-
.../lib/airavata/messagingEvents_types.h | 47 +-
.../Airavata/Model/Messaging/Event/Types.php | 94 ++++
.../model/messaging/event/MessageType.java | 5 +-
.../messaging/event/ProcessSubmitEvent.java | 492 +++++++++++++++++++
.../messagingEvents.thrift | 8 +-
.../core/monitor/AiravataTaskStatusUpdator.java | 13 +
.../core/impl/RabbitMQProcessConsumer.java | 158 ++++++
.../core/impl/RabbitMQProcessPublisher.java | 84 ++++
.../core/impl/RabbitMQStatusConsumer.java | 10 +-
.../core/impl/RabbitMQStatusPublisher.java | 20 +-
.../server/OrchestratorServerHandler.java | 76 ++-
modules/simple-workflow/pom.xml | 5 +
.../simple/workflow/engine/ProcessContext.java | 62 +++
.../simple/workflow/engine/ProcessPack.java | 62 ---
.../engine/SimpleWorkflowInterpreter.java | 378 +++++++-------
.../workflow/engine/WorkflowFactoryImpl.java | 4 +-
.../engine/parser/AiravataDefaultParser.java | 293 -----------
.../engine/parser/AiravataWorkflowParser.java | 291 +++++++++++
.../parser/AiravataDefaultParserTest.java | 119 -----
.../parser/AiravataWorkflowParserTest.java | 119 +++++
21 files changed, 1724 insertions(+), 704 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index 71f45be..92f29c6 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -47,7 +47,8 @@ int _kMessageTypeValues[] = {
MessageType::WORKFLOWNODE,
MessageType::JOB,
MessageType::LAUNCHTASK,
- MessageType::TERMINATETASK
+ MessageType::TERMINATETASK,
+ MessageType::TASKOUTPUT
};
const char* _kMessageTypeNames[] = {
"EXPERIMENT",
@@ -55,9 +56,10 @@ const char* _kMessageTypeNames[] = {
"WORKFLOWNODE",
"JOB",
"LAUNCHTASK",
- "TERMINATETASK"
+ "TERMINATETASK",
+ "TASKOUTPUT"
};
-const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
const char* ExperimentStatusChangeEvent::ascii_fingerprint = "38C252E94E93B69D04EB3A6EE2F9EDFB";
const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x38,0xC2,0x52,0xE9,0x4E,0x93,0xB6,0x9D,0x04,0xEB,0x3A,0x6E,0xE2,0xF9,0xED,0xFB};
@@ -839,6 +841,86 @@ void swap(JobIdentifier &a, JobIdentifier &b) {
swap(a.gatewayId, b.gatewayId);
}
+const char* ProcessSubmitEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972";
+const uint8_t ProcessSubmitEvent::binary_fingerprint[16] = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72};
+
+uint32_t ProcessSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ bool isset_taskId = false;
+ bool isset_credentialToken = false;
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->taskId);
+ isset_taskId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->credentialToken);
+ isset_credentialToken = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ if (!isset_taskId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_credentialToken)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ return xfer;
+}
+
+uint32_t ProcessSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ xfer += oprot->writeStructBegin("ProcessSubmitEvent");
+
+ xfer += oprot->writeFieldBegin("taskId", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString(this->taskId);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("credentialToken", ::apache::thrift::protocol::T_STRING, 2);
+ xfer += oprot->writeString(this->credentialToken);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b) {
+ using ::std::swap;
+ swap(a.taskId, b.taskId);
+ swap(a.credentialToken, b.credentialToken);
+}
+
const char* TaskSubmitEvent::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2";
const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2};
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index c7e2bb5..aafcda1 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -54,7 +54,8 @@ struct MessageType {
WORKFLOWNODE = 2,
JOB = 3,
LAUNCHTASK = 4,
- TERMINATETASK = 5
+ TERMINATETASK = 5,
+ TASKOUTPUT = 6
};
};
@@ -462,6 +463,50 @@ class JobIdentifier {
void swap(JobIdentifier &a, JobIdentifier &b);
+class ProcessSubmitEvent {
+ public:
+
+ static const char* ascii_fingerprint; // = "07A9615F837F7D0A952B595DD3020972";
+ static const uint8_t binary_fingerprint[16]; // = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72};
+
+ ProcessSubmitEvent() : taskId(), credentialToken() {
+ }
+
+ virtual ~ProcessSubmitEvent() throw() {}
+
+ std::string taskId;
+ std::string credentialToken;
+
+ void __set_taskId(const std::string& val) {
+ taskId = val;
+ }
+
+ void __set_credentialToken(const std::string& val) {
+ credentialToken = val;
+ }
+
+ bool operator == (const ProcessSubmitEvent & rhs) const
+ {
+ if (!(taskId == rhs.taskId))
+ return false;
+ if (!(credentialToken == rhs.credentialToken))
+ return false;
+ return true;
+ }
+ bool operator != (const ProcessSubmitEvent &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const ProcessSubmitEvent & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b);
+
+
class TaskSubmitEvent {
public:
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index b0d7676..4c4ec93 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -37,6 +37,7 @@ final class MessageType {
const JOB = 3;
const LAUNCHTASK = 4;
const TERMINATETASK = 5;
+ const TASKOUTPUT = 6;
static public $__names = array(
0 => 'EXPERIMENT',
1 => 'TASK',
@@ -44,6 +45,7 @@ final class MessageType {
3 => 'JOB',
4 => 'LAUNCHTASK',
5 => 'TERMINATETASK',
+ 6 => 'TASKOUTPUT',
);
}
@@ -971,6 +973,98 @@ class JobIdentifier {
}
+class ProcessSubmitEvent {
+ static $_TSPEC;
+
+ public $taskId = null;
+ public $credentialToken = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'taskId',
+ 'type' => TType::STRING,
+ ),
+ 2 => array(
+ 'var' => 'credentialToken',
+ 'type' => TType::STRING,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['taskId'])) {
+ $this->taskId = $vals['taskId'];
+ }
+ if (isset($vals['credentialToken'])) {
+ $this->credentialToken = $vals['credentialToken'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'ProcessSubmitEvent';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->taskId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->credentialToken);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ProcessSubmitEvent');
+ if ($this->taskId !== null) {
+ $xfer += $output->writeFieldBegin('taskId', TType::STRING, 1);
+ $xfer += $output->writeString($this->taskId);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->credentialToken !== null) {
+ $xfer += $output->writeFieldBegin('credentialToken', TType::STRING, 2);
+ $xfer += $output->writeString($this->credentialToken);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
class TaskSubmitEvent {
static $_TSPEC;
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
index 230b87b..761626f 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java
@@ -34,7 +34,8 @@ import org.apache.thrift.TEnum;
WORKFLOWNODE(2),
JOB(3),
LAUNCHTASK(4),
- TERMINATETASK(5);
+ TERMINATETASK(5),
+ TASKOUTPUT(6);
private final int value;
@@ -67,6 +68,8 @@ import org.apache.thrift.TEnum;
return LAUNCHTASK;
case 5:
return TERMINATETASK;
+ case 6:
+ return TASKOUTPUT;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java
new file mode 100644
index 0000000..e1d001a
--- /dev/null
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ProcessSubmitEvent.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.airavata.model.messaging.event;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class ProcessSubmitEvent implements org.apache.thrift.TBase<ProcessSubmitEvent, ProcessSubmitEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ProcessSubmitEvent> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ProcessSubmitEvent");
+
+ private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField CREDENTIAL_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("credentialToken", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new ProcessSubmitEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new ProcessSubmitEventTupleSchemeFactory());
+ }
+
+ private String taskId; // required
+ private String credentialToken; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ TASK_ID((short)1, "taskId"),
+ CREDENTIAL_TOKEN((short)2, "credentialToken");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // TASK_ID
+ return TASK_ID;
+ case 2: // CREDENTIAL_TOKEN
+ return CREDENTIAL_TOKEN;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.CREDENTIAL_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("credentialToken", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ProcessSubmitEvent.class, metaDataMap);
+ }
+
+ public ProcessSubmitEvent() {
+ }
+
+ public ProcessSubmitEvent(
+ String taskId,
+ String credentialToken)
+ {
+ this();
+ this.taskId = taskId;
+ this.credentialToken = credentialToken;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public ProcessSubmitEvent(ProcessSubmitEvent other) {
+ if (other.isSetTaskId()) {
+ this.taskId = other.taskId;
+ }
+ if (other.isSetCredentialToken()) {
+ this.credentialToken = other.credentialToken;
+ }
+ }
+
+ public ProcessSubmitEvent deepCopy() {
+ return new ProcessSubmitEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ this.taskId = null;
+ this.credentialToken = null;
+ }
+
+ public String getTaskId() {
+ return this.taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public void unsetTaskId() {
+ this.taskId = null;
+ }
+
+ /** Returns true if field taskId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTaskId() {
+ return this.taskId != null;
+ }
+
+ public void setTaskIdIsSet(boolean value) {
+ if (!value) {
+ this.taskId = null;
+ }
+ }
+
+ public String getCredentialToken() {
+ return this.credentialToken;
+ }
+
+ public void setCredentialToken(String credentialToken) {
+ this.credentialToken = credentialToken;
+ }
+
+ public void unsetCredentialToken() {
+ this.credentialToken = null;
+ }
+
+ /** Returns true if field credentialToken is set (has been assigned a value) and false otherwise */
+ public boolean isSetCredentialToken() {
+ return this.credentialToken != null;
+ }
+
+ public void setCredentialTokenIsSet(boolean value) {
+ if (!value) {
+ this.credentialToken = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case TASK_ID:
+ if (value == null) {
+ unsetTaskId();
+ } else {
+ setTaskId((String)value);
+ }
+ break;
+
+ case CREDENTIAL_TOKEN:
+ if (value == null) {
+ unsetCredentialToken();
+ } else {
+ setCredentialToken((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case TASK_ID:
+ return getTaskId();
+
+ case CREDENTIAL_TOKEN:
+ return getCredentialToken();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TASK_ID:
+ return isSetTaskId();
+ case CREDENTIAL_TOKEN:
+ return isSetCredentialToken();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof ProcessSubmitEvent)
+ return this.equals((ProcessSubmitEvent)that);
+ return false;
+ }
+
+ public boolean equals(ProcessSubmitEvent that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_taskId = true && this.isSetTaskId();
+ boolean that_present_taskId = true && that.isSetTaskId();
+ if (this_present_taskId || that_present_taskId) {
+ if (!(this_present_taskId && that_present_taskId))
+ return false;
+ if (!this.taskId.equals(that.taskId))
+ return false;
+ }
+
+ boolean this_present_credentialToken = true && this.isSetCredentialToken();
+ boolean that_present_credentialToken = true && that.isSetCredentialToken();
+ if (this_present_credentialToken || that_present_credentialToken) {
+ if (!(this_present_credentialToken && that_present_credentialToken))
+ return false;
+ if (!this.credentialToken.equals(that.credentialToken))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(ProcessSubmitEvent other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetTaskId()).compareTo(other.isSetTaskId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTaskId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskId, other.taskId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetCredentialToken()).compareTo(other.isSetCredentialToken());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCredentialToken()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentialToken, other.credentialToken);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ProcessSubmitEvent(");
+ boolean first = true;
+
+ sb.append("taskId:");
+ if (this.taskId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.taskId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("credentialToken:");
+ if (this.credentialToken == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.credentialToken);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!isSetTaskId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' is unset! Struct:" + toString());
+ }
+
+ if (!isSetCredentialToken()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'credentialToken' is unset! Struct:" + toString());
+ }
+
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class ProcessSubmitEventStandardSchemeFactory implements SchemeFactory {
+ public ProcessSubmitEventStandardScheme getScheme() {
+ return new ProcessSubmitEventStandardScheme();
+ }
+ }
+
+ private static class ProcessSubmitEventStandardScheme extends StandardScheme<ProcessSubmitEvent> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TASK_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.taskId = iprot.readString();
+ struct.setTaskIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // CREDENTIAL_TOKEN
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.credentialToken = iprot.readString();
+ struct.setCredentialTokenIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.taskId != null) {
+ oprot.writeFieldBegin(TASK_ID_FIELD_DESC);
+ oprot.writeString(struct.taskId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.credentialToken != null) {
+ oprot.writeFieldBegin(CREDENTIAL_TOKEN_FIELD_DESC);
+ oprot.writeString(struct.credentialToken);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class ProcessSubmitEventTupleSchemeFactory implements SchemeFactory {
+ public ProcessSubmitEventTupleScheme getScheme() {
+ return new ProcessSubmitEventTupleScheme();
+ }
+ }
+
+ private static class ProcessSubmitEventTupleScheme extends TupleScheme<ProcessSubmitEvent> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ oprot.writeString(struct.taskId);
+ oprot.writeString(struct.credentialToken);
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, ProcessSubmitEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ struct.taskId = iprot.readString();
+ struct.setTaskIdIsSet(true);
+ struct.credentialToken = iprot.readString();
+ struct.setCredentialTokenIsSet(true);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index d9e85d4..b13b5ed 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -40,7 +40,8 @@ enum MessageType {
WORKFLOWNODE,
JOB,
LAUNCHTASK,
- TERMINATETASK
+ TERMINATETASK,
+ TASKOUTPUT
}
struct ExperimentStatusChangeEvent {
@@ -102,6 +103,11 @@ struct JobIdentifier {
// //8:
// }
+struct ProcessSubmitEvent{
+ 1: required string taskId;
+ 2: required string credentialToken;
+}
+
struct TaskSubmitEvent{
1: required string experimentId,
2: required string taskId,
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index 837b728..5490d50 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.core.monitor;
import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
@@ -150,4 +151,16 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
}
}
}
+
+
+ @Subscribe
+ public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent) throws AiravataException {
+ String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+ logger.debug("Task Output changed event received for workflow node : " +
+ taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+ // TODO - do we need to update the output to the registry? , we do it in the workflowInterpreter too.
+ MessageContext messageContext = new MessageContext(taskOutputEvent, MessageType.TASKOUTPUT, taskOutputEvent.getTaskIdentity().getTaskId(), taskOutputEvent.getTaskIdentity().getGatewayId());
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
new file mode 100644
index 0000000..3352893
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RabbitMQProcessConsumer {
+
+ private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessConsumer.class);
+
+ private String url;
+ private Connection connection;
+ private Channel channel;
+
+ public RabbitMQProcessConsumer() throws AiravataException {
+ try {
+ url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ createConnection();
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ }
+
+ private void createConnection() throws AiravataException {
+ try {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setUri(url);
+ connection = connectionFactory.newConnection();
+ connection.addShutdownListener(new ShutdownListener() {
+ public void shutdownCompleted(ShutdownSignalException cause) {
+ }
+ });
+ log.info("connected to rabbitmq: " + connection + " for default");
+
+ channel = connection.createChannel();
+// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange default";
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+
+ public String listen(final MessageHandler handler) throws AiravataException {
+ try {
+ Map<String, Object> props = handler.getProperties();
+ final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+ if (routing == null) {
+ throw new IllegalArgumentException("The routing key must be present");
+ }
+ String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+ String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+ if (queueName == null) {
+ if (!channel.isOpen()) {
+ channel = connection.createChannel();
+// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+ }
+ queueName = channel.queueDeclare().getQueue();
+ } else {
+ channel.queueDeclare(queueName, true, false, false, null);
+ }
+
+ if (consumerTag == null) {
+ consumerTag = "default";
+ }
+ // autoAck=false, we will ack after task is done
+ final String finalQueueName = queueName;
+ channel.basicConsume(queueName, true, new QueueingConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+ Message message = new Message();
+
+ try {
+ ThriftUtils.createThriftFromBytes(body, message);
+ TBase event = null;
+ String gatewayId = null;
+ ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+ log.debug("Message received with message id : " + message.getMessageId()
+ + " with task id : " + processSubmitEvent.getTaskId());
+ event = processSubmitEvent;
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), null);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ handler.onMessage(messageContext);
+ } catch (TException e) {
+ String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + finalQueueName;
+ log.warn(msg, e);
+ }
+ }
+ });
+ return "";
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange default";
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public void stopListen(final String queueName , final String exchangeName) throws AiravataException {
+ try {
+ channel.queueUnbind(queueName, exchangeName, null);
+ } catch (IOException e) {
+ String msg = "could not un-bind queue: " + queueName + " for exchange " + exchangeName;
+ log.debug(msg);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
new file mode 100644
index 0000000..3684198
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RabbitMQProcessPublisher implements Publisher {
+
+ private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessPublisher.class);
+ public static final String PROCESS = "process.queue" ;
+
+ private RabbitMQProducer rabbitMQProducer;
+
+ public RabbitMQProcessPublisher() throws Exception {
+ String brokerUrl;
+ String exchangeName;
+ try {
+ brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ rabbitMQProducer = new RabbitMQProducer(brokerUrl, null, null);
+ rabbitMQProducer.open();
+ }
+
+ @Override
+ public void publish(MessageContext msgCtx) throws AiravataException {
+ try {
+ log.info("Publishing to process queue ...");
+ byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+ Message message = new Message();
+ message.setEvent(body);
+ message.setMessageId(msgCtx.getMessageId());
+ message.setMessageType(msgCtx.getType());
+ message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+ String queueName = PROCESS;
+ message.setMessageType(MessageType.TASK);
+ byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+ rabbitMQProducer.sendToWorkerQueue(messageBody, queueName);
+ } catch (TException e) {
+ String msg = "Error while serializing the thrift object";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ } catch (Exception e) {
+ String msg = "Error while sending to rabbitmq";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index d5e8c72..6efa77a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -174,6 +174,12 @@ public class RabbitMQStatusConsumer implements Consumer {
taskStatusChangeEvent.getState());
event = taskStatusChangeEvent;
gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+ }else if (message.getMessageType() == MessageType.TASKOUTPUT) {
+ TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
+ event = taskOutputChangeEvent;
+ gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
} else if (message.getMessageType().equals(MessageType.JOB)) {
JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
@@ -182,7 +188,7 @@ public class RabbitMQStatusConsumer implements Consumer {
jobStatusChangeEvent.getState());
event = jobStatusChangeEvent;
gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
- }else if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
+ } else if (message.getMessageType().equals(MessageType.LAUNCHTASK)) {
TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
log.debug(" Message Received with message id '" + message.getMessageId()
@@ -190,7 +196,7 @@ public class RabbitMQStatusConsumer implements Consumer {
taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
event = taskSubmitEvent;
gatewayId = taskSubmitEvent.getGatewayId();
- }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) {
+ } else if (message.getMessageType().equals(MessageType.TERMINATETASK)) {
TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
log.debug(" Message Received with message id '" + message.getMessageId()
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index 966d44d..a149037 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -68,21 +68,25 @@ public class RabbitMQStatusPublisher implements Publisher {
message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
String gatewayId = msgCtx.getGatewayId();
String routingKey = null;
- if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
+ if (msgCtx.getType() == MessageType.EXPERIMENT) {
ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
routingKey = gatewayId + "." + event.getExperimentId();
- } else if (msgCtx.getType().equals(MessageType.TASK)) {
+ } else if (msgCtx.getType() == MessageType.TASK) {
TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
- routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+ routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
- }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){
+ } else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+ TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+ routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+ event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
+ } else if (msgCtx.getType() == MessageType.WORKFLOWNODE) {
WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity();
- routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
- }else if (msgCtx.getType().equals(MessageType.JOB)){
- JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent();
+ routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
+ } else if (msgCtx.getType() == MessageType.JOB) {
+ JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
JobIdentifier identity = event.getJobIdentity();
- routingKey = gatewayId + "." + identity.getExperimentId() + "." +
+ routingKey = gatewayId + "." + identity.getExperimentId() + "." +
identity.getWorkflowNodeId() + "." +
identity.getTaskId() + "." +
identity.getJobId();
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e873e05..a4e105e 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -38,14 +38,19 @@ import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.util.ExecutionType;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
@@ -61,6 +66,7 @@ import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetai
import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
import org.apache.airavata.orchestrator.util.DataModelUtils;
import org.apache.airavata.simple.workflow.engine.SimpleWorkflowInterpreter;
+import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
@@ -86,7 +92,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
private String gatewayName;
private Publisher publisher;
- /**
+ private RabbitMQProcessConsumer rabbitMQProcessConsumer;
+ private RabbitMQProcessPublisher rabbitMQProcessPublisher;
+
+ /**
* Query orchestrator server to fetch the CPI version
*/
public String getOrchestratorCPIVersion() throws TException {
@@ -152,7 +161,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
orchestrator.initialize();
orchestrator.getOrchestratorContext().setZk(this.zk);
orchestrator.getOrchestratorContext().setPublisher(this.publisher);
- } catch (OrchestratorException e) {
+ startProcessConsumer();
+ } catch (OrchestratorException e) {
log.error(e.getMessage(), e);
throw new OrchestratorException("Error while initializing orchestrator service", e);
} catch (RegistryException e) {
@@ -161,6 +171,19 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
}
}
+ private void startProcessConsumer() throws OrchestratorException {
+ try {
+ rabbitMQProcessConsumer = new RabbitMQProcessConsumer();
+ ProcessConsumer processConsumer = new ProcessConsumer();
+ Thread thread = new Thread(processConsumer);
+ thread.start();
+
+ } catch (AiravataException e) {
+ throw new OrchestratorException("Error while starting process consumer", e);
+ }
+
+ }
+
private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException {
Stat zkStat = zk.exists(orchServer, false);
if (zkStat == null) {
@@ -627,6 +650,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
}
return true;
}
+
private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException {
// try {
// WorkflowEngine workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
@@ -634,15 +658,25 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
// } catch (WorkflowEngineException e) {
// log.errorId(experimentId, "Error while launching experiment.", e);
// }
-
try {
- SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(experimentId, airavataCredStoreToken);
+ SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(
+ experimentId, airavataCredStoreToken,getGatewayName(), getRabbitMQProcessPublisher());
+
Thread thread = new Thread(simpleWorkflowInterpreter);
thread.start();
// simpleWorkflowInterpreter.run();
} catch (RegistryException e) {
log.error("Error while launching workflow", e);
+ } catch (Exception e) {
+ log.error("Error while initializing rabbit mq process publisher");
+ }
+ }
+
+ public synchronized RabbitMQProcessPublisher getRabbitMQProcessPublisher() throws Exception {
+ if (rabbitMQProcessPublisher == null) {
+ rabbitMQProcessPublisher = new RabbitMQProcessPublisher();
}
+ return rabbitMQProcessPublisher;
}
@@ -732,4 +766,38 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
}
}
+ private class ProcessConsumer implements Runnable, MessageHandler{
+
+
+ @Override
+ public void run() {
+ try {
+ rabbitMQProcessConsumer.listen(this);
+ } catch (AiravataException e) {
+ log.error("Error while listen to the RabbitMQProcessConsumer");
+ }
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put(MessagingConstants.RABBIT_QUEUE, RabbitMQProcessPublisher.PROCESS);
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, RabbitMQProcessPublisher.PROCESS);
+ return props;
+ }
+
+ @Override
+ public void onMessage(MessageContext msgCtx) {
+ TBase event = msgCtx.getEvent();
+ if (event instanceof ProcessSubmitEvent) {
+ ProcessSubmitEvent processSubmitEvent = (ProcessSubmitEvent) event;
+ try {
+ launchTask(processSubmitEvent.getTaskId(), processSubmitEvent.getCredentialToken());
+ } catch (TException e) {
+ log.error("Error while launching task : " + processSubmitEvent.getTaskId());
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/pom.xml
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/pom.xml b/modules/simple-workflow/pom.xml
index 6b36335..5cb9dfb 100644
--- a/modules/simple-workflow/pom.xml
+++ b/modules/simple-workflow/pom.xml
@@ -48,6 +48,11 @@
</dependency>
<!-- Messaging dependency -->
<dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messaging-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java
new file mode 100644
index 0000000..849af85
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessContext.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class ProcessContext {
+ private WorkflowNode workflowNode;
+ private WorkflowNodeDetails wfNodeDetails;
+ private TaskDetails taskDetails;
+
+ public ProcessContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
+ this.workflowNode = workflowNode;
+ this.wfNodeDetails = wfNodeDetails;
+ this.taskDetails = taskDetails;
+ }
+
+ public WorkflowNode getWorkflowNode() {
+ return workflowNode;
+ }
+
+ public void setWorkflowNode(WorkflowNode workflowNode) {
+ this.workflowNode = workflowNode;
+ }
+
+ public WorkflowNodeDetails getWfNodeDetails() {
+ return wfNodeDetails;
+ }
+
+ public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+ this.wfNodeDetails = wfNodeDetails;
+ }
+
+ public TaskDetails getTaskDetails() {
+ return taskDetails;
+ }
+
+ public void setTaskDetails(TaskDetails taskDetails) {
+ this.taskDetails = taskDetails;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
deleted file mode 100644
index b58b947..0000000
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.simple.workflow.engine;
-
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-
-public class ProcessPack {
- private WorkflowNode workflowNode;
- private WorkflowNodeDetails wfNodeDetails;
- private TaskDetails taskDetails;
-
- public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
- this.workflowNode = workflowNode;
- this.wfNodeDetails = wfNodeDetails;
- this.taskDetails = taskDetails;
- }
-
- public WorkflowNode getWorkflowNode() {
- return workflowNode;
- }
-
- public void setWorkflowNode(WorkflowNode workflowNode) {
- this.workflowNode = workflowNode;
- }
-
- public WorkflowNodeDetails getWfNodeDetails() {
- return wfNodeDetails;
- }
-
- public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
- this.wfNodeDetails = wfNodeDetails;
- }
-
- public TaskDetails getTaskDetails() {
- return taskDetails;
- }
-
- public void setTaskDetails(TaskDetails taskDetails) {
- this.taskDetails = taskDetails;
- }
-}