You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2013/12/21 03:57:31 UTC

git commit: adding message receiver classes for health stat messages

Updated Branches:
  refs/heads/master c74290abe -> d73fa919c


adding message receiver classes for health stat messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/d73fa919
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/d73fa919
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/d73fa919

Branch: refs/heads/master
Commit: d73fa919c7525ba772e55d7ae7a46d1fb793a258
Parents: c74290a
Author: Lahiru Sandaruwan <la...@apache.org>
Authored: Sat Dec 21 08:32:09 2013 +0530
Committer: Lahiru Sandaruwan <la...@apache.org>
Committed: Sat Dec 21 08:32:09 2013 +0530

----------------------------------------------------------------------
 .../stat/HealthStatEventMessageDelegator.java   | 95 ++++++++++++++++++++
 .../stat/HealthStatEventMessageListener.java    | 55 ++++++++++++
 .../stat/HealthStatEventMessageQueue.java       | 26 ++++++
 3 files changed, 176 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d73fa919/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
new file mode 100644
index 0000000..9938331
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
+import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventMessageQueue;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+
+/**
+ * Implements logic for processing health stat event messages based on a given
+ * topology process chain.
+ */
+public class HealthStatEventMessageDelegator implements Runnable {
+
+    private static final Log log = LogFactory.getLog(HealthStatEventMessageDelegator.class);
+    private MessageProcessorChain processorChain;
+    private boolean terminated;
+
+    public HealthStatEventMessageDelegator() {
+        this.processorChain = new HealthStatMessageProcessorChain();
+    }
+
+    public HealthStatEventMessageDelegator(MessageProcessorChain processorChain) {
+        this.processorChain = processorChain;
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Health stat event message delegator started");
+            }
+
+            while (!terminated) {
+                try {
+                    TextMessage message = HealthStatEventMessageQueue.getInstance().take();
+
+                    // Retrieve the header
+                    //TODO get the type from json since this is sent by CEP
+                    String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+                    // Retrieve the actual message
+                    //TODO get the 'message' from full json message
+                    String json = message.getText();
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Instance notifier event message received from queue: %s", type));
+                    }
+
+                    // Delegate message to message processor chain
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Delegating instance notifier event message: %s", type));
+                    }
+                    processorChain.process(type, json, null);
+                } catch (Exception e) {
+                    log.error("Failed to retrieve instance notifier event message", e);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Instance notifier event message delegator failed", e);
+            }
+        }
+    }
+
+    /**
+     * Terminate topology event message delegator thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d73fa919/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
new file mode 100644
index 0000000..733521e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventMessageQueue;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * Implements functionality for receiving text based event messages from the health stat
+ * message broker topic and add them to the event queue.
+ */
+public class HealthStatEventMessageListener implements MessageListener {
+
+    private static final Log log = LogFactory.getLog(HealthStatEventMessageListener.class);
+
+    @Override
+    public void onMessage(Message message) {
+        if (message instanceof TextMessage) {
+            TextMessage receivedMessage = (TextMessage) message;
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
+                }
+                // Add received message to the queue
+                HealthStatEventMessageQueue.getInstance().add(receivedMessage);
+
+            } catch (JMSException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d73fa919/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
new file mode 100644
index 0000000..d68e98e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
@@ -0,0 +1,26 @@
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a blocking queue for managing instance notifier event messages.
+ */
+public class HealthStatEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
+    private static volatile HealthStatEventMessageQueue instance;
+
+    private HealthStatEventMessageQueue(){
+    }
+
+    public static synchronized HealthStatEventMessageQueue getInstance() {
+        if (instance == null) {
+            synchronized (HealthStatEventMessageQueue.class){
+                if (instance == null) {
+                    instance = new HealthStatEventMessageQueue();
+                }
+            }
+        }
+        return instance;
+    }
+}
\ No newline at end of file