You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/10 19:19:40 UTC
[1/2] airavata git commit: Refactored messaging module to remove
duplicate code and support multiple publishers
Repository: airavata
Updated Branches:
refs/heads/develop 3a70c5905 -> 12d9efb90
Refactored messaging module to remove duplicate code and support multiple publishers
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e4cc54d9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e4cc54d9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e4cc54d9
Branch: refs/heads/develop
Commit: e4cc54d923d406542ed5d007aa1fb48a8094d29f
Parents: 2df4bb2
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Aug 10 15:18:53 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Aug 10 15:18:53 2016 -0400
----------------------------------------------------------------------
.../org/apache/airavata/gfac/impl/Factory.java | 9 +-
.../airavata/gfac/server/GfacServerHandler.java | 6 +-
.../messaging/client/RabbitMQListener.java | 3 +-
.../messaging/core/MessagingFactory.java | 86 +++-
.../airavata/messaging/core/Publisher.java | 4 +-
.../messaging/core/RabbitMQProperties.java | 125 ++++++
.../airavata/messaging/core/Subscriber.java | 5 -
.../messaging/core/SubscriberProperties.java | 125 ------
.../airavata/messaging/core/TestClient.java | 2 +-
.../apache/airavata/messaging/core/Type.java | 27 ++
.../impl/RabbitMQProcessLaunchPublisher.java | 156 +++----
.../messaging/core/impl/RabbitMQProducer.java | 440 +++++++++----------
.../messaging/core/impl/RabbitMQPublisher.java | 118 +++++
.../core/impl/RabbitMQStatusPublisher.java | 212 ++++-----
.../messaging/core/impl/RabbitMQSubscriber.java | 7 +-
.../server/OrchestratorServerHandler.java | 3 +-
.../ExperimentExecution.java | 3 +-
.../workflow/core/WorkflowEnactmentService.java | 7 +-
.../workflow/core/WorkflowInterpreter.java | 8 +-
19 files changed, 777 insertions(+), 569 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index d105c18..673f37b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -66,7 +66,8 @@ import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
@@ -145,9 +146,9 @@ public abstract class Factory {
public static Publisher getStatusPublisher() throws AiravataException {
if (statusPublisher == null) {
- synchronized (RabbitMQStatusPublisher.class) {
+ synchronized (RabbitMQPublisher.class) {
if (statusPublisher == null) {
- statusPublisher = new RabbitMQStatusPublisher();
+ statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
}
}
}
@@ -169,7 +170,7 @@ public abstract class Factory {
public static synchronized Subscriber getProcessLaunchSubscriber() throws AiravataException {
if (processLaunchSubscriber == null) {
- processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Subscriber.Type.PROCESS_LAUNCH);
+ processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Type.PROCESS_LAUNCH);
}
return processLaunchSubscriber;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index a490d91..44073dc 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -39,7 +39,7 @@ import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
+import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
@@ -98,9 +98,9 @@ public class GfacServerHandler implements GfacService.Iface {
// init process consumer
List<String> routingKeys = new ArrayList<>();
routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName());
- processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Subscriber.Type.PROCESS_LAUNCH);
+ processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Type.PROCESS_LAUNCH);
// init status publisher
- statusPublisher = new RabbitMQStatusPublisher();
+ statusPublisher = Factory.getStatusPublisher();
}
private void startCuratorClient() throws ApplicationSettingsException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
index 984aa59..9003897 100644
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
@@ -27,6 +27,7 @@ import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
@@ -69,7 +70,7 @@ public class RabbitMQListener {
System.out.println("broker url " + brokerUrl);
final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
List<String> routingKeys = getRoutingKeys(level);
- Subscriber subscriber = MessagingFactory.getSubscriber(null, routingKeys, Subscriber.Type.STATUS);
+ Subscriber subscriber = MessagingFactory.getSubscriber(message -> {}, routingKeys, Type.STATUS);
} catch (ApplicationSettingsException e) {
logger.error("Error reading airavata server properties", e);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
index ee68d0c..99c11b8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -23,29 +23,37 @@ package org.apache.airavata.messaging.core;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.messaging.core.impl.ProcessConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
import org.apache.airavata.messaging.core.impl.RabbitMQSubscriber;
import org.apache.airavata.messaging.core.impl.StatusConsumer;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
-import java.util.ArrayList;
import java.util.List;
public class MessagingFactory {
- public static Subscriber getSubscriber(final MessageHandler messageHandler,List<String> routingKeys, Subscriber.Type type) throws AiravataException {
+ public static Subscriber getSubscriber(final MessageHandler messageHandler,List<String> routingKeys, Type type) throws AiravataException {
Subscriber subscriber = null;
- SubscriberProperties sp = getSubscriberProperties();
+ RabbitMQProperties rProperties = getProperties();
switch (type) {
case EXPERIMENT_LAUNCH:
break;
case PROCESS_LAUNCH:
- subscriber = getProcessSubscriber(sp);
+ subscriber = getProcessSubscriber(rProperties);
subscriber.listen((connection ,channel) -> new ProcessConsumer(messageHandler, connection, channel),
null,
routingKeys);
break;
case STATUS:
- subscriber = getStatusSubscriber(sp);
+ subscriber = getStatusSubscriber(rProperties);
subscriber.listen((connection, channel) -> new StatusConsumer(messageHandler, connection, channel),
null,
routingKeys);
@@ -57,24 +65,53 @@ public class MessagingFactory {
return subscriber;
}
- private static SubscriberProperties getSubscriberProperties() {
- return new SubscriberProperties()
+ public static Publisher getPublisher(Type type) throws AiravataException {
+ RabbitMQProperties rProperties = getProperties();
+ Publisher publiser = null;
+ switch (type) {
+ case EXPERIMENT_LAUNCH:
+ break;
+ case PROCESS_LAUNCH:
+ publiser = gerProcessPublisher(rProperties);
+ break;
+ case STATUS:
+ publiser = getStatusPublisher(rProperties);
+ break;
+ default:
+ throw new IllegalArgumentException("Publisher " + type + " is not handled");
+ }
+
+ return publiser;
+ }
+
+ private static Publisher getStatusPublisher(RabbitMQProperties rProperties) throws AiravataException {
+ rProperties.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName());
+ return new RabbitMQPublisher(rProperties, MessagingFactory::statusRoutingkey);
+ }
+
+ private static Publisher gerProcessPublisher(RabbitMQProperties rProperties) throws AiravataException {
+ rProperties.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName());
+ return new RabbitMQPublisher(rProperties, messageContext -> rProperties.getExchangeName());
+ }
+
+ private static RabbitMQProperties getProperties() {
+ return new RabbitMQProperties()
.setBrokerUrl(ServerSettings.RABBITMQ_BROKER_URL)
.setDurable(ServerSettings.getRabbitmqDurableQueue())
.setPrefetchCount(ServerSettings.getRabbitmqPrefetchCount())
.setAutoRecoveryEnable(true)
.setConsumerTag("default")
- .setExchangeType(SubscriberProperties.EXCHANGE_TYPE.TOPIC);
+ .setExchangeType(RabbitMQProperties.EXCHANGE_TYPE.TOPIC);
}
- private static RabbitMQSubscriber getStatusSubscriber(SubscriberProperties sp) throws AiravataException {
+ private static RabbitMQSubscriber getStatusSubscriber(RabbitMQProperties sp) throws AiravataException {
sp.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName())
.setAutoAck(true);
return new RabbitMQSubscriber(sp);
}
- private static RabbitMQSubscriber getProcessSubscriber(SubscriberProperties sp) throws AiravataException {
+ private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException {
sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName())
.setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName())
.setAutoAck(false);
@@ -82,7 +119,34 @@ public class MessagingFactory {
}
-
+ private static String statusRoutingkey(MessageContext msgCtx) {
+ String gatewayId = msgCtx.getGatewayId();
+ String routingKey = null;
+ if (msgCtx.getType() == MessageType.EXPERIMENT) {
+ ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
+ routingKey = gatewayId + "." + event.getExperimentId();
+ } else if (msgCtx.getType() == MessageType.TASK) {
+ TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+ routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+ event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+ } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
+ TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+ routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+ event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+ } else if (msgCtx.getType() == MessageType.PROCESS) {
+ ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent();
+ ProcessIdentifier processIdentifier = event.getProcessIdentity();
+ routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId();
+ } else if (msgCtx.getType() == MessageType.JOB) {
+ JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
+ JobIdentifier identity = event.getJobIdentity();
+ routingKey = gatewayId + "." + identity.getExperimentId() + "." +
+ identity.getProcessId() + "." +
+ identity.getTaskId() + "." +
+ identity.getJobId();
+ }
+ return routingKey;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index b8b586c..28a9a06 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -31,8 +31,8 @@ public interface Publisher {
/**
*
- * @param message object of message context which will include actual event and other information
+ * @param messageContext object of message context which will include actual event and other information
* @throws AiravataException
*/
- public void publish(MessageContext message) throws AiravataException;
+ public void publish(MessageContext messageContext) throws AiravataException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java
new file mode 100644
index 0000000..5bb1929
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+public class RabbitMQProperties {
+ private String brokerUrl;
+ private EXCHANGE_TYPE exchangeType;
+ private String exchangeName;
+ private int prefetchCount;
+ private boolean durable;
+ private String queueName;
+ private String consumerTag = "default";
+ private boolean autoRecoveryEnable;
+ private boolean autoAck;
+
+ public String getBrokerUrl() {
+ return brokerUrl;
+ }
+
+ public RabbitMQProperties setBrokerUrl(String brokerUrl) {
+ this.brokerUrl = brokerUrl;
+ return this;
+ }
+
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public RabbitMQProperties setDurable(boolean durable) {
+ this.durable = durable;
+ return this;
+ }
+
+ public String getExchangeName() {
+ return exchangeName;
+ }
+
+ public RabbitMQProperties setExchangeName(String exchangeName) {
+ this.exchangeName = exchangeName;
+ return this;
+ }
+
+ public int getPrefetchCount() {
+ return prefetchCount;
+ }
+
+ public RabbitMQProperties setPrefetchCount(int prefetchCount) {
+ this.prefetchCount = prefetchCount;
+ return this;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public RabbitMQProperties setQueueName(String queueName) {
+ this.queueName = queueName;
+ return this;
+ }
+
+ public String getConsumerTag() {
+ return consumerTag;
+ }
+
+ public RabbitMQProperties setConsumerTag(String consumerTag) {
+ this.consumerTag = consumerTag;
+ return this;
+ }
+
+ public boolean isAutoRecoveryEnable() {
+ return autoRecoveryEnable;
+ }
+
+ public RabbitMQProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) {
+ this.autoRecoveryEnable = autoRecoveryEnable;
+ return this;
+ }
+
+ public String getExchangeType() {
+ return exchangeType.type;
+ }
+
+ public RabbitMQProperties setExchangeType(EXCHANGE_TYPE exchangeType) {
+ this.exchangeType = exchangeType;
+ return this;
+ }
+
+ public boolean isAutoAck() {
+ return autoAck;
+ }
+
+ public RabbitMQProperties setAutoAck(boolean autoAck) {
+ this.autoAck = autoAck;
+ return this;
+ }
+
+ public enum EXCHANGE_TYPE{
+ TOPIC("topic"),
+ FANOUT("fanout");
+
+ private String type;
+
+ EXCHANGE_TYPE(String type) {
+ this.type = type;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
index 7952cb3..cc357a0 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
@@ -50,9 +50,4 @@ public interface Subscriber {
void sendAck(long deliveryTag);
- enum Type {
- EXPERIMENT_LAUNCH,
- PROCESS_LAUNCH,
- STATUS
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
deleted file mode 100644
index 025e93b..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.messaging.core;
-
-public class SubscriberProperties {
- private String brokerUrl;
- private EXCHANGE_TYPE exchangeType;
- private String exchangeName;
- private int prefetchCount;
- private boolean durable;
- private String queueName;
- private String consumerTag = "default";
- private boolean autoRecoveryEnable;
- private boolean autoAck;
-
- public String getBrokerUrl() {
- return brokerUrl;
- }
-
- public SubscriberProperties setBrokerUrl(String brokerUrl) {
- this.brokerUrl = brokerUrl;
- return this;
- }
-
- public boolean isDurable() {
- return durable;
- }
-
- public SubscriberProperties setDurable(boolean durable) {
- this.durable = durable;
- return this;
- }
-
- public String getExchangeName() {
- return exchangeName;
- }
-
- public SubscriberProperties setExchangeName(String exchangeName) {
- this.exchangeName = exchangeName;
- return this;
- }
-
- public int getPrefetchCount() {
- return prefetchCount;
- }
-
- public SubscriberProperties setPrefetchCount(int prefetchCount) {
- this.prefetchCount = prefetchCount;
- return this;
- }
-
- public String getQueueName() {
- return queueName;
- }
-
- public SubscriberProperties setQueueName(String queueName) {
- this.queueName = queueName;
- return this;
- }
-
- public String getConsumerTag() {
- return consumerTag;
- }
-
- public SubscriberProperties setConsumerTag(String consumerTag) {
- this.consumerTag = consumerTag;
- return this;
- }
-
- public boolean isAutoRecoveryEnable() {
- return autoRecoveryEnable;
- }
-
- public SubscriberProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) {
- this.autoRecoveryEnable = autoRecoveryEnable;
- return this;
- }
-
- public String getExchangeType() {
- return exchangeType.type;
- }
-
- public SubscriberProperties setExchangeType(EXCHANGE_TYPE exchangeType) {
- this.exchangeType = exchangeType;
- return this;
- }
-
- public boolean isAutoAck() {
- return autoAck;
- }
-
- public SubscriberProperties setAutoAck(boolean autoAck) {
- this.autoAck = autoAck;
- return this;
- }
-
- public enum EXCHANGE_TYPE{
- TOPIC("topic"),
- FANOUT("fanout");
-
- private String type;
-
- EXCHANGE_TYPE(String type) {
- this.type = type;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index daa9886..65781fe 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -46,7 +46,7 @@ public class TestClient {
List<String> routingKeys = new ArrayList<>();
routingKeys.add(experimentId);
routingKeys.add(experimentId + ".*");
- MessagingFactory.getSubscriber(getMessageHandler(),routingKeys, Subscriber.Type.STATUS);
+ MessagingFactory.getSubscriber(getMessageHandler(),routingKeys, Type.STATUS);
} catch (ApplicationSettingsException e) {
logger.error("Error reading airavata server properties", e);
}catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java
new file mode 100644
index 0000000..6980e1c
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+public enum Type {
+ EXPERIMENT_LAUNCH,
+ PROCESS_LAUNCH,
+ STATUS
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
index 5cf960e..ba4b6f2 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
@@ -1,78 +1,78 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMQProcessLaunchPublisher implements Publisher{
- private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class);
- private String launchTask;
-
- private RabbitMQProducer rabbitMQProducer;
-
- public RabbitMQProcessLaunchPublisher() throws Exception {
- String brokerUrl;
- try {
- brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName();
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
- rabbitMQProducer.open();
- }
-
- public void publish(MessageContext msgCtx) throws AiravataException {
- try {
- log.info("Publishing to launch queue ...");
- byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
- Message message = new Message();
- message.setEvent(body);
- message.setMessageId(msgCtx.getMessageId());
- message.setMessageType(msgCtx.getType());
- message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
- String routingKey = launchTask;
- byte[] messageBody = ThriftUtils.serializeThriftObject(message);
- rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
- log.info("Successfully published to launch queue ...");
- } catch (TException e) {
- String msg = "Error while deserializing the object";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- } catch (Exception e) {
- String msg = "Error while sending to rabbitmq";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- }
- }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.messaging.core.impl;
+//
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.messaging.core.Publisher;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class RabbitMQProcessLaunchPublisher implements Publisher{
+// private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class);
+// private String launchTask;
+//
+// private RabbitMQProducer rabbitMQProducer;
+//
+// public RabbitMQProcessLaunchPublisher() throws Exception {
+// String brokerUrl;
+// try {
+// brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+// launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName();
+// } catch (ApplicationSettingsException e) {
+// String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+// log.error(message, e);
+// throw new AiravataException(message, e);
+// }
+// rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
+// rabbitMQProducer.open();
+// }
+//
+// public void publish(MessageContext msgCtx) throws AiravataException {
+// try {
+// log.info("Publishing to launch queue ...");
+// byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+// Message message = new Message();
+// message.setEvent(body);
+// message.setMessageId(msgCtx.getMessageId());
+// message.setMessageType(msgCtx.getType());
+// message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+// String routingKey = launchTask;
+// byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+// rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
+// log.info("Successfully published to launch queue ...");
+// } catch (TException e) {
+// String msg = "Error while deserializing the object";
+// log.error(msg, e);
+// throw new AiravataException(msg, e);
+// } catch (Exception e) {
+// String msg = "Error while sending to rabbitmq";
+// log.error(msg, e);
+// throw new AiravataException(msg, e);
+// }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index fc494d4..73138c7 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -1,220 +1,220 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class RabbitMQProducer {
- public static final int DEFAULT_PRE_FETCH = 64;
-
- private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class);
-
- private Connection connection;
-
- private Channel channel;
-
- private QueueingConsumer consumer;
-
- private String consumerTag;
-
- private String exchangeName;
-
- private int prefetchCount = DEFAULT_PRE_FETCH;
-
- private boolean isReQueueOnFail = false;
-
- private String url;
-
- private String getExchangeType = "topic";
-
-
- public RabbitMQProducer(String url, String exchangeName,String getExchangeType) {
- this.exchangeName = exchangeName;
- this.url = url;
- this.getExchangeType = getExchangeType;
- }
-
- public RabbitMQProducer(String url, String exchangeName) {
- this.exchangeName = exchangeName;
- this.url = url;
- }
-
- public void setPrefetchCount(int prefetchCount) {
- this.prefetchCount = prefetchCount;
- }
-
- public void setReQueueOnFail(boolean isReQueueOnFail) {
- this.isReQueueOnFail = isReQueueOnFail;
- }
-
- private void reset() {
- consumerTag = null;
- }
-
- private void reInitIfNecessary() throws Exception {
- if (consumerTag == null || consumer == null) {
- close();
- open();
- }
- }
-
- public void close() {
- log.info("Closing channel to exchange {}", exchangeName);
- try {
- if (channel != null && channel.isOpen()) {
- if (consumerTag != null) {
- channel.basicCancel(consumerTag);
- }
- channel.close();
- }
- } catch (Exception e) {
- log.debug("error closing channel and/or cancelling consumer", e);
- }
- try {
- log.info("closing connection to rabbitmq: " + connection);
- connection.close();
- } catch (Exception e) {
- log.debug("error closing connection", e);
- }
- consumer = null;
- consumerTag = null;
- channel = null;
- connection = null;
- }
-
- public void open() throws AiravataException {
- try {
- connection = createConnection();
- channel = connection.createChannel();
- if (prefetchCount > 0) {
- log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
- channel.basicQos(prefetchCount);
- }
- if(exchangeName!=null) {
- channel.exchangeDeclare(exchangeName, getExchangeType, false);
- }
- } catch (Exception e) {
- reset();
- String msg = "could not open channel for exchange " + exchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void send(byte []message, String routingKey) throws Exception {
- try {
- channel.basicPublish(exchangeName, routingKey, null, message);
- } catch (IOException e) {
- String msg = "Failed to publish message to exchange: " + exchangeName;
- log.error(msg, e);
- throw new Exception(msg, e);
- }
- }
-
- public void sendToWorkerQueue(byte []message, String routingKey) throws Exception {
- try {
- channel.basicPublish( "", routingKey,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message);
- } catch (IOException e) {
- String msg = "Failed to publish message to exchange: " + exchangeName;
- log.error(msg, e);
- throw new Exception(msg, e);
- }
- }
-
- private Connection createConnection() throws IOException {
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setUri(url);
- connectionFactory.setAutomaticRecoveryEnabled(true);
- Connection connection = connectionFactory.newConnection();
- connection.addShutdownListener(new ShutdownListener() {
- public void shutdownCompleted(ShutdownSignalException cause) {
- }
- });
- log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
- return connection;
- } catch (Exception e) {
- log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName);
- return null;
- }
- }
-
- public void ackMessage(Long msgId) throws Exception {
- try {
- channel.basicAck(msgId, false);
- } catch (ShutdownSignalException sse) {
- reset();
- String msg = "shutdown signal received while attempting to ack message";
- log.error(msg, sse);
- throw new Exception(msg, sse);
- } catch (Exception e) {
- String s = "could not ack for msgId: " + msgId;
- log.error(s, e);
- throw new Exception(s, e);
- }
- }
-
- public void failMessage(Long msgId) throws Exception {
- if (isReQueueOnFail) {
- failWithRedelivery(msgId);
- } else {
- deadLetter(msgId);
- }
- }
-
- public void failWithRedelivery(Long msgId) throws Exception {
- try {
- channel.basicReject(msgId, true);
- } catch (ShutdownSignalException sse) {
- reset();
- String msg = "shutdown signal received while attempting to fail with redelivery";
- log.error(msg, sse);
- throw new Exception(msg, sse);
- } catch (Exception e) {
- String msg = "could not fail with redelivery for msgId: " + msgId;
- log.error(msg, e);
- throw new Exception(msg, e);
- }
- }
-
- public void deadLetter(Long msgId) throws Exception {
- try {
- channel.basicReject(msgId, false);
- } catch (ShutdownSignalException sse) {
- reset();
- String msg = "shutdown signal received while attempting to fail with no redelivery";
- log.error(msg, sse);
- throw new Exception(msg, sse);
- } catch (Exception e) {
- String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId;
- log.error(msg, e);
- throw new Exception(msg, e);
- }
- }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+// */
+//
+//package org.apache.airavata.messaging.core.impl;
+//
+//import com.rabbitmq.client.*;
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//
+//public class RabbitMQProducer {
+// public static final int DEFAULT_PRE_FETCH = 64;
+//
+// private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class);
+//
+// private Connection connection;
+//
+// private Channel channel;
+//
+// private QueueingConsumer consumer;
+//
+// private String consumerTag;
+//
+// private String exchangeName;
+//
+// private int prefetchCount = DEFAULT_PRE_FETCH;
+//
+// private boolean isReQueueOnFail = false;
+//
+// private String url;
+//
+// private String getExchangeType = "topic";
+//
+//
+// public RabbitMQProducer(String url, String exchangeName,String getExchangeType) {
+// this.exchangeName = exchangeName;
+// this.url = url;
+// this.getExchangeType = getExchangeType;
+// }
+//
+// public RabbitMQProducer(String url, String exchangeName) {
+// this.exchangeName = exchangeName;
+// this.url = url;
+// }
+//
+// public void setPrefetchCount(int prefetchCount) {
+// this.prefetchCount = prefetchCount;
+// }
+//
+// public void setReQueueOnFail(boolean isReQueueOnFail) {
+// this.isReQueueOnFail = isReQueueOnFail;
+// }
+//
+// private void reset() {
+// consumerTag = null;
+// }
+//
+// private void reInitIfNecessary() throws Exception {
+// if (consumerTag == null || consumer == null) {
+// close();
+// open();
+// }
+// }
+//
+// public void close() {
+// log.info("Closing channel to exchange {}", exchangeName);
+// try {
+// if (channel != null && channel.isOpen()) {
+// if (consumerTag != null) {
+// channel.basicCancel(consumerTag);
+// }
+// channel.close();
+// }
+// } catch (Exception e) {
+// log.debug("error closing channel and/or cancelling consumer", e);
+// }
+// try {
+// log.info("closing connection to rabbitmq: " + connection);
+// connection.close();
+// } catch (Exception e) {
+// log.debug("error closing connection", e);
+// }
+// consumer = null;
+// consumerTag = null;
+// channel = null;
+// connection = null;
+// }
+//
+// public void open() throws AiravataException {
+// try {
+// connection = createConnection();
+// channel = connection.createChannel();
+// if (prefetchCount > 0) {
+// log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
+// channel.basicQos(prefetchCount);
+// }
+// if(exchangeName!=null) {
+// channel.exchangeDeclare(exchangeName, getExchangeType, false);
+// }
+// } catch (Exception e) {
+// reset();
+// String msg = "could not open channel for exchange " + exchangeName;
+// log.error(msg);
+// throw new AiravataException(msg, e);
+// }
+// }
+//
+// public void send(byte []message, String routingKey) throws Exception {
+// try {
+// channel.basicPublish(exchangeName, routingKey, null, message);
+// } catch (IOException e) {
+// String msg = "Failed to publish message to exchange: " + exchangeName;
+// log.error(msg, e);
+// throw new Exception(msg, e);
+// }
+// }
+//
+// public void sendToWorkerQueue(byte []message, String routingKey) throws Exception {
+// try {
+// channel.basicPublish( "", routingKey,
+// MessageProperties.PERSISTENT_TEXT_PLAIN,
+// message);
+// } catch (IOException e) {
+// String msg = "Failed to publish message to exchange: " + exchangeName;
+// log.error(msg, e);
+// throw new Exception(msg, e);
+// }
+// }
+//
+// private Connection createConnection() throws IOException {
+// try {
+// ConnectionFactory connectionFactory = new ConnectionFactory();
+// connectionFactory.setUri(url);
+// connectionFactory.setAutomaticRecoveryEnabled(true);
+// Connection connection = connectionFactory.newConnection();
+// connection.addShutdownListener(new ShutdownListener() {
+// public void shutdownCompleted(ShutdownSignalException cause) {
+// }
+// });
+// log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+// return connection;
+// } catch (Exception e) {
+// log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName);
+// return null;
+// }
+// }
+//
+// public void ackMessage(Long msgId) throws Exception {
+// try {
+// channel.basicAck(msgId, false);
+// } catch (ShutdownSignalException sse) {
+// reset();
+// String msg = "shutdown signal received while attempting to ack message";
+// log.error(msg, sse);
+// throw new Exception(msg, sse);
+// } catch (Exception e) {
+// String s = "could not ack for msgId: " + msgId;
+// log.error(s, e);
+// throw new Exception(s, e);
+// }
+// }
+//
+// public void failMessage(Long msgId) throws Exception {
+// if (isReQueueOnFail) {
+// failWithRedelivery(msgId);
+// } else {
+// deadLetter(msgId);
+// }
+// }
+//
+// public void failWithRedelivery(Long msgId) throws Exception {
+// try {
+// channel.basicReject(msgId, true);
+// } catch (ShutdownSignalException sse) {
+// reset();
+// String msg = "shutdown signal received while attempting to fail with redelivery";
+// log.error(msg, sse);
+// throw new Exception(msg, sse);
+// } catch (Exception e) {
+// String msg = "could not fail with redelivery for msgId: " + msgId;
+// log.error(msg, e);
+// throw new Exception(msg, e);
+// }
+// }
+//
+// public void deadLetter(Long msgId) throws Exception {
+// try {
+// channel.basicReject(msgId, false);
+// } catch (ShutdownSignalException sse) {
+// reset();
+// String msg = "shutdown signal received while attempting to fail with no redelivery";
+// log.error(msg, sse);
+// throw new Exception(msg, sse);
+// } catch (Exception e) {
+// String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId;
+// log.error(msg, e);
+// throw new Exception(msg, e);
+// }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
new file mode 100644
index 0000000..3fdb3a1
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.RabbitMQProperties;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/** {@link Publisher} that wraps each event in a Thrift {@link Message} and publishes it to the RabbitMQ exchange named in its properties. */
+public class RabbitMQPublisher implements Publisher {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class);
+    private final RabbitMQProperties properties;
+    private final Function<MessageContext, String> routingKeySupplier; // computes the routing key for each published context
+    private Connection connection;
+    private Channel channel;
+
+    /** Stores the broker settings and the routing-key function, then connects immediately; throws AiravataException if that fails. */
+    public RabbitMQPublisher(RabbitMQProperties properties, Function<MessageContext, String> routingKeySupplier) throws AiravataException {
+        this.properties = properties;
+        this.routingKeySupplier = routingKeySupplier;
+        connect();
+    }
+
+    /** Opens the connection and channel, applies the prefetch count when positive, and declares the exchange when one is named. */
+    private void connect() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(properties.getBrokerUrl());
+            connectionFactory.setAutomaticRecoveryEnabled(properties.isAutoRecoveryEnable());
+            connection = connectionFactory.newConnection();
+            connection.addShutdownListener(cause -> log.info("rabbitmq connection shut down for " + properties.getExchangeName() + ": " + cause.getMessage()));
+            log.info("connected to rabbitmq: " + connection + " for " + properties.getExchangeName());
+            channel = connection.createChannel();
+            if (properties.getPrefetchCount() > 0) {
+                channel.basicQos(properties.getPrefetchCount());
+            }
+            if (properties.getExchangeName() != null) {
+                channel.exchangeDeclare(properties.getExchangeName(), properties.getExchangeType(), false);
+            }
+        } catch (Exception e) {
+            if (connection != null) {
+                connection.abort(); // setup failed after the connection was opened: release it instead of leaking it
+            }
+            String msg = "RabbitMQ connection issue for exchange : " + properties.getExchangeName();
+            log.error(msg, e); // keep the stack trace in the log, not only in the rethrown exception
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /** Serializes the event into a {@link Message} envelope and sends it under the routing key computed for this context. */
+    @Override
+    public void publish(MessageContext messageContext) throws AiravataException {
+        try {
+            byte[] body = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+            Message message = new Message();
+            message.setEvent(body);
+            message.setMessageId(messageContext.getMessageId());
+            message.setMessageType(messageContext.getType());
+            message.setUpdatedTime(messageContext.getUpdatedTime().getTime());
+            String routingKey = routingKeySupplier.apply(messageContext);
+            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+            send(messageBody, routingKey);
+        } catch (TException e) {
+            String msg = "Error while serializing the object"; // both Thrift calls in this method serialize
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (Exception e) {
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /** Publishes the raw bytes as a persistent message; when no exchange name is configured the default exchange ("") is used. */
+    public void send(byte []message, String routingKey) throws Exception {
+        String exchange = properties.getExchangeName() == null ? "" : properties.getExchangeName(); // connect() tolerates a null name
+        try {
+            channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message);
+        } catch (IOException e) {
+            String msg = "Failed to publish message to exchange: " + properties.getExchangeName();
+            log.error(msg, e);
+            throw new Exception(msg, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index 75077e9..6f728fe 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -1,106 +1,106 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMQStatusPublisher implements Publisher {
-
- private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class);
-
- private RabbitMQProducer rabbitMQProducer;
-
-// StatCounter statCounter = StatCounter.getInstance();
-
- public RabbitMQStatusPublisher() throws AiravataException {
- String brokerUrl;
- String exchangeName;
- try {
- brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
- rabbitMQProducer.open();
- }
-
- public void publish(MessageContext msgCtx) throws AiravataException {
- try {
- log.info("Publishing status to rabbitmq...");
- byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
- Message message = new Message();
- message.setEvent(body);
- message.setMessageId(msgCtx.getMessageId());
- message.setMessageType(msgCtx.getType());
- message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
- String gatewayId = msgCtx.getGatewayId();
- String routingKey = null;
- if (msgCtx.getType() == MessageType.EXPERIMENT) {
- ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
- routingKey = gatewayId + "." + event.getExperimentId();
- } else if (msgCtx.getType() == MessageType.TASK) {
- TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
- routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
- event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
- } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
- TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
- routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
- event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
- } else if (msgCtx.getType() == MessageType.PROCESS) {
- ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent();
- ProcessIdentifier processIdentifier = event.getProcessIdentity();
- routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId();
- } else if (msgCtx.getType() == MessageType.JOB) {
- JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
- JobIdentifier identity = event.getJobIdentity();
- routingKey = gatewayId + "." + identity.getExperimentId() + "." +
- identity.getProcessId() + "." +
- identity.getTaskId() + "." +
- identity.getJobId();
- }
- byte[] messageBody = ThriftUtils.serializeThriftObject(message);
- rabbitMQProducer.send(messageBody, routingKey);
-// statCounter.add(message);
- } catch (TException e) {
- String msg = "Error while deserializing the object";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- } catch (Exception e) {
- String msg = "Error while sending to rabbitmq";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- }
- }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+// */
+//
+//package org.apache.airavata.messaging.core.impl;
+//
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.messaging.core.Publisher;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class RabbitMQStatusPublisher implements Publisher {
+//
+// private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class);
+//
+// private RabbitMQProducer rabbitMQProducer;
+//
+//// StatCounter statCounter = StatCounter.getInstance();
+//
+// public RabbitMQStatusPublisher() throws AiravataException {
+// String brokerUrl;
+// String exchangeName;
+// try {
+// brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+// } catch (ApplicationSettingsException e) {
+// String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+// log.error(message, e);
+// throw new AiravataException(message, e);
+// }
+// rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+// rabbitMQProducer.open();
+// }
+//
+// public void publish(MessageContext msgCtx) throws AiravataException {
+// try {
+// log.info("Publishing status to rabbitmq...");
+// byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+// Message message = new Message();
+// message.setEvent(body);
+// message.setMessageId(msgCtx.getMessageId());
+// message.setMessageType(msgCtx.getType());
+// message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+// String gatewayId = msgCtx.getGatewayId();
+// String routingKey = null;
+// if (msgCtx.getType() == MessageType.EXPERIMENT) {
+// ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
+// routingKey = gatewayId + "." + event.getExperimentId();
+// } else if (msgCtx.getType() == MessageType.TASK) {
+// TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+// routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+// event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+// } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
+// TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+// routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+// event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+// } else if (msgCtx.getType() == MessageType.PROCESS) {
+// ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent();
+// ProcessIdentifier processIdentifier = event.getProcessIdentity();
+// routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId();
+// } else if (msgCtx.getType() == MessageType.JOB) {
+// JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
+// JobIdentifier identity = event.getJobIdentity();
+// routingKey = gatewayId + "." + identity.getExperimentId() + "." +
+// identity.getProcessId() + "." +
+// identity.getTaskId() + "." +
+// identity.getJobId();
+// }
+// byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+// rabbitMQProducer.send(messageBody, routingKey);
+//// statCounter.add(message);
+// } catch (TException e) {
+// String msg = "Error while deserializing the object";
+// log.error(msg, e);
+// throw new AiravataException(msg, e);
+// } catch (Exception e) {
+// String msg = "Error while sending to rabbitmq";
+// log.error(msg, e);
+// throw new AiravataException(msg, e);
+// }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
index 188847f..441281d 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
@@ -27,9 +27,8 @@ import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.SubscriberProperties;
+import org.apache.airavata.messaging.core.RabbitMQProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +45,9 @@ public class RabbitMQSubscriber implements Subscriber {
private Connection connection;
private Channel channel;
private Map<String, QueueDetail> queueDetailMap = new HashMap<>();
- private SubscriberProperties properties;
+ private RabbitMQProperties properties;
- public RabbitMQSubscriber(SubscriberProperties properties) throws AiravataException {
+ public RabbitMQSubscriber(RabbitMQProperties properties) throws AiravataException {
this.properties = properties;
createConnection();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index f5c4d2a..5d02100 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -35,6 +35,7 @@ import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.PublisherFactory;
import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -130,7 +131,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
// routingKeys.add("*"); // listen for gateway level messages
// routingKeys.add("*.*"); // listen for gateway/experiment level messages
routingKeys.add("*.*.*"); // listen for gateway/experiment/process level messages
- statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Subscriber.Type.STATUS);
+ statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Type.STATUS);
startCurator();
} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) {
log.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
----------------------------------------------------------------------
diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
index fa4c3de..1e6edd8 100644
--- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
+++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
@@ -187,7 +188,7 @@ public class ExperimentExecution {
String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT);
System.out.println("broker url " + brokerUrl);
final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT);
- Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Subscriber.Type.STATUS);
+ Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Type.STATUS);
}
private List<String> getRoutingKeys() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index a492ef2..33f11c8 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -25,8 +25,9 @@ import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
+import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
@@ -55,7 +56,7 @@ public class WorkflowEnactmentService {
workflowMap = new ConcurrentHashMap<>();
statusSubscriber = MessagingFactory.getSubscriber((message -> executor.execute(new StatusHandler(message))),
getRoutingKeys(),
- Subscriber.Type.STATUS);
+ Type.STATUS);
// register the shutdown hook to un-bind status consumer.
Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
}
@@ -74,7 +75,7 @@ public class WorkflowEnactmentService {
public void submitWorkflow(String experimentId,
String credentialToken,
String gatewayName,
- RabbitMQProcessLaunchPublisher publisher) throws Exception {
+ Publisher publisher) throws Exception {
WorkflowInterpreter workflowInterpreter = new WorkflowInterpreter(
experimentId, credentialToken,gatewayName, publisher);
http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
index ecfdeea..e637c29 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
@@ -22,7 +22,7 @@
package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
+import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.ComponentState;
import org.apache.airavata.model.ComponentStatus;
import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -75,18 +75,18 @@ class WorkflowInterpreter {
private Map<String, WorkflowNode> completeList = new HashMap<>();
private Registry registry;
private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
- private RabbitMQProcessLaunchPublisher publisher;
+ private Publisher publisher;
private String consumerId;
private boolean continueWorkflow = true;
- public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws RegistryException {
+ public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, Publisher publisher) throws RegistryException {
this.gatewayName = gatewayName;
setExperiment(experimentId);
this.credentialToken = credentialToken;
this.publisher = publisher;
}
- public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) {
+ public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, Publisher publisher) {
this.gatewayName = gatewayName;
this.experiment = experiment;
this.credentialToken = credentialStoreToken;
[2/2] airavata git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/airavata into develop
Posted by sh...@apache.org.
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/airavata into develop
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/12d9efb9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/12d9efb9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/12d9efb9
Branch: refs/heads/develop
Commit: 12d9efb90763b7b3cb20c6733804ea0a712186dd
Parents: e4cc54d 3a70c59
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Aug 10 15:19:30 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Aug 10 15:19:30 2016 -0400
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 8 +-
.../lib/airavata/workspace_model_types.cpp | 22 ++++
.../lib/airavata/workspace_model_types.h | 12 +-
.../lib/Airavata/Model/Workspace/Types.php | 23 ++++
.../apache/airavata/model/workspace/ttypes.py | 15 ++-
.../airavata/model/workspace/Gateway.java | 111 ++++++++++++++++++-
.../catalog/impl/GatewayRegistry.java | 3 +
.../core/experiment/catalog/model/Gateway.java | 11 ++
.../catalog/resources/GatewayResource.java | 12 ++
.../experiment/catalog/resources/Utils.java | 3 +
.../utils/ThriftDataModelConversion.java | 1 +
.../service/handler/RegistryServerHandler.java | 12 ++
.../workspace_model.thrift | 3 +-
13 files changed, 227 insertions(+), 9 deletions(-)
----------------------------------------------------------------------