You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:18 UTC
[19/50] [abbrv] airavata git commit: Refactor renamed
RabbitMQListener class and fixed rename issue with RabbitMQStatusConsumer
class
Refactor renamed RabbitMQListener class and fixed rename issue with RabbitMQStatusConsumer class
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/97ff3b7d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/97ff3b7d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/97ff3b7d
Branch: refs/heads/master
Commit: 97ff3b7d12ce1e6ac42e8a3d22c2bfc1de9b30a1
Parents: 249b440
Author: shamrath <sh...@gmail.com>
Authored: Tue Feb 24 11:20:07 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Tue Feb 24 11:20:07 2015 -0500
----------------------------------------------------------------------
modules/messaging/client/pom.xml | 2 +-
.../messaging/client/RabbitMQListener.java | 215 +++++++++++++++++++
.../messaging/client/RabbitMQListner.java | 215 -------------------
3 files changed, 216 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/97ff3b7d/modules/messaging/client/pom.xml
----------------------------------------------------------------------
diff --git a/modules/messaging/client/pom.xml b/modules/messaging/client/pom.xml
index 762bd10..5db9d4d 100644
--- a/modules/messaging/client/pom.xml
+++ b/modules/messaging/client/pom.xml
@@ -80,7 +80,7 @@
<configuration>
<archive>
<manifest>
- <mainClass>org.apache.airavata.messaging.client.RabbitMQListner</mainClass>
+ <mainClass>org.apache.airavata.messaging.client.RabbitMQListener</mainClass>
</manifest>
</archive>
<descriptorRefs>
http://git-wip-us.apache.org/repos/asf/airavata/blob/97ff3b7d/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
new file mode 100644
index 0000000..c1bdc3d
--- /dev/null
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.client;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.commons.cli.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class RabbitMQListener {
+ public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+ public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+ private final static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
+ private static String gatewayId = "*";
+ private static boolean gatewayLevelMessages = false;
+ private static boolean experimentLevelMessages = false;
+ private static boolean jobLevelMessages = false;
+ private static String experimentId = "*";
+ private static String jobId = "*";
+ private static boolean allMessages = false;
+
+ public static void main(String[] args) {
+ parseArguments(args);
+ try {
+ AiravataUtils.setExecutionAsServer();
+ String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
+ final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
+ RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
+ consumer.listen(new MessageHandler() {
+ @Override
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ List<String> routingKeys = new ArrayList<String>();
+ if (allMessages){
+ routingKeys.add("*");
+ routingKeys.add("*.*");
+ routingKeys.add("*.*.*");
+ routingKeys.add("*.*.*.*");
+ routingKeys.add("*.*.*.*.*");
+ }else {
+ if (gatewayLevelMessages){
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + ".*");
+ routingKeys.add(gatewayId + ".*.*");
+ routingKeys.add(gatewayId + ".*.*.*");
+ routingKeys.add(gatewayId + ".*.*.*.*");
+ }else if (experimentLevelMessages){
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId+ ".*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*");
+ }else if (jobLevelMessages){
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId+ ".*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*." + jobId);
+ }
+ }
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+ return props;
+ }
+
+ @Override
+ public void onMessage(MessageContext message) {
+ if (message.getType().equals(MessageType.EXPERIMENT)){
+ try {
+ ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+ " for Gateway " + event.getGatewayId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }else if (message.getType().equals(MessageType.WORKFLOWNODE)){
+ try {
+ WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+ " for Gateway " + event.getWorkflowNodeIdentity().getGatewayId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }else if (message.getType().equals(MessageType.TASK)){
+ try {
+ TaskStatusChangeEvent event = new TaskStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+ " for Gateway " + event.getTaskIdentity().getGatewayId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }else if (message.getType().equals(MessageType.JOB)){
+ try {
+ JobStatusChangeEvent event = new JobStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+ " for Gateway " + event.getJobIdentity().getGatewayId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+ });
+ } catch (ApplicationSettingsException e) {
+ logger.error("Error reading airavata server properties", e);
+ }catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ }
+
+ public static void parseArguments(String[] args) {
+ try{
+ Options options = new Options();
+
+ options.addOption("gId", true , "Gateway ID");
+ options.addOption("eId", true, "Experiment ID");
+ options.addOption("jId", true, "Job ID");
+ options.addOption("a", false, "All Notifications");
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse( options, args);
+ if (cmd.getOptions() == null || cmd.getOptions().length == 0){
+ logger.info("You have not specified any options. We assume you need to listen to all the messages...");
+ allMessages = true;
+ gatewayId = "*";
+ }
+ if (cmd.hasOption("a")){
+ logger.info("Listening to all the messages...");
+ allMessages = true;
+ gatewayId = "*";
+ }else {
+ gatewayId = cmd.getOptionValue("gId");
+ if (gatewayId == null){
+ gatewayId = "*";
+ logger.info("You have not specified a gateway id. We assume you need to listen to all the messages...");
+ } else {
+ gatewayLevelMessages = true;
+ }
+ experimentId = cmd.getOptionValue("eId");
+ if (experimentId == null && !gatewayId.equals("*")){
+ experimentId = "*";
+ logger.info("You have not specified a experiment id. We assume you need to listen to all the messages for the gateway with id " + gatewayId);
+ } else if (experimentId == null && gatewayId.equals("*")) {
+ experimentId = "*";
+ logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages...");
+ }else {
+ experimentLevelMessages = true;
+ }
+ jobId = cmd.getOptionValue("jId");
+ if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){
+ jobId = "*";
+ logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId
+ + " with experiment id : " + experimentId );
+ } else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) {
+ jobId = "*";
+ logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages...");
+ }else {
+ jobLevelMessages = true;
+ }
+ }
+ } catch (ParseException e) {
+ logger.error("Error while reading command line parameters" , e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/97ff3b7d/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
deleted file mode 100644
index 215d531..0000000
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.client;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.commons.cli.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class RabbitMQListner {
- public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
- public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
- private final static Logger logger = LoggerFactory.getLogger(RabbitMQListner.class);
- private static String gatewayId = "*";
- private static boolean gatewayLevelMessages = false;
- private static boolean experimentLevelMessages = false;
- private static boolean jobLevelMessages = false;
- private static String experimentId = "*";
- private static String jobId = "*";
- private static boolean allMessages = false;
-
- public static void main(String[] args) {
- parseArguments(args);
- try {
- AiravataUtils.setExecutionAsServer();
- String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
- final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
- RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
- consumer.listen(new MessageHandler() {
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- List<String> routingKeys = new ArrayList<String>();
- if (allMessages){
- routingKeys.add("*");
- routingKeys.add("*.*");
- routingKeys.add("*.*.*");
- routingKeys.add("*.*.*.*");
- routingKeys.add("*.*.*.*.*");
- }else {
- if (gatewayLevelMessages){
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + ".*");
- routingKeys.add(gatewayId + ".*.*");
- routingKeys.add(gatewayId + ".*.*.*");
- routingKeys.add(gatewayId + ".*.*.*.*");
- }else if (experimentLevelMessages){
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + "." + experimentId);
- routingKeys.add(gatewayId + "." + experimentId+ ".*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*");
- }else if (jobLevelMessages){
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + "." + experimentId);
- routingKeys.add(gatewayId + "." + experimentId+ ".*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*." + jobId);
- }
- }
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
- return props;
- }
-
- @Override
- public void onMessage(MessageContext message) {
- if (message.getType().equals(MessageType.EXPERIMENT)){
- try {
- ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
- " for Gateway " + event.getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }else if (message.getType().equals(MessageType.WORKFLOWNODE)){
- try {
- WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
- " for Gateway " + event.getWorkflowNodeIdentity().getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }else if (message.getType().equals(MessageType.TASK)){
- try {
- TaskStatusChangeEvent event = new TaskStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
- " for Gateway " + event.getTaskIdentity().getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }else if (message.getType().equals(MessageType.JOB)){
- try {
- JobStatusChangeEvent event = new JobStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
- " for Gateway " + event.getJobIdentity().getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
- });
- } catch (ApplicationSettingsException e) {
- logger.error("Error reading airavata server properties", e);
- }catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
-
- }
-
- public static void parseArguments(String[] args) {
- try{
- Options options = new Options();
-
- options.addOption("gId", true , "Gateway ID");
- options.addOption("eId", true, "Experiment ID");
- options.addOption("jId", true, "Job ID");
- options.addOption("a", false, "All Notifications");
-
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse( options, args);
- if (cmd.getOptions() == null || cmd.getOptions().length == 0){
- logger.info("You have not specified any options. We assume you need to listen to all the messages...");
- allMessages = true;
- gatewayId = "*";
- }
- if (cmd.hasOption("a")){
- logger.info("Listening to all the messages...");
- allMessages = true;
- gatewayId = "*";
- }else {
- gatewayId = cmd.getOptionValue("gId");
- if (gatewayId == null){
- gatewayId = "*";
- logger.info("You have not specified a gateway id. We assume you need to listen to all the messages...");
- } else {
- gatewayLevelMessages = true;
- }
- experimentId = cmd.getOptionValue("eId");
- if (experimentId == null && !gatewayId.equals("*")){
- experimentId = "*";
- logger.info("You have not specified a experiment id. We assume you need to listen to all the messages for the gateway with id " + gatewayId);
- } else if (experimentId == null && gatewayId.equals("*")) {
- experimentId = "*";
- logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages...");
- }else {
- experimentLevelMessages = true;
- }
- jobId = cmd.getOptionValue("jId");
- if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){
- jobId = "*";
- logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId
- + " with experiment id : " + experimentId );
- } else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) {
- jobId = "*";
- logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages...");
- }else {
- jobLevelMessages = true;
- }
- }
- } catch (ParseException e) {
- logger.error("Error while reading command line parameters" , e);
- }
- }
-}