You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/15 19:45:33 UTC
[1/2] airavata git commit: Fixed messaging publishing issues
Repository: airavata
Updated Branches:
refs/heads/develop 99d3f4bcd -> cfe62c3e8
Fixed messaging publishing issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9f979b50
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9f979b50
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9f979b50
Branch: refs/heads/develop
Commit: 9f979b50b972db48b0d80bcdbbe3dd932c9a0bc4
Parents: 3fcde52
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Aug 15 15:45:12 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Aug 15 15:45:12 2016 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/server/GfacServerHandler.java | 2 +-
.../apache/airavata/messaging/core/MessagingFactory.java | 3 ++-
.../airavata/messaging/core/impl/ExperimentConsumer.java | 2 +-
.../airavata/messaging/core/impl/ProcessConsumer.java | 10 ++++------
.../airavata/messaging/core/impl/RabbitMQPublisher.java | 3 ++-
.../airavata/messaging/core/impl/RabbitMQSubscriber.java | 7 ++++++-
6 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 44073dc..a7b0714 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -97,7 +97,7 @@ public class GfacServerHandler implements GfacService.Iface {
private void initAMQPClient() throws AiravataException {
// init process consumer
List<String> routingKeys = new ArrayList<>();
- routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName());
+ routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Type.PROCESS_LAUNCH);
// init status publisher
statusPublisher = Factory.getStatusPublisher();
http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
index b3e6d35..802ea5a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -123,7 +123,7 @@ public class MessagingFactory {
private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException {
sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName())
- .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName())
+ .setQueueName("process_launch")
.setAutoAck(false);
return new RabbitMQSubscriber(sp);
}
@@ -131,6 +131,7 @@ public class MessagingFactory {
private static Subscriber getExperimentSubscriber(RabbitMQProperties sp) throws AiravataException {
sp.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName())
+ .setQueueName("experiment_launch")
.setAutoAck(false);
return new RabbitMQSubscriber(sp);
http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
index 6e4c46a..5010358 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -72,7 +72,7 @@ public class ExperimentConsumer extends QueueingConsumer {
String gatewayId = null;
ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
+ log.info(" Message Received with message id '" + message.getMessageId()
+ "' and with message type '" + message.getMessageType() + "' for experimentId:" +
" " +
experimentEvent.getExperimentId());
http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
index e95a7ca..69910bd 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -55,8 +55,7 @@ public class ProcessConsumer extends QueueingConsumer{
}
- @Override
- public void handleDelivery(String consumerTag,
+ @Override public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties basicProperties,
byte[] body) throws IOException {
@@ -71,10 +70,9 @@ public class ProcessConsumer extends QueueingConsumer{
if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for experimentId:" +
- " " +
- processSubmitEvent.getProcessId());
+ log.info(" Message Received with message id '" + message.getMessageId()
+ + " and with message type:" + message.getMessageType() + ", for processId:" +
+ processSubmitEvent.getProcessId() + ", expId:" + processSubmitEvent.getExperimentId());
event = processSubmitEvent;
gatewayId = processSubmitEvent.getGatewayId();
MessageContext messageContext = new MessageContext(event, message.getMessageType(),
http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
index 3fdb3a1..6f1d1d8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -71,7 +71,7 @@ public class RabbitMQPublisher implements Publisher {
if (properties.getExchangeName() != null) {
channel.exchangeDeclare(properties.getExchangeName(),
properties.getExchangeType(),
- false);
+ true); //durable
}
} catch (Exception e) {
String msg = "RabbitMQ connection issue for exchange : " + properties.getExchangeName();
@@ -93,6 +93,7 @@ public class RabbitMQPublisher implements Publisher {
message.setMessageType(messageContext.getType());
message.setUpdatedTime(messageContext.getUpdatedTime().getTime());
String routingKey = routingKeySupplier.apply(messageContext);
+ log.info("publish messageId:" + messageContext.getMessageId() + ", messageType:" + messageContext.getType() + ", to routingKey:" + routingKey);
byte[] messageBody = ThriftUtils.serializeThriftObject(message);
send(messageBody, routingKey);
} catch (TException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
index 441281d..6b28723 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
@@ -85,7 +85,11 @@ public class RabbitMQSubscriber implements Subscriber {
if (queueName == null) {
queueName = channel.queueDeclare().getQueue();
} else {
- channel.queueDeclare(queueName, true, false, false, null);
+ channel.queueDeclare(queueName,
+ true, // durable
+ false, // exclusive
+ false, // autoDelete
+ null);// arguments
}
final String id = getId(routingKeys, queueName);
if (queueDetailMap.containsKey(id)) {
@@ -94,6 +98,7 @@ public class RabbitMQSubscriber implements Subscriber {
}
// bind all the routing keys
for (String key : routingKeys) {
+ log.info("Binding key:" + key + " to queue:" + queueName);
channel.queueBind(queueName, properties.getExchangeName(), key);
}
[2/2] airavata git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/airavata into develop
Posted by sh...@apache.org.
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/airavata into develop
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cfe62c3e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cfe62c3e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cfe62c3e
Branch: refs/heads/develop
Commit: cfe62c3e829820d744da4f41e9036c87eccd18dd
Parents: 9f979b5 99d3f4b
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Aug 15 15:45:20 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Aug 15 15:45:20 2016 -0400
----------------------------------------------------------------------
.../airavata/registry/core/Committer.java | 29 ++++
.../apache/airavata/registry/core/JPAUtil.java | 57 +++++++
.../catalog/resources/ExperimentResource.java | 148 ++++++-------------
3 files changed, 134 insertions(+), 100 deletions(-)
----------------------------------------------------------------------