You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2013/12/21 03:57:31 UTC
git commit: adding message receiver classes for health stat messages
Updated Branches:
refs/heads/master c74290abe -> d73fa919c
adding message receiver classes for health stat messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/d73fa919
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/d73fa919
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/d73fa919
Branch: refs/heads/master
Commit: d73fa919c7525ba772e55d7ae7a46d1fb793a258
Parents: c74290a
Author: Lahiru Sandaruwan <la...@apache.org>
Authored: Sat Dec 21 08:32:09 2013 +0530
Committer: Lahiru Sandaruwan <la...@apache.org>
Committed: Sat Dec 21 08:32:09 2013 +0530
----------------------------------------------------------------------
.../stat/HealthStatEventMessageDelegator.java | 95 ++++++++++++++++++++
.../stat/HealthStatEventMessageListener.java | 55 ++++++++++++
.../stat/HealthStatEventMessageQueue.java | 26 ++++++
3 files changed, 176 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d73fa919/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
new file mode 100644
index 0000000..9938331
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageQueue;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+
+/**
+ * Implements logic for processing health stat event messages based on a given
+ * topology process chain.
+ */
+public class HealthStatEventMessageDelegator implements Runnable {
+
+ private static final Log log = LogFactory.getLog(HealthStatEventMessageDelegator.class);
+ private MessageProcessorChain processorChain;
+ private boolean terminated;
+
+ public HealthStatEventMessageDelegator() {
+ this.processorChain = new HealthStatMessageProcessorChain();
+ }
+
+ public HealthStatEventMessageDelegator(MessageProcessorChain processorChain) {
+ this.processorChain = processorChain;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Health stat event message delegator started");
+ }
+
+ while (!terminated) {
+ try {
+ TextMessage message = HealthStatEventMessageQueue.getInstance().take();
+
+ // Retrieve the header
+ //TODO get the type from json since this is sent by CEP
+ String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+ // Retrieve the actual message
+ //TODO get the 'message' from full json message
+ String json = message.getText();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Instance notifier event message received from queue: %s", type));
+ }
+
+ // Delegate message to message processor chain
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Delegating instance notifier event message: %s", type));
+ }
+ processorChain.process(type, json, null);
+ } catch (Exception e) {
+ log.error("Failed to retrieve instance notifier event message", e);
+ }
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Instance notifier event message delegator failed", e);
+ }
+ }
+ }
+
+ /**
+ * Terminate topology event message delegator thread.
+ */
+ public void terminate() {
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d73fa919/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
new file mode 100644
index 0000000..733521e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventMessageQueue;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * Implements functionality for receiving text based event messages from the health stat
+ * message broker topic and add them to the event queue.
+ */
+public class HealthStatEventMessageListener implements MessageListener {
+
+    private static final Log log = LogFactory.getLog(HealthStatEventMessageListener.class);
+
+    /**
+     * Adds the received message to {@link HealthStatEventMessageQueue} when it is a
+     * {@link TextMessage}; messages of any other type are ignored.
+     *
+     * @param message message delivered by the JMS provider
+     */
+    @Override
+    public void onMessage(Message message) {
+        if (message instanceof TextMessage) {
+            TextMessage receivedMessage = (TextMessage) message;
+            if (log.isDebugEnabled()) {
+                try {
+                    log.debug(String.format("Health stat message received: %s", receivedMessage.getText()));
+                } catch (JMSException e) {
+                    // Only the debug output failed; the message is still queued below so that
+                    // delivery does not depend on the configured log level.
+                    log.error(e.getMessage(), e);
+                }
+            }
+            // Add received message to the queue
+            HealthStatEventMessageQueue.getInstance().add(receivedMessage);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d73fa919/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
new file mode 100644
index 0000000..d68e98e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
@@ -0,0 +1,26 @@
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a blocking queue for managing instance notifier event messages.
+ */
+public class HealthStatEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
+ private static volatile HealthStatEventMessageQueue instance;
+
+ private HealthStatEventMessageQueue(){
+ }
+
+ public static synchronized HealthStatEventMessageQueue getInstance() {
+ if (instance == null) {
+ synchronized (HealthStatEventMessageQueue.class){
+ if (instance == null) {
+ instance = new HealthStatEventMessageQueue();
+ }
+ }
+ }
+ return instance;
+ }
+}
\ No newline at end of file