You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/31 08:51:39 UTC
[06/10] removing application status and adding applications,
cluster status topic
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java
deleted file mode 100644
index 93eeb54..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-public class ApplicationStatusEventMessageListener implements MessageListener {
- private static final Log log = LogFactory.getLog(ApplicationStatusEventMessageListener.class);
-
- private ApplicationStatusEventMessageQueue messageQueue;
-
- public ApplicationStatusEventMessageListener(ApplicationStatusEventMessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
- @Override
- public void onMessage(Message message) {
- if (message instanceof TextMessage) {
- TextMessage receivedMessage = (TextMessage) message;
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
- }
- // Add received message to the queue
- messageQueue.add(receivedMessage);
-
- } catch (JMSException e) {
- log.error(e.getMessage(), e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java
deleted file mode 100644
index ba455c9..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-
-import javax.jms.TextMessage;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class ApplicationStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java
deleted file mode 100644
index 0b6cada..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.message.receiver.application.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.listener.EventListener;
-import org.apache.stratos.messaging.util.Constants;
-
-public class ApplicationStatusEventReceiver implements Runnable {
- private static final Log log = LogFactory.getLog(ApplicationStatusEventReceiver.class);
-
- private ApplicationStatusEventMessageDelegator messageDelegator;
- private ApplicationStatusEventMessageListener messageListener;
- private TopicSubscriber topicSubscriber;
- private boolean terminated;
-
- public ApplicationStatusEventReceiver() {
- ApplicationStatusEventMessageQueue messageQueue = new ApplicationStatusEventMessageQueue();
- this.messageDelegator = new ApplicationStatusEventMessageDelegator(messageQueue);
- this.messageListener = new ApplicationStatusEventMessageListener(messageQueue);
- }
-
- public void addEventListener(EventListener eventListener) {
- messageDelegator.addEventListener(eventListener);
- }
-
- @Override
- public void run() {
- try {
- // Start topic subscriber thread
- topicSubscriber = new TopicSubscriber(Constants.APPLICATION_STATUS_TOPIC);
- topicSubscriber.setMessageListener(messageListener);
- Thread subscriberThread = new Thread(topicSubscriber);
- subscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Application status event message receiver thread started");
- }
-
- // Start Application status event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Application status event message delegator thread started");
- }
-
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Application status failed", e);
- }
- }
- }
-
- public void terminate() {
- topicSubscriber.terminate();
- messageDelegator.terminate();
- terminated = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java
new file mode 100644
index 0000000..68d44b0
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.applications.ApplicationsMessageProcessorChain;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+public class ApplicationsEventMessageDelegator implements Runnable {
+ private static final Log log = LogFactory.getLog(ApplicationsEventMessageDelegator.class);
+
+ private ApplicationsEventMessageQueue messageQueue;
+ private MessageProcessorChain processorChain;
+ private boolean terminated;
+
+ public ApplicationsEventMessageDelegator(ApplicationsEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ this.processorChain = new ApplicationsMessageProcessorChain();
+ }
+
+ public void addEventListener(EventListener eventListener) {
+ processorChain.addEventListener(eventListener);
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Application status event message delegator started");
+ }
+
+ while (!terminated) {
+ try {
+ TextMessage message = messageQueue.take();
+
+ // Retrieve the header
+ String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ // Retrieve the actual message
+ String json = message.getText();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Application status event message received from queue: %s", type));
+ }
+
+ // Delegate message to message processor chain
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Delegating application status event message: %s", type));
+ }
+ processorChain.process(type, json, null);
+ } catch (Exception e) {
+ log.error("Failed to retrieve application status event message", e);
+ }
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Application status event message delegator failed", e);
+ }
+ }
+ }
+
+ /**
+ * Terminate topology event message delegator thread.
+ */
+ public void terminate() {
+ terminated = true;
+ }
+
+
+ private EventMessage jsonToEventMessage(String json) {
+
+ EventMessage event = new EventMessage();
+ String message;
+
+ //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains
+ //message
+ String[] MessageParts = json.split(":", 3);
+
+ String eventType = MessageParts[0].trim();
+ eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\""));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Extracted [event type] %s", eventType));
+ }
+
+ event.setEventName(eventType);
+ String messageTag = MessageParts[1];
+ messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\""));
+
+ if ("message".equals(messageTag)) {
+ message = MessageParts[2].trim();
+ //Remove trailing bracket twice to get the message
+ message = message.substring(0, message.lastIndexOf("}")).trim();
+ message = message.substring(0, message.lastIndexOf("}")).trim();
+ if (message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("[Extracted message] %s ", message));
+ }
+ event.setMessage(message);
+ return event;
+ }
+ }
+ return null;
+ }
+
+ private class EventMessage {
+ private String eventName;
+ private String message;
+
+ private String getEventName() {
+ return eventName;
+ }
+
+ private void setEventName(String eventName) {
+ this.eventName = eventName;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java
new file mode 100644
index 0000000..936c174
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+public class ApplicationsEventMessageListener implements MessageListener {
+ private static final Log log = LogFactory.getLog(ApplicationsEventMessageListener.class);
+
+ private ApplicationsEventMessageQueue messageQueue;
+
+ public ApplicationsEventMessageListener(ApplicationsEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ if (message instanceof TextMessage) {
+ TextMessage receivedMessage = (TextMessage) message;
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
+ }
+ // Add received message to the queue
+ messageQueue.add(receivedMessage);
+
+ } catch (JMSException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java
new file mode 100644
index 0000000..604513e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ApplicationsEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
new file mode 100644
index 0000000..b7577bd
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.applications;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+public class ApplicationsEventReceiver implements Runnable {
+ private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class);
+
+ private ApplicationsEventMessageDelegator messageDelegator;
+ private ApplicationsEventMessageListener messageListener;
+ private TopicSubscriber topicSubscriber;
+ private boolean terminated;
+
+ public ApplicationsEventReceiver() {
+ ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
+ this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
+ this.messageListener = new ApplicationsEventMessageListener(messageQueue);
+ }
+
+ public void addEventListener(EventListener eventListener) {
+ messageDelegator.addEventListener(eventListener);
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Start topic subscriber thread
+ topicSubscriber = new TopicSubscriber(Constants.APPLICATIONS_TOPIC);
+ topicSubscriber.setMessageListener(messageListener);
+ Thread subscriberThread = new Thread(topicSubscriber);
+ subscriberThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Application status event message receiver thread started");
+ }
+
+ // Start Application status event message delegator thread
+ Thread receiverThread = new Thread(messageDelegator);
+ receiverThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Application status event message delegator thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Application status failed", e);
+ }
+ }
+ }
+
+ public void terminate() {
+ topicSubscriber.terminate();
+ messageDelegator.terminate();
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
new file mode 100644
index 0000000..a2fed87
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.cluster.status.ClusterStatusMessageProcessorChain;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+public class ClusterStatusEventMessageDelegator implements Runnable {
+ private static final Log log = LogFactory.getLog(ClusterStatusEventMessageDelegator.class);
+
+ private ClusterStatusEventMessageQueue messageQueue;
+ private MessageProcessorChain processorChain;
+ private boolean terminated;
+
+ public ClusterStatusEventMessageDelegator(ClusterStatusEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ this.processorChain = new ClusterStatusMessageProcessorChain();
+ }
+
+ public void addEventListener(EventListener eventListener) {
+ processorChain.addEventListener(eventListener);
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Application status event message delegator started");
+ }
+
+ while (!terminated) {
+ try {
+ TextMessage message = messageQueue.take();
+
+ // Retrieve the header
+ String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ // Retrieve the actual message
+ String json = message.getText();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Application status event message received from queue: %s", type));
+ }
+
+ // Delegate message to message processor chain
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Delegating application status event message: %s", type));
+ }
+ processorChain.process(type, json, null);
+ } catch (Exception e) {
+ log.error("Failed to retrieve application status event message", e);
+ }
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Application status event message delegator failed", e);
+ }
+ }
+ }
+
+ /**
+ * Terminate topology event message delegator thread.
+ */
+ public void terminate() {
+ terminated = true;
+ }
+
+
+ private EventMessage jsonToEventMessage(String json) {
+
+ EventMessage event = new EventMessage();
+ String message;
+
+ //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains
+ //message
+ String[] MessageParts = json.split(":", 3);
+
+ String eventType = MessageParts[0].trim();
+ eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\""));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Extracted [event type] %s", eventType));
+ }
+
+ event.setEventName(eventType);
+ String messageTag = MessageParts[1];
+ messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\""));
+
+ if ("message".equals(messageTag)) {
+ message = MessageParts[2].trim();
+ //Remove trailing bracket twice to get the message
+ message = message.substring(0, message.lastIndexOf("}")).trim();
+ message = message.substring(0, message.lastIndexOf("}")).trim();
+ if (message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("[Extracted message] %s ", message));
+ }
+ event.setMessage(message);
+ return event;
+ }
+ }
+ return null;
+ }
+
+ private class EventMessage {
+ private String eventName;
+ private String message;
+
+ private String getEventName() {
+ return eventName;
+ }
+
+ private void setEventName(String eventName) {
+ this.eventName = eventName;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java
new file mode 100644
index 0000000..12c7800
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+public class ClusterStatusEventMessageListener implements MessageListener {
+ private static final Log log = LogFactory.getLog(ClusterStatusEventMessageListener.class);
+
+ private ClusterStatusEventMessageQueue messageQueue;
+
+ public ClusterStatusEventMessageListener(ClusterStatusEventMessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ if (message instanceof TextMessage) {
+ TextMessage receivedMessage = (TextMessage) message;
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
+ }
+ // Add received message to the queue
+ messageQueue.add(receivedMessage);
+
+ } catch (JMSException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java
new file mode 100644
index 0000000..9656800
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ClusterStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
new file mode 100644
index 0000000..72ccaed
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+public class ClusterStatusEventReceiver implements Runnable {
+ private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class);
+
+ private ClusterStatusEventMessageDelegator messageDelegator;
+ private ClusterStatusEventMessageListener messageListener;
+ private TopicSubscriber topicSubscriber;
+ private boolean terminated;
+
+ public ClusterStatusEventReceiver() {
+ ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
+ this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
+ this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
+ }
+
+ public void addEventListener(EventListener eventListener) {
+ messageDelegator.addEventListener(eventListener);
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Start topic subscriber thread
+ topicSubscriber = new TopicSubscriber(Constants.CLUSTER_STATUS_TOPIC);
+ topicSubscriber.setMessageListener(messageListener);
+ Thread subscriberThread = new Thread(topicSubscriber);
+ subscriberThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Application status event message receiver thread started");
+ }
+
+ // Start Application status event message delegator thread
+ Thread receiverThread = new Thread(messageDelegator);
+ receiverThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Application status event message delegator thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Application status failed", e);
+ }
+ }
+ }
+
+ public void terminate() {
+ topicSubscriber.terminate();
+ messageDelegator.terminate();
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 33f2f22..2d2d532 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -24,7 +24,9 @@ public class Constants {
public static final String HEALTH_STAT_TOPIC = "summarized-health-stats";
public static final String INSTANCE_STATUS_TOPIC = "instance-status";
public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier";
- public static final String APPLICATION_STATUS_TOPIC = "application-status";
+ public static final String APPLICATIONS_TOPIC = "applications";
+ public static final String CLUSTER_STATUS_TOPIC = "applications";
+
public static final String PING_TOPIC = "ping";
public static final String TENANT_TOPIC = "tenant";
public static final String TENANT_RANGE_ALL = "*";